[SPARK-33366][SQL] Migrate LOAD DATA command to use UnresolvedTable to resolve the identifier

### What changes were proposed in this pull request?

This PR proposes to migrate `LOAD DATA` to use `UnresolvedTable` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).

Note that `LOAD DATA` is not supported for v2 tables.

### Why are the changes needed?

The changes allow consistent resolution behavior when resolving the table identifier. For example, the following is the current behavior:
```scala
sql("CREATE TEMPORARY VIEW t AS SELECT 1")
sql("CREATE DATABASE db")
sql("CREATE TABLE t (key INT, value STRING) USING hive")
sql("USE db")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE t") // Succeeds
```
With this change, `LOAD DATA` above fails with the following:
```
org.apache.spark.sql.AnalysisException: t is a temp view not table.; line 1 pos 0
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTempViews$$anonfun$apply$7.$anonfun$applyOrElse$39(Analyzer.scala:865)
    at scala.Option.foreach(Option.scala:407)
```
, which is expected since temporary view is resolved first and `LOAD DATA` doesn't support a temporary view.

### Does this PR introduce _any_ user-facing change?

After this PR, `LOAD DATA ... t` is resolved to a temp view `t` instead of table `db.t` in the above scenario.

### How was this patch tested?

Updated existing tests.

Closes #30270 from imback82/load_data_cmd.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Terry Kim 2020-11-10 05:28:06 +00:00 committed by Wenchen Fan
parent a1f84d8714
commit 90f6f39e42
9 changed files with 82 additions and 68 deletions

View file

@ -3282,7 +3282,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
/**
* Create a [[LoadDataStatement]].
* Create a [[LoadData]].
*
* For example:
* {{{
@ -3291,8 +3291,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* }}}
*/
override def visitLoadData(ctx: LoadDataContext): LogicalPlan = withOrigin(ctx) {
LoadDataStatement(
tableName = visitMultipartIdentifier(ctx.multipartIdentifier),
LoadData(
child = UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier)),
path = string(ctx.path),
isLocal = ctx.LOCAL != null,
isOverwrite = ctx.OVERWRITE != null,

View file

@ -347,16 +347,6 @@ case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends
*/
case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement
/**
* A LOAD DATA INTO TABLE statement, as parsed from SQL
*/
case class LoadDataStatement(
tableName: Seq[String],
path: String,
isLocal: Boolean,
isOverwrite: Boolean,
partition: Option[TablePartitionSpec]) extends ParsedStatement
/**
* A SHOW CREATE TABLE statement, as parsed from SQL.
*/

View file

@ -261,7 +261,7 @@ case class ReplaceTableAsSelect(
}
/**
* The logical plan of the CREATE NAMESPACE command that works for v2 catalogs.
* The logical plan of the CREATE NAMESPACE command.
*/
case class CreateNamespace(
catalog: SupportsNamespaces,
@ -270,7 +270,7 @@ case class CreateNamespace(
properties: Map[String, String]) extends Command
/**
* The logical plan of the DROP NAMESPACE command that works for v2 catalogs.
* The logical plan of the DROP NAMESPACE command.
*/
case class DropNamespace(
namespace: LogicalPlan,
@ -280,7 +280,7 @@ case class DropNamespace(
}
/**
* The logical plan of the DESCRIBE NAMESPACE command that works for v2 catalogs.
* The logical plan of the DESCRIBE NAMESPACE command.
*/
case class DescribeNamespace(
namespace: LogicalPlan,
@ -296,7 +296,7 @@ case class DescribeNamespace(
/**
* The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET (DBPROPERTIES|PROPERTIES)
* command that works for v2 catalogs.
* command.
*/
case class AlterNamespaceSetProperties(
namespace: LogicalPlan,
@ -305,8 +305,7 @@ case class AlterNamespaceSetProperties(
}
/**
* The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET LOCATION
* command that works for v2 catalogs.
* The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET LOCATION command.
*/
case class AlterNamespaceSetLocation(
namespace: LogicalPlan,
@ -315,7 +314,7 @@ case class AlterNamespaceSetLocation(
}
/**
* The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.
* The logical plan of the SHOW NAMESPACES command.
*/
case class ShowNamespaces(
namespace: LogicalPlan,
@ -327,7 +326,7 @@ case class ShowNamespaces(
}
/**
* The logical plan of the DESCRIBE relation_name command that works for v2 tables.
* The logical plan of the DESCRIBE relation_name command.
*/
case class DescribeRelation(
relation: LogicalPlan,
@ -338,7 +337,7 @@ case class DescribeRelation(
}
/**
* The logical plan of the DESCRIBE relation_name col_name command that works for v2 tables.
* The logical plan of the DESCRIBE relation_name col_name command.
*/
case class DescribeColumn(
relation: LogicalPlan,
@ -349,7 +348,7 @@ case class DescribeColumn(
}
/**
* The logical plan of the DELETE FROM command that works for v2 tables.
* The logical plan of the DELETE FROM command.
*/
case class DeleteFromTable(
table: LogicalPlan,
@ -358,7 +357,7 @@ case class DeleteFromTable(
}
/**
* The logical plan of the UPDATE TABLE command that works for v2 tables.
* The logical plan of the UPDATE TABLE command.
*/
case class UpdateTable(
table: LogicalPlan,
@ -368,7 +367,7 @@ case class UpdateTable(
}
/**
* The logical plan of the MERGE INTO command that works for v2 tables.
* The logical plan of the MERGE INTO command.
*/
case class MergeIntoTable(
targetTable: LogicalPlan,
@ -407,7 +406,7 @@ case class Assignment(key: Expression, value: Expression) extends Expression wit
}
/**
* The logical plan of the DROP TABLE command that works for v2 tables.
* The logical plan of the DROP TABLE command.
*/
case class DropTable(
child: LogicalPlan,
@ -422,7 +421,7 @@ case class DropTable(
case class NoopDropTable(multipartIdentifier: Seq[String]) extends Command
/**
* The logical plan of the ALTER TABLE command that works for v2 tables.
* The logical plan of the ALTER TABLE command.
*/
case class AlterTable(
catalog: TableCatalog,
@ -454,7 +453,7 @@ case class AlterTable(
}
/**
* The logical plan of the ALTER TABLE RENAME command that works for v2 tables.
* The logical plan of the ALTER TABLE RENAME command.
*/
case class RenameTable(
catalog: TableCatalog,
@ -462,7 +461,7 @@ case class RenameTable(
newIdent: Identifier) extends Command
/**
* The logical plan of the SHOW TABLE command that works for v2 catalogs.
* The logical plan of the SHOW TABLE command.
*/
case class ShowTables(
namespace: LogicalPlan,
@ -475,7 +474,7 @@ case class ShowTables(
}
/**
* The logical plan of the SHOW VIEWS command that works for v1 and v2 catalogs.
* The logical plan of the SHOW VIEWS command.
*
* Notes: v2 catalogs do not support views API yet, the command will fallback to
* v1 ShowViewsCommand during ResolveSessionCatalog.
@ -491,7 +490,7 @@ case class ShowViews(
}
/**
* The logical plan of the USE/USE NAMESPACE command that works for v2 catalogs.
* The logical plan of the USE/USE NAMESPACE command.
*/
case class SetCatalogAndNamespace(
catalogManager: CatalogManager,
@ -499,14 +498,14 @@ case class SetCatalogAndNamespace(
namespace: Option[Seq[String]]) extends Command
/**
* The logical plan of the REFRESH TABLE command that works for v2 catalogs.
* The logical plan of the REFRESH TABLE command.
*/
case class RefreshTable(child: LogicalPlan) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}
/**
* The logical plan of the SHOW CURRENT NAMESPACE command that works for v2 catalogs.
* The logical plan of the SHOW CURRENT NAMESPACE command.
*/
case class ShowCurrentNamespace(catalogManager: CatalogManager) extends Command {
override val output: Seq[Attribute] = Seq(
@ -515,7 +514,7 @@ case class ShowCurrentNamespace(catalogManager: CatalogManager) extends Command
}
/**
* The logical plan of the SHOW TBLPROPERTIES command that works for v2 catalogs.
* The logical plan of the SHOW TBLPROPERTIES command.
*/
case class ShowTableProperties(
table: LogicalPlan,
@ -556,21 +555,21 @@ case class CommentOnTable(child: LogicalPlan, comment: String) extends Command {
}
/**
* The logical plan of the REFRESH FUNCTION command that works for v2 catalogs.
* The logical plan of the REFRESH FUNCTION command.
*/
case class RefreshFunction(child: LogicalPlan) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}
/**
* The logical plan of the DESCRIBE FUNCTION command that works for v2 catalogs.
* The logical plan of the DESCRIBE FUNCTION command.
*/
case class DescribeFunction(child: LogicalPlan, isExtended: Boolean) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}
/**
* The logical plan of the DROP FUNCTION command that works for v2 catalogs.
* The logical plan of the DROP FUNCTION command.
*/
case class DropFunction(
child: LogicalPlan,
@ -580,7 +579,7 @@ case class DropFunction(
}
/**
* The logical plan of the SHOW FUNCTIONS command that works for v2 catalogs.
* The logical plan of the SHOW FUNCTIONS command.
*/
case class ShowFunctions(
child: Option[LogicalPlan],
@ -591,7 +590,7 @@ case class ShowFunctions(
}
/**
* The logical plan of the ANALYZE TABLE command that works for v2 catalogs.
* The logical plan of the ANALYZE TABLE command.
*/
case class AnalyzeTable(
child: LogicalPlan,
@ -601,7 +600,7 @@ case class AnalyzeTable(
}
/**
* The logical plan of the ANALYZE TABLE FOR COLUMNS command that works for v2 catalogs.
* The logical plan of the ANALYZE TABLE FOR COLUMNS command.
*/
case class AnalyzeColumn(
child: LogicalPlan,
@ -611,3 +610,15 @@ case class AnalyzeColumn(
"mutually exclusive. Only one of them should be specified.")
override def children: Seq[LogicalPlan] = child :: Nil
}
/**
* The logical plan of the LOAD DATA INTO TABLE command.
*/
case class LoadData(
child: LogicalPlan,
path: String,
isLocal: Boolean,
isOverwrite: Boolean,
partition: Option[TablePartitionSpec]) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}

View file

@ -1605,15 +1605,15 @@ class DDLParserSuite extends AnalysisTest {
test("LOAD DATA INTO table") {
comparePlans(
parsePlan("LOAD DATA INPATH 'filepath' INTO TABLE a.b.c"),
LoadDataStatement(Seq("a", "b", "c"), "filepath", false, false, None))
LoadData(UnresolvedTable(Seq("a", "b", "c")), "filepath", false, false, None))
comparePlans(
parsePlan("LOAD DATA LOCAL INPATH 'filepath' INTO TABLE a.b.c"),
LoadDataStatement(Seq("a", "b", "c"), "filepath", true, false, None))
LoadData(UnresolvedTable(Seq("a", "b", "c")), "filepath", true, false, None))
comparePlans(
parsePlan("LOAD DATA LOCAL INPATH 'filepath' OVERWRITE INTO TABLE a.b.c"),
LoadDataStatement(Seq("a", "b", "c"), "filepath", true, true, None))
LoadData(UnresolvedTable(Seq("a", "b", "c")), "filepath", true, true, None))
comparePlans(
parsePlan(
@ -1621,8 +1621,8 @@ class DDLParserSuite extends AnalysisTest {
|LOAD DATA LOCAL INPATH 'filepath' OVERWRITE INTO TABLE a.b.c
|PARTITION(ds='2017-06-10')
""".stripMargin),
LoadDataStatement(
Seq("a", "b", "c"),
LoadData(
UnresolvedTable(Seq("a", "b", "c")),
"filepath",
true,
true,

View file

@ -312,8 +312,8 @@ class ResolveSessionCatalog(
ignoreIfExists = c.ifNotExists)
}
case RefreshTable(r @ ResolvedTable(_, _, _: V1Table)) if isSessionCatalog(r.catalog) =>
RefreshTableCommand(r.identifier.asTableIdentifier)
case RefreshTable(ResolvedV1TableIdentifier(ident)) =>
RefreshTableCommand(ident.asTableIdentifier)
case RefreshTable(r: ResolvedView) =>
RefreshTableCommand(r.identifier.asTableIdentifier)
@ -358,9 +358,8 @@ class ResolveSessionCatalog(
orCreate = c.orCreate)
}
case DropTable(
r @ ResolvedTable(_, _, _: V1Table), ifExists, purge) if isSessionCatalog(r.catalog) =>
DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = false, purge = purge)
case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) =>
DropTableCommand(ident.asTableIdentifier, ifExists, isView = false, purge = purge)
// v1 DROP TABLE supports temp view.
case DropTable(r: ResolvedView, ifExists, purge) =>
@ -427,10 +426,9 @@ class ResolveSessionCatalog(
v1TableName.asTableIdentifier,
"MSCK REPAIR TABLE")
case LoadDataStatement(tbl, path, isLocal, isOverwrite, partition) =>
val v1TableName = parseV1Table(tbl, "LOAD DATA")
case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
LoadDataCommand(
v1TableName.asTableIdentifier,
ident.asTableIdentifier,
path,
isLocal,
isOverwrite,
@ -573,9 +571,8 @@ class ResolveSessionCatalog(
"SHOW VIEWS, only SessionCatalog supports this command.")
}
case ShowTableProperties(
r @ ResolvedTable(_, _, _: V1Table), propertyKey) if isSessionCatalog(r.catalog) =>
ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)
case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey) =>
ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey)
case ShowTableProperties(r: ResolvedView, propertyKey) =>
ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)
@ -696,9 +693,16 @@ class ResolveSessionCatalog(
}
}
object ResolvedV1TableOrViewIdentifier {
object ResolvedV1TableIdentifier {
def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match {
case ResolvedTable(catalog, ident, _: V1Table) if isSessionCatalog(catalog) => Some(ident)
case _ => None
}
}
object ResolvedV1TableOrViewIdentifier {
def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match {
case ResolvedV1TableIdentifier(ident) => Some(ident)
case ResolvedView(ident, _) => Some(ident)
case _ => None
}

View file

@ -283,6 +283,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case AnalyzeTable(_: ResolvedTable, _, _) | AnalyzeColumn(_: ResolvedTable, _, _) =>
throw new AnalysisException("ANALYZE TABLE is not supported for v2 tables.")
case LoadData(_: ResolvedTable, _, _, _, _) =>
throw new AnalysisException("LOAD DATA is not supported for v2 tables.")
case _ => Nil
}
}

View file

@ -2074,10 +2074,10 @@ class DataSourceV2SQLSuite
|PARTITIONED BY (id)
""".stripMargin)
testV1Command("LOAD DATA", s"INPATH 'filepath' INTO TABLE $t")
testV1Command("LOAD DATA", s"LOCAL INPATH 'filepath' INTO TABLE $t")
testV1Command("LOAD DATA", s"LOCAL INPATH 'filepath' OVERWRITE INTO TABLE $t")
testV1Command("LOAD DATA",
testNotSupportedV2Command("LOAD DATA", s"INPATH 'filepath' INTO TABLE $t")
testNotSupportedV2Command("LOAD DATA", s"LOCAL INPATH 'filepath' INTO TABLE $t")
testNotSupportedV2Command("LOAD DATA", s"LOCAL INPATH 'filepath' OVERWRITE INTO TABLE $t")
testNotSupportedV2Command("LOAD DATA",
s"LOCAL INPATH 'filepath' OVERWRITE INTO TABLE $t PARTITION(id=1)")
}
}

View file

@ -168,17 +168,20 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
val dataFilePath =
Thread.currentThread().getContextClassLoader.getResource("data/files/employee.dat")
assertNoSuchTable(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""")
assertNoSuchTable(s"TRUNCATE TABLE $viewName")
val e2 = intercept[AnalysisException] {
sql(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""")
}.getMessage
assert(e2.contains(s"$viewName is a temp view not table"))
assertNoSuchTable(s"TRUNCATE TABLE $viewName")
val e3 = intercept[AnalysisException] {
sql(s"SHOW CREATE TABLE $viewName")
}.getMessage
assert(e2.contains("SHOW CREATE TABLE is not supported on a temporary view"))
assert(e3.contains("SHOW CREATE TABLE is not supported on a temporary view"))
assertNoSuchTable(s"SHOW PARTITIONS $viewName")
val e3 = intercept[AnalysisException] {
val e4 = intercept[AnalysisException] {
sql(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
}.getMessage
assert(e3.contains(s"$viewName is a temp view not table or permanent view"))
assert(e4.contains(s"$viewName is a temp view not table or permanent view"))
assertNoSuchTable(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id")
}
}
@ -208,7 +211,7 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
e = intercept[AnalysisException] {
sql(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""")
}.getMessage
assert(e.contains(s"Target table in LOAD DATA cannot be a view: `default`.`testview`"))
assert(e.contains("default.testView is a view not table"))
e = intercept[AnalysisException] {
sql(s"TRUNCATE TABLE $viewName")

View file

@ -496,7 +496,10 @@ private[hive] class TestHiveSparkSession(
def getLoadedTables: collection.mutable.HashSet[String] = sharedState.loadedTables
def loadTestTable(name: String): Unit = {
if (!sharedState.loadedTables.contains(name)) {
// LOAD DATA does not work on temporary views. Since temporary views are resolved first,
// skip loading if there exists a temporary view with the given name.
if (sessionState.catalog.getTempView(name).isEmpty &&
!sharedState.loadedTables.contains(name)) {
// Marks the table as loaded first to prevent infinite mutually recursive table loading.
sharedState.loadedTables += name
logDebug(s"Loading test table $name")