diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index c5e8429d49..07086d1a45 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3282,7 +3282,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Create a [[LoadDataStatement]]. + * Create a [[LoadData]]. * * For example: * {{{ @@ -3291,8 +3291,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * }}} */ override def visitLoadData(ctx: LoadDataContext): LogicalPlan = withOrigin(ctx) { - LoadDataStatement( - tableName = visitMultipartIdentifier(ctx.multipartIdentifier), + LoadData( + child = UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier)), path = string(ctx.path), isLocal = ctx.LOCAL != null, isOverwrite = ctx.OVERWRITE != null, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index e711a6ad43..246e7f3bcb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -347,16 +347,6 @@ case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends */ case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement -/** - * A LOAD DATA INTO TABLE statement, as parsed from SQL - */ -case class LoadDataStatement( - tableName: Seq[String], - path: String, - isLocal: Boolean, - isOverwrite: Boolean, - partition: Option[TablePartitionSpec]) extends ParsedStatement - /** * A SHOW CREATE TABLE statement, as parsed from SQL. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 94d4e7ecfa..b5386f5044 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -261,7 +261,7 @@ case class ReplaceTableAsSelect( } /** - * The logical plan of the CREATE NAMESPACE command that works for v2 catalogs. + * The logical plan of the CREATE NAMESPACE command. */ case class CreateNamespace( catalog: SupportsNamespaces, @@ -270,7 +270,7 @@ case class CreateNamespace( properties: Map[String, String]) extends Command /** - * The logical plan of the DROP NAMESPACE command that works for v2 catalogs. + * The logical plan of the DROP NAMESPACE command. */ case class DropNamespace( namespace: LogicalPlan, @@ -280,7 +280,7 @@ case class DropNamespace( } /** - * The logical plan of the DESCRIBE NAMESPACE command that works for v2 catalogs. + * The logical plan of the DESCRIBE NAMESPACE command. */ case class DescribeNamespace( namespace: LogicalPlan, @@ -296,7 +296,7 @@ case class DescribeNamespace( /** * The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET (DBPROPERTIES|PROPERTIES) - * command that works for v2 catalogs. + * command. */ case class AlterNamespaceSetProperties( namespace: LogicalPlan, @@ -305,8 +305,7 @@ case class AlterNamespaceSetProperties( } /** - * The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET LOCATION - * command that works for v2 catalogs. + * The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET LOCATION command. */ case class AlterNamespaceSetLocation( namespace: LogicalPlan, @@ -315,7 +314,7 @@ case class AlterNamespaceSetLocation( } /** - * The logical plan of the SHOW NAMESPACES command that works for v2 catalogs. + * The logical plan of the SHOW NAMESPACES command. */ case class ShowNamespaces( namespace: LogicalPlan, @@ -327,7 +326,7 @@ case class ShowNamespaces( } /** - * The logical plan of the DESCRIBE relation_name command that works for v2 tables. + * The logical plan of the DESCRIBE relation_name command. */ case class DescribeRelation( relation: LogicalPlan, @@ -338,7 +337,7 @@ case class DescribeRelation( } /** - * The logical plan of the DESCRIBE relation_name col_name command that works for v2 tables. + * The logical plan of the DESCRIBE relation_name col_name command. */ case class DescribeColumn( relation: LogicalPlan, @@ -349,7 +348,7 @@ case class DescribeColumn( } /** - * The logical plan of the DELETE FROM command that works for v2 tables. + * The logical plan of the DELETE FROM command. */ case class DeleteFromTable( table: LogicalPlan, @@ -358,7 +357,7 @@ case class DeleteFromTable( } /** - * The logical plan of the UPDATE TABLE command that works for v2 tables. + * The logical plan of the UPDATE TABLE command. */ case class UpdateTable( table: LogicalPlan, @@ -368,7 +367,7 @@ case class UpdateTable( } /** - * The logical plan of the MERGE INTO command that works for v2 tables. + * The logical plan of the MERGE INTO command. */ case class MergeIntoTable( targetTable: LogicalPlan, @@ -407,7 +406,7 @@ case class Assignment(key: Expression, value: Expression) extends Expression wit } /** - * The logical plan of the DROP TABLE command that works for v2 tables. + * The logical plan of the DROP TABLE command. */ case class DropTable( child: LogicalPlan, @@ -422,7 +421,7 @@ case class DropTable( case class NoopDropTable(multipartIdentifier: Seq[String]) extends Command /** - * The logical plan of the ALTER TABLE command that works for v2 tables. + * The logical plan of the ALTER TABLE command. */ case class AlterTable( catalog: TableCatalog, @@ -454,7 +453,7 @@ case class AlterTable( } /** - * The logical plan of the ALTER TABLE RENAME command that works for v2 tables. + * The logical plan of the ALTER TABLE RENAME command. */ case class RenameTable( catalog: TableCatalog, @@ -462,7 +461,7 @@ case class RenameTable( newIdent: Identifier) extends Command /** - * The logical plan of the SHOW TABLE command that works for v2 catalogs. + * The logical plan of the SHOW TABLE command. */ case class ShowTables( namespace: LogicalPlan, @@ -475,7 +474,7 @@ case class ShowTables( } /** - * The logical plan of the SHOW VIEWS command that works for v1 and v2 catalogs. + * The logical plan of the SHOW VIEWS command. * * Notes: v2 catalogs do not support views API yet, the command will fallback to * v1 ShowViewsCommand during ResolveSessionCatalog. @@ -491,7 +490,7 @@ case class ShowViews( } /** - * The logical plan of the USE/USE NAMESPACE command that works for v2 catalogs. + * The logical plan of the USE/USE NAMESPACE command. */ case class SetCatalogAndNamespace( catalogManager: CatalogManager, @@ -499,14 +498,14 @@ case class SetCatalogAndNamespace( namespace: Option[Seq[String]]) extends Command /** - * The logical plan of the REFRESH TABLE command that works for v2 catalogs. + * The logical plan of the REFRESH TABLE command. */ case class RefreshTable(child: LogicalPlan) extends Command { override def children: Seq[LogicalPlan] = child :: Nil } /** - * The logical plan of the SHOW CURRENT NAMESPACE command that works for v2 catalogs. + * The logical plan of the SHOW CURRENT NAMESPACE command. */ case class ShowCurrentNamespace(catalogManager: CatalogManager) extends Command { override val output: Seq[Attribute] = Seq( @@ -515,7 +514,7 @@ case class ShowCurrentNamespace(catalogManager: CatalogManager) extends Command } /** - * The logical plan of the SHOW TBLPROPERTIES command that works for v2 catalogs. + * The logical plan of the SHOW TBLPROPERTIES command. */ case class ShowTableProperties( table: LogicalPlan, @@ -556,21 +555,21 @@ case class CommentOnTable(child: LogicalPlan, comment: String) extends Command { } /** - * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs. + * The logical plan of the REFRESH FUNCTION command. */ case class RefreshFunction(child: LogicalPlan) extends Command { override def children: Seq[LogicalPlan] = child :: Nil } /** - * The logical plan of the DESCRIBE FUNCTION command that works for v2 catalogs. + * The logical plan of the DESCRIBE FUNCTION command. */ case class DescribeFunction(child: LogicalPlan, isExtended: Boolean) extends Command { override def children: Seq[LogicalPlan] = child :: Nil } /** - * The logical plan of the DROP FUNCTION command that works for v2 catalogs. + * The logical plan of the DROP FUNCTION command. */ case class DropFunction( child: LogicalPlan, @@ -580,7 +579,7 @@ case class DropFunction( } /** - * The logical plan of the SHOW FUNCTIONS command that works for v2 catalogs. + * The logical plan of the SHOW FUNCTIONS command. */ case class ShowFunctions( child: Option[LogicalPlan], @@ -591,7 +590,7 @@ case class ShowFunctions( } /** - * The logical plan of the ANALYZE TABLE command that works for v2 catalogs. + * The logical plan of the ANALYZE TABLE command. */ case class AnalyzeTable( child: LogicalPlan, @@ -601,7 +600,7 @@ case class AnalyzeTable( } /** - * The logical plan of the ANALYZE TABLE FOR COLUMNS command that works for v2 catalogs. + * The logical plan of the ANALYZE TABLE FOR COLUMNS command. */ case class AnalyzeColumn( child: LogicalPlan, @@ -611,3 +610,15 @@ case class AnalyzeColumn( "mutually exclusive. Only one of them should be specified.") override def children: Seq[LogicalPlan] = child :: Nil } + +/** + * The logical plan of the LOAD DATA INTO TABLE command. + */ +case class LoadData( + child: LogicalPlan, + path: String, + isLocal: Boolean, + isOverwrite: Boolean, + partition: Option[TablePartitionSpec]) extends Command { + override def children: Seq[LogicalPlan] = child :: Nil +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index aca7602bdb..085aaf148c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -1605,15 +1605,15 @@ class DDLParserSuite extends AnalysisTest { test("LOAD DATA INTO table") { comparePlans( parsePlan("LOAD DATA INPATH 'filepath' INTO TABLE a.b.c"), - LoadDataStatement(Seq("a", "b", "c"), "filepath", false, false, None)) + LoadData(UnresolvedTable(Seq("a", "b", "c")), "filepath", false, false, None)) comparePlans( parsePlan("LOAD DATA LOCAL INPATH 'filepath' INTO TABLE a.b.c"), - LoadDataStatement(Seq("a", "b", "c"), "filepath", true, false, None)) + LoadData(UnresolvedTable(Seq("a", "b", "c")), "filepath", true, false, None)) comparePlans( parsePlan("LOAD DATA LOCAL INPATH 'filepath' OVERWRITE INTO TABLE a.b.c"), - LoadDataStatement(Seq("a", "b", "c"), "filepath", true, true, None)) + LoadData(UnresolvedTable(Seq("a", "b", "c")), "filepath", true, true, None)) comparePlans( parsePlan( @@ -1621,8 +1621,8 @@ class DDLParserSuite extends AnalysisTest { |LOAD DATA LOCAL INPATH 'filepath' OVERWRITE INTO TABLE a.b.c |PARTITION(ds='2017-06-10') """.stripMargin), - LoadDataStatement( - Seq("a", "b", "c"), + LoadData( + UnresolvedTable(Seq("a", "b", "c")), "filepath", true, true, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 610632ac92..59652229a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -312,8 +312,8 @@ class ResolveSessionCatalog( ignoreIfExists = c.ifNotExists) } - case RefreshTable(r @ ResolvedTable(_, _, _: V1Table)) if isSessionCatalog(r.catalog) => - RefreshTableCommand(r.identifier.asTableIdentifier) + case RefreshTable(ResolvedV1TableIdentifier(ident)) => + RefreshTableCommand(ident.asTableIdentifier) case RefreshTable(r: ResolvedView) => RefreshTableCommand(r.identifier.asTableIdentifier) @@ -358,9 +358,8 @@ class ResolveSessionCatalog( orCreate = c.orCreate) } - case DropTable( - r @ ResolvedTable(_, _, _: V1Table), ifExists, purge) if isSessionCatalog(r.catalog) => - DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = false, purge = purge) + case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) => + DropTableCommand(ident.asTableIdentifier, ifExists, isView = false, purge = purge) // v1 DROP TABLE supports temp view. case DropTable(r: ResolvedView, ifExists, purge) => @@ -427,10 +426,9 @@ class ResolveSessionCatalog( v1TableName.asTableIdentifier, "MSCK REPAIR TABLE") - case LoadDataStatement(tbl, path, isLocal, isOverwrite, partition) => - val v1TableName = parseV1Table(tbl, "LOAD DATA") + case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) => LoadDataCommand( - v1TableName.asTableIdentifier, + ident.asTableIdentifier, path, isLocal, isOverwrite, @@ -573,9 +571,8 @@ class ResolveSessionCatalog( "SHOW VIEWS, only SessionCatalog supports this command.") } - case ShowTableProperties( - r @ ResolvedTable(_, _, _: V1Table), propertyKey) if isSessionCatalog(r.catalog) => - ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey) + case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey) => + ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey) case ShowTableProperties(r: ResolvedView, propertyKey) => ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey) @@ -696,9 +693,16 @@ class ResolveSessionCatalog( } } - object ResolvedV1TableOrViewIdentifier { + object ResolvedV1TableIdentifier { def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match { case ResolvedTable(catalog, ident, _: V1Table) if isSessionCatalog(catalog) => Some(ident) + case _ => None + } + } + + object ResolvedV1TableOrViewIdentifier { + def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match { + case ResolvedV1TableIdentifier(ident) => Some(ident) case ResolvedView(ident, _) => Some(ident) case _ => None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 648929eaa3..817b3cecf0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -283,6 +283,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case AnalyzeTable(_: ResolvedTable, _, _) | AnalyzeColumn(_: ResolvedTable, _, _) => throw new AnalysisException("ANALYZE TABLE is not supported for v2 tables.") + case LoadData(_: ResolvedTable, _, _, _, _) => + throw new AnalysisException("LOAD DATA is not supported for v2 tables.") + case _ => Nil } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 444daf8233..ee3f7bed7c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2074,10 +2074,10 @@ class DataSourceV2SQLSuite |PARTITIONED BY (id) """.stripMargin) - testV1Command("LOAD DATA", s"INPATH 'filepath' INTO TABLE $t") - testV1Command("LOAD DATA", s"LOCAL INPATH 'filepath' INTO TABLE $t") - testV1Command("LOAD DATA", s"LOCAL INPATH 'filepath' OVERWRITE INTO TABLE $t") - testV1Command("LOAD DATA", + testNotSupportedV2Command("LOAD DATA", s"INPATH 'filepath' INTO TABLE $t") + testNotSupportedV2Command("LOAD DATA", s"LOCAL INPATH 'filepath' INTO TABLE $t") + testNotSupportedV2Command("LOAD DATA", s"LOCAL INPATH 'filepath' OVERWRITE INTO TABLE $t") + testNotSupportedV2Command("LOAD DATA", s"LOCAL INPATH 'filepath' OVERWRITE INTO TABLE $t PARTITION(id=1)") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index 7a6b0b8d6d..8889ea1777 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -168,17 +168,20 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/employee.dat") - assertNoSuchTable(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""") - assertNoSuchTable(s"TRUNCATE TABLE $viewName") val e2 = intercept[AnalysisException] { + sql(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""") + }.getMessage + assert(e2.contains(s"$viewName is a temp view not table")) + assertNoSuchTable(s"TRUNCATE TABLE $viewName") + val e3 = intercept[AnalysisException] { sql(s"SHOW CREATE TABLE $viewName") }.getMessage - assert(e2.contains("SHOW CREATE TABLE is not supported on a temporary view")) + assert(e3.contains("SHOW CREATE TABLE is not supported on a temporary view")) assertNoSuchTable(s"SHOW PARTITIONS $viewName") - val e3 = intercept[AnalysisException] { + val e4 = intercept[AnalysisException] { sql(s"ANALYZE TABLE $viewName COMPUTE STATISTICS") }.getMessage - assert(e3.contains(s"$viewName is a temp view not table or permanent view")) + assert(e4.contains(s"$viewName is a temp view not table or permanent view")) assertNoSuchTable(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id") } } @@ -208,7 +211,7 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { e = intercept[AnalysisException] { sql(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""") }.getMessage - assert(e.contains(s"Target table in LOAD DATA cannot be a view: `default`.`testview`")) + assert(e.contains("default.testView is a view not table")) e = intercept[AnalysisException] { sql(s"TRUNCATE TABLE $viewName") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala index 082aa8d765..10cb200550 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -496,7 +496,10 @@ private[hive] class TestHiveSparkSession( def getLoadedTables: collection.mutable.HashSet[String] = sharedState.loadedTables def loadTestTable(name: String): Unit = { - if (!sharedState.loadedTables.contains(name)) { + // LOAD DATA does not work on temporary views. Since temporary views are resolved first, + // skip loading if there exists a temporary view with the given name. + if (sessionState.catalog.getTempView(name).isEmpty && + !sharedState.loadedTables.contains(name)) { // Marks the table as loaded first to prevent infinite mutually recursive table loading. sharedState.loadedTables += name logDebug(s"Loading test table $name")