From 7ed008853933d0227cce9e00538a6cf3732d1937 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 22 Jul 2019 12:08:46 +0800 Subject: [PATCH] [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 ## What changes were proposed in this pull request? Implements the `REPLACE TABLE` and `REPLACE TABLE AS SELECT` logical plans. `REPLACE TABLE` is now a valid operation in spark-sql provided that the tables being modified are managed by V2 catalogs. This also introduces an atomic mix-in that table catalogs can choose to implement. Table catalogs can now implement `StagingTableCatalog`. The semantics of this API are that table creation and replacement can be "staged" and then "committed". On the execution of `REPLACE TABLE AS SELECT`, `REPLACE TABLE`, and `CREATE TABLE AS SELECT`, if the catalog implements staging operations, the physical plan will use said functionality. Otherwise, these operations fall back on non-atomic variants. For `REPLACE TABLE` in particular, the usage of non-atomic operations can unfortunately lead to inconsistent state. ## How was this patch tested? Unit tests - multiple additions to `DataSourceV2SQLSuite`. Closes #24798 from mccheah/spark-27724. Authored-by: mcheah Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/parser/SqlBase.g4 | 12 + .../sql/catalog/v2/StagingTableCatalog.java | 142 +++++ .../spark/sql/sources/v2/StagedTable.java | 52 ++ .../CannotReplaceMissingTableException.scala | 29 + .../sql/catalyst/parser/AstBuilder.scala | 74 ++- .../plans/logical/basicLogicalOperators.scala | 41 ++ .../logical/sql/ReplaceTableStatement.scala | 60 ++ .../sql/catalyst/parser/DDLParserSuite.scala | 537 ++++++++++-------- .../datasources/DataSourceResolution.scala | 73 ++- .../datasources/v2/DataSourceV2Strategy.scala | 44 +- .../datasources/v2/ReplaceTableExec.scala | 91 +++ .../v2/WriteToDataSourceV2Exec.scala | 181 +++++- .../sql/sources/v2/DataSourceV2SQLSuite.scala | 169 +++++- .../sources/v2/TestInMemoryTableCatalog.scala | 153 ++++- 14 files changed, 1401 insertions(+), 257 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/StagedTable.java create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CannotReplaceMissingTableException.scala create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ReplaceTableStatement.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index d991e7cf7e..0a142c29a1 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -113,6 +113,14 @@ statement (AS? query)? #createHiveTable | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier LIKE source=tableIdentifier locationSpec? #createTableLike + | replaceTableHeader ('(' colTypeList ')')? tableProvider + ((OPTIONS options=tablePropertyList) | + (PARTITIONED BY partitioning=transformList) | + bucketSpec | + locationSpec | + (COMMENT comment=STRING) | + (TBLPROPERTIES tableProps=tablePropertyList))* + (AS? query)? #replaceTable | ANALYZE TABLE tableIdentifier partitionSpec?
COMPUTE STATISTICS (identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)? #analyze | ALTER TABLE multipartIdentifier @@ -261,6 +269,10 @@ createTableHeader : CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? multipartIdentifier ; +replaceTableHeader + : (CREATE OR)? REPLACE TABLE multipartIdentifier + ; + bucketSpec : CLUSTERED BY identifierList (SORTED BY orderedIdentifierList)? diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java new file mode 100644 index 0000000000..fc055e91a6 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import java.util.Map; + +import org.apache.spark.sql.catalog.v2.expressions.Transform; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.sources.v2.StagedTable; +import org.apache.spark.sql.sources.v2.SupportsWrite; +import org.apache.spark.sql.sources.v2.writer.BatchWrite; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * An optional mix-in for implementations of {@link TableCatalog} that support staging creation of + * a table before committing the table's metadata along with its contents in CREATE TABLE AS + * SELECT or REPLACE TABLE AS SELECT operations. + *

+ * It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS + * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE + * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first + * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via + * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform + * the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the + * write operation fails, the catalog will have already dropped the table, and the planner cannot + * roll back the dropping of the table. + *
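To make the behaviour described above concrete, the new syntax can be exercised from Spark SQL roughly as follows. This is only a sketch: the `testcat` name mirrors the catalog registered by the test suites in this change, while `com.example.MyV2Catalog` and the `foo` provider are placeholder names, not anything defined by the patch.

```scala
// Register a v2 catalog implementation (placeholder class name) under the name "testcat";
// `spark` is the usual SparkSession available in spark-shell / spark-sql.
spark.conf.set("spark.sql.catalog.testcat", "com.example.MyV2Catalog")

spark.sql("CREATE TABLE testcat.db.source USING foo AS SELECT 1 AS id, 'a' AS data")

// Replaces both the metadata and the contents of the target table. With a plain TableCatalog
// this runs as dropTable + createTable + write (non-atomic); with a StagingTableCatalog the
// replacement is staged and committed atomically.
spark.sql("REPLACE TABLE testcat.db.target USING foo AS SELECT id, data FROM testcat.db.source")

// CREATE OR REPLACE TABLE also succeeds when the target does not exist yet; a plain
// REPLACE TABLE fails with CannotReplaceMissingTableException in that case.
spark.sql("CREATE OR REPLACE TABLE testcat.db.target2 USING foo AS SELECT id FROM testcat.db.source")
```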

+ * If the catalog implements this plugin, the catalog can implement the methods to "stage" the + * creation and the replacement of a table. After the table's + * {@link BatchWrite#commit(WriterCommitMessage[])} is called, + * {@link StagedTable#commitStagedChanges()} is called, at which point the staged table can + * complete both the data write and the metadata swap operation atomically. + */ +public interface StagingTableCatalog extends TableCatalog { + + /** + * Stage the creation of a table, preparing it to be committed into the metastore. + *

+ * When the table is committed, the contents of any writes performed by the Spark planner are + * committed along with the metadata about the table passed into this method's arguments. If the + * table exists when this method is called, the method should throw an exception accordingly. If + * another process concurrently creates the table before this table's staged changes are + * committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param partitions transforms to use for partitioning data in the table + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table or view already exists for the identifier + * @throws UnsupportedOperationException If a requested partition transform is not supported + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + */ + StagedTable stageCreate( + Identifier ident, + StructType schema, + Transform[] partitions, + Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException; + + /** + * Stage the replacement of a table, preparing it to be committed into the metastore when the + * returned table's {@link StagedTable#commitStagedChanges()} is called. + *

+ * When the table is committed, the contents of any writes performed by the Spark planner are + * committed along with the metadata about the table passed into this method's arguments. If the + * table exists, the metadata and the contents of this table replace the metadata and contents of + * the existing table. If a concurrent process commits changes to the table's data or metadata + * while the write is being performed but before the staged changes are committed, the catalog + * can decide whether to move forward with the table replacement anyways or abort the commit + * operation. + *

+ * If the table does not exist, committing the staged changes should fail with + * {@link NoSuchTableException}. This differs from the semantics of + * {@link #stageCreateOrReplace(Identifier, StructType, Transform[], Map)}, which should create + * the table in the data source if the table does not exist at the time of committing the + * operation. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param partitions transforms to use for partitioning data in the table + * @param properties a string map of table properties + * @return metadata for the new table + * @throws UnsupportedOperationException If a requested partition transform is not supported + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + * @throws NoSuchTableException If the table does not exist + */ + StagedTable stageReplace( + Identifier ident, + StructType schema, + Transform[] partitions, + Map properties) throws NoSuchNamespaceException, NoSuchTableException; + + /** + * Stage the creation or replacement of a table, preparing it to be committed into the metastore + * when the returned table's {@link StagedTable#commitStagedChanges()} is called. + *
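As one possible reading of the stageReplace contract just described, a catalog's commit step might look roughly like the sketch below. The `commitStagedReplace` helper, the `tables` registry, and the use of a plain `ConcurrentHashMap` are illustrative assumptions rather than anything defined by this patch.

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.catalog.v2.Identifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.sources.v2.Table

// Hypothetical commit step for a staged REPLACE: swap in the new table definition only if the
// table still exists at commit time, otherwise fail, as the stageReplace contract requires.
def commitStagedReplace(
    ident: Identifier,
    replacement: Table,
    tables: ConcurrentHashMap[Identifier, Table]): Unit = {
  val swapped = tables.computeIfPresent(ident, (_: Identifier, _: Table) => replacement)
  if (swapped == null) {
    throw new NoSuchTableException(ident.namespace().mkString("."), ident.name())
  }
}
```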

+ * When the table is committed, the contents of any writes performed by the Spark planner are + * committed along with the metadata about the table passed into this method's arguments. If the + * table exists, the metadata and the contents of this table replace the metadata and contents of + * the existing table. If a concurrent process commits changes to the table's data or metadata + * while the write is being performed but before the staged changes are committed, the catalog + * can decide whether to move forward with the table replacement anyways or abort the commit + * operation. + *

+ * If the table does not exist when the changes are committed, the table should be created in the + * backing data source. This differs from the expected semantics of + * {@link #stageReplace(Identifier, StructType, Transform[], Map)}, which should fail when + * the staged changes are committed but the table doesn't exist at commit time. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param partitions transforms to use for partitioning data in the table + * @param properties a string map of table properties + * @return metadata for the new table + * @throws UnsupportedOperationException If a requested partition transform is not supported + * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional) + */ + StagedTable stageCreateOrReplace( + Identifier ident, + StructType schema, + Transform[] partitions, + Map properties) throws NoSuchNamespaceException; +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/StagedTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/StagedTable.java new file mode 100644 index 0000000000..b2baa93b14 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/StagedTable.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import java.util.Map; +import org.apache.spark.sql.catalog.v2.Identifier; +import org.apache.spark.sql.catalog.v2.StagingTableCatalog; +import org.apache.spark.sql.catalog.v2.expressions.Transform; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * Represents a table which is staged for being committed to the metastore. + *
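Taken together with the StagedTable interface below, the control flow that the new atomic physical plans follow can be sketched as shown here. The helper name and the `doWrite` callback are stand-ins for the real v2 batch-write path; the actual implementations live in WriteToDataSourceV2Exec.scala in this patch.

```scala
import java.util.{Map => JMap}

import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.sources.v2.StagedTable
import org.apache.spark.sql.types.StructType

// Simplified version of what AtomicReplaceTableAsSelectExec does: stage, write, then either
// commit the staged table (publishing data and metadata together) or abort it on failure.
def replaceTableAsSelectAtomically(
    catalog: StagingTableCatalog,
    ident: Identifier,
    schema: StructType,
    partitions: Array[Transform],
    properties: JMap[String, String],
    orCreate: Boolean)(doWrite: StagedTable => Unit): Unit = {
  val staged =
    if (orCreate) {
      catalog.stageCreateOrReplace(ident, schema, partitions, properties)
    } else {
      catalog.stageReplace(ident, schema, partitions, properties)
    }
  try {
    doWrite(staged)               // write the query output through the staged table
    staged.commitStagedChanges()  // atomically swap in the new data and metadata
  } catch {
    case t: Throwable =>
      staged.abortStagedChanges() // discard staged metadata and temporary outputs
      throw t
  }
}
```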

+ * This is used to implement atomic CREATE TABLE AS SELECT and REPLACE TABLE AS SELECT queries. The + * planner will create one of these via + * {@link StagingTableCatalog#stageCreate(Identifier, StructType, Transform[], Map)} or + * {@link StagingTableCatalog#stageReplace(Identifier, StructType, Transform[], Map)} to prepare the + * table for being written to. This table should usually implement {@link SupportsWrite}. A new + * writer will be constructed via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}, + * and the write will be committed. The job concludes with a call to {@link #commitStagedChanges()}, + * at which point implementations are expected to commit the table's metadata into the metastore + * along with the data that was written by the writes from the write builder this table created. + */ +public interface StagedTable extends Table { + + /** + * Finalize the creation or replacement of this table. + */ + void commitStagedChanges(); + + /** + * Abort the changes that were staged, both in metadata and from temporary outputs of this + * table's writers. + */ + void abortStagedChanges(); +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CannotReplaceMissingTableException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CannotReplaceMissingTableException.scala new file mode 100644 index 0000000000..3036f7c210 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CannotReplaceMissingTableException.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalog.v2.Identifier + +class CannotReplaceMissingTableException( + tableIdentifier: Identifier, + cause: Option[Throwable] = None) + extends AnalysisException( + s"Table $tableIdentifier cannot be replaced as it did not exist." 
+ + s" Use CREATE OR REPLACE TABLE to create the table.", cause = cause) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index d9f8b9a720..e599128f83 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType} +import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -2127,6 +2127,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging (multipartIdentifier, temporary, ifNotExists, ctx.EXTERNAL != null) } + /** + * Validate a replace table statement and return the [[TableIdentifier]]. + */ + override def visitReplaceTableHeader( + ctx: ReplaceTableHeaderContext): TableHeader = withOrigin(ctx) { + val multipartIdentifier = ctx.multipartIdentifier.parts.asScala.map(_.getText) + (multipartIdentifier, false, false, false) + } + /** * Parse a qualified name to a multipart name. */ @@ -2294,6 +2303,69 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Replace a table, returning a [[ReplaceTableStatement]] logical plan. + * + * Expected format: + * {{{ + * [CREATE OR] REPLACE TABLE [db_name.]table_name + * USING table_provider + * replace_table_clauses + * [[AS] select_statement]; + * + * replace_table_clauses (order insensitive): + * [OPTIONS table_property_list] + * [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)] + * [CLUSTERED BY (col_name, col_name, ...) + * [SORTED BY (col_name [ASC|DESC], ...)] + * INTO num_buckets BUCKETS + * ] + * [LOCATION path] + * [COMMENT table_comment] + * [TBLPROPERTIES (property_name=property_value, ...)] + * }}} + */ + override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) { + val (table, _, ifNotExists, external) = visitReplaceTableHeader(ctx.replaceTableHeader) + if (external) { + operationNotAllowed("REPLACE EXTERNAL TABLE ... 
USING", ctx) + } + + checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx) + checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx) + checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx) + checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx) + checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx) + checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) + + val schema = Option(ctx.colTypeList()).map(createSchema) + val partitioning: Seq[Transform] = + Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil) + val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec) + val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty) + val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) + + val provider = ctx.tableProvider.qualifiedName.getText + val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec) + val comment = Option(ctx.comment).map(string) + val orCreate = ctx.replaceTableHeader().CREATE() != null + + Option(ctx.query).map(plan) match { + case Some(_) if schema.isDefined => + operationNotAllowed( + "Schema may not be specified in a Replace Table As Select (RTAS) statement", + ctx) + + case Some(query) => + ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties, + provider, options, location, comment, orCreate = orCreate) + + case _ => + ReplaceTableStatement(table, schema.getOrElse(new StructType), partitioning, + bucketSpec, properties, provider, options, location, comment, orCreate = orCreate) + } + } + /** * Create a [[DropTableStatement]] command. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 2cb04c9ec7..2698ba282f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -441,6 +441,47 @@ case class CreateTableAsSelect( } } +/** + * Replace a table with a v2 catalog. + * + * If the table does not exist, and orCreate is true, then it will be created. + * If the table does not exist, and orCreate is false, then an exception will be thrown. + * + * The persisted table will have no contents as a result of this operation. + */ +case class ReplaceTable( + catalog: TableCatalog, + tableName: Identifier, + tableSchema: StructType, + partitioning: Seq[Transform], + properties: Map[String, String], + orCreate: Boolean) extends Command + +/** + * Replaces a table from a select query with a v2 catalog. + * + * If the table does not exist, and orCreate is true, then it will be created. + * If the table does not exist, and orCreate is false, then an exception will be thrown. 
+ */ +case class ReplaceTableAsSelect( + catalog: TableCatalog, + tableName: Identifier, + partitioning: Seq[Transform], + query: LogicalPlan, + properties: Map[String, String], + writeOptions: Map[String, String], + orCreate: Boolean) extends Command { + + override def children: Seq[LogicalPlan] = Seq(query) + + override lazy val resolved: Boolean = { + // the table schema is created from the query schema, so the only resolution needed is to check + // that the columns referenced by the table's partitioning exist in the query schema + val references = partitioning.flatMap(_.references).toSet + references.map(_.fieldNames).forall(query.schema.findNestedField(_).isDefined) + } +} + /** * Append data to an existing table. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ReplaceTableStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ReplaceTableStatement.scala new file mode 100644 index 0000000000..2808892b08 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ReplaceTableStatement.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.sql + +import org.apache.spark.sql.catalog.v2.expressions.Transform +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.types.StructType + +/** + * A REPLACE TABLE command, as parsed from SQL. + * + * If the table exists prior to running this command, executing this statement + * will replace the table's metadata and clear the underlying rows from the table. + */ +case class ReplaceTableStatement( + tableName: Seq[String], + tableSchema: StructType, + partitioning: Seq[Transform], + bucketSpec: Option[BucketSpec], + properties: Map[String, String], + provider: String, + options: Map[String, String], + location: Option[String], + comment: Option[String], + orCreate: Boolean) extends ParsedStatement + +/** + * A REPLACE TABLE AS SELECT command, as parsed from SQL. 
+ */ +case class ReplaceTableAsSelectStatement( + tableName: Seq[String], + asSelect: LogicalPlan, + partitioning: Seq[Transform], + bucketSpec: Option[BucketSpec], + properties: Map[String, String], + provider: String, + options: Map[String, String], + location: Option[String], + comment: Option[String], + orCreate: Boolean) extends ParsedStatement { + + override def children: Seq[LogicalPlan] = Seq(asSelect) +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index d008b3c78f..dd84170e26 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -19,11 +19,11 @@ package org.apache.spark.sql.catalyst.parser import java.util.Locale -import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform} +import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} import org.apache.spark.sql.catalyst.analysis.AnalysisTest import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType} +import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement} import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType} import org.apache.spark.unsafe.types.UTF8String @@ -47,82 +47,71 @@ class DDLParserSuite extends AnalysisTest { comparePlans(parsePlan(sql), expected, checkAnalysis = false) } - test("create table using - schema") { - val sql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet" + test("create/replace table using - schema") { + val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet" + val replaceSql = "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet" + val expectedTableSpec = TableSpec( + Seq("my_tab"), + Some(new StructType() + .add("a", IntegerType, nullable = true, "test") + .add("b", StringType)), + Seq.empty[Transform], + None, + Map.empty[String, String], + "parquet", + Map.empty[String, String], + None, + None) - parsePlan(sql) match { - case create: CreateTableStatement => - assert(create.tableName == Seq("my_tab")) - assert(create.tableSchema == new StructType() - .add("a", IntegerType, 
nullable = true, "test") - .add("b", StringType)) - assert(create.partitioning.isEmpty) - assert(create.bucketSpec.isEmpty) - assert(create.properties.isEmpty) - assert(create.provider == "parquet") - assert(create.options.isEmpty) - assert(create.location.isEmpty) - assert(create.comment.isEmpty) - assert(!create.ifNotExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") + Seq(createSql, replaceSql).foreach { sql => + testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false) } intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING) USING parquet", "no viable alternative at input") } - test("create table - with IF NOT EXISTS") { + test("create/replace table - with IF NOT EXISTS") { val sql = "CREATE TABLE IF NOT EXISTS my_tab(a INT, b STRING) USING parquet" - - parsePlan(sql) match { - case create: CreateTableStatement => - assert(create.tableName == Seq("my_tab")) - assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) - assert(create.partitioning.isEmpty) - assert(create.bucketSpec.isEmpty) - assert(create.properties.isEmpty) - assert(create.provider == "parquet") - assert(create.options.isEmpty) - assert(create.location.isEmpty) - assert(create.comment.isEmpty) - assert(create.ifNotExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") - } + testCreateOrReplaceDdl( + sql, + TableSpec( + Seq("my_tab"), + Some(new StructType().add("a", IntegerType).add("b", StringType)), + Seq.empty[Transform], + None, + Map.empty[String, String], + "parquet", + Map.empty[String, String], + None, + None), + expectedIfNotExists = true) } - test("create table - with partitioned by") { - val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " + + test("create/replace table - with partitioned by") { + val createSql = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " + "USING parquet PARTITIONED BY (a)" - - parsePlan(query) match { - case create: CreateTableStatement => - assert(create.tableName == Seq("my_tab")) - assert(create.tableSchema == new StructType() - .add("a", IntegerType, nullable = true, "test") - .add("b", StringType)) - assert(create.partitioning == Seq(IdentityTransform(FieldReference("a")))) - assert(create.bucketSpec.isEmpty) - assert(create.properties.isEmpty) - assert(create.provider == "parquet") - assert(create.options.isEmpty) - assert(create.location.isEmpty) - assert(create.comment.isEmpty) - assert(!create.ifNotExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query") + val replaceSql = "REPLACE TABLE my_tab(a INT comment 'test', b STRING) " + + "USING parquet PARTITIONED BY (a)" + val expectedTableSpec = TableSpec( + Seq("my_tab"), + Some(new StructType() + .add("a", IntegerType, nullable = true, "test") + .add("b", StringType)), + Seq(IdentityTransform(FieldReference("a"))), + None, + Map.empty[String, String], + "parquet", + Map.empty[String, String], + None, + None) + Seq(createSql, replaceSql).foreach { sql => + testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false) } } - test("create table - partitioned by transforms") { - val sql = + test("create/replace table - partitioned by transforms") { + val createSql = """ |CREATE TABLE my_tab (a INT, b STRING, ts TIMESTAMP) 
USING parquet |PARTITIONED BY ( @@ -135,154 +124,151 @@ class DDLParserSuite extends AnalysisTest { | foo(a, "bar", 34)) """.stripMargin - parsePlan(sql) match { - case create: CreateTableStatement => - assert(create.tableName == Seq("my_tab")) - assert(create.tableSchema == new StructType() - .add("a", IntegerType) - .add("b", StringType) - .add("ts", TimestampType)) - assert(create.partitioning == Seq( - IdentityTransform(FieldReference("a")), - BucketTransform(LiteralValue(16, IntegerType), Seq(FieldReference("b"))), - YearsTransform(FieldReference("ts")), - MonthsTransform(FieldReference("ts")), - DaysTransform(FieldReference("ts")), - HoursTransform(FieldReference("ts")), - ApplyTransform("foo", Seq( - FieldReference("a"), - LiteralValue(UTF8String.fromString("bar"), StringType), - LiteralValue(34, IntegerType))))) - assert(create.bucketSpec.isEmpty) - assert(create.properties.isEmpty) - assert(create.provider == "parquet") - assert(create.options.isEmpty) - assert(create.location.isEmpty) - assert(create.comment.isEmpty) - assert(!create.ifNotExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") + val replaceSql = + """ + |REPLACE TABLE my_tab (a INT, b STRING, ts TIMESTAMP) USING parquet + |PARTITIONED BY ( + | a, + | bucket(16, b), + | years(ts), + | months(ts), + | days(ts), + | hours(ts), + | foo(a, "bar", 34)) + """.stripMargin + val expectedTableSpec = TableSpec( + Seq("my_tab"), + Some(new StructType() + .add("a", IntegerType) + .add("b", StringType) + .add("ts", TimestampType)), + Seq( + IdentityTransform(FieldReference("a")), + BucketTransform(LiteralValue(16, IntegerType), Seq(FieldReference("b"))), + YearsTransform(FieldReference("ts")), + MonthsTransform(FieldReference("ts")), + DaysTransform(FieldReference("ts")), + HoursTransform(FieldReference("ts")), + ApplyTransform("foo", Seq( + FieldReference("a"), + LiteralValue(UTF8String.fromString("bar"), StringType), + LiteralValue(34, IntegerType)))), + None, + Map.empty[String, String], + "parquet", + Map.empty[String, String], + None, + None) + Seq(createSql, replaceSql).foreach { sql => + testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false) } } - test("create table - with bucket") { - val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " + + test("create/replace table - with bucket") { + val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " + "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS" - parsePlan(query) match { - case create: CreateTableStatement => - assert(create.tableName == Seq("my_tab")) - assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) - assert(create.partitioning.isEmpty) - assert(create.bucketSpec.contains(BucketSpec(5, Seq("a"), Seq("b")))) - assert(create.properties.isEmpty) - assert(create.provider == "parquet") - assert(create.options.isEmpty) - assert(create.location.isEmpty) - assert(create.comment.isEmpty) - assert(!create.ifNotExists) + val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet " + + "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS" - case other => - fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + - s"got ${other.getClass.getName}: $query") + val expectedTableSpec = TableSpec( + Seq("my_tab"), + Some(new StructType().add("a", IntegerType).add("b", StringType)), + Seq.empty[Transform], + Some(BucketSpec(5, Seq("a"), Seq("b"))), + 
Map.empty[String, String], + "parquet", + Map.empty[String, String], + None, + None) + Seq(createSql, replaceSql).foreach { sql => + testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false) } } - test("create table - with comment") { - val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'" - - parsePlan(sql) match { - case create: CreateTableStatement => - assert(create.tableName == Seq("my_tab")) - assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) - assert(create.partitioning.isEmpty) - assert(create.bucketSpec.isEmpty) - assert(create.properties.isEmpty) - assert(create.provider == "parquet") - assert(create.options.isEmpty) - assert(create.location.isEmpty) - assert(create.comment.contains("abc")) - assert(!create.ifNotExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") + test("create/replace table - with comment") { + val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'" + val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'" + val expectedTableSpec = TableSpec( + Seq("my_tab"), + Some(new StructType().add("a", IntegerType).add("b", StringType)), + Seq.empty[Transform], + None, + Map.empty[String, String], + "parquet", + Map.empty[String, String], + None, + Some("abc")) + Seq(createSql, replaceSql).foreach{ sql => + testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false) } } - test("create table - with table properties") { - val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet TBLPROPERTIES('test' = 'test')" - - parsePlan(sql) match { - case create: CreateTableStatement => - assert(create.tableName == Seq("my_tab")) - assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) - assert(create.partitioning.isEmpty) - assert(create.bucketSpec.isEmpty) - assert(create.properties == Map("test" -> "test")) - assert(create.provider == "parquet") - assert(create.options.isEmpty) - assert(create.location.isEmpty) - assert(create.comment.isEmpty) - assert(!create.ifNotExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") + test("create/replace table - with table properties") { + val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet" + + " TBLPROPERTIES('test' = 'test')" + val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet" + + " TBLPROPERTIES('test' = 'test')" + val expectedTableSpec = TableSpec( + Seq("my_tab"), + Some(new StructType().add("a", IntegerType).add("b", StringType)), + Seq.empty[Transform], + None, + Map("test" -> "test"), + "parquet", + Map.empty[String, String], + None, + None) + Seq(createSql, replaceSql).foreach { sql => + testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false) } } - test("create table - with location") { - val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'" - - parsePlan(sql) match { - case create: CreateTableStatement => - assert(create.tableName == Seq("my_tab")) - assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType)) - assert(create.partitioning.isEmpty) - assert(create.bucketSpec.isEmpty) - assert(create.properties.isEmpty) - assert(create.provider == "parquet") - assert(create.options.isEmpty) - 
assert(create.location.contains("/tmp/file")) - assert(create.comment.isEmpty) - assert(!create.ifNotExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") + test("create/replace table - with location") { + val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'" + val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'" + val expectedTableSpec = TableSpec( + Seq("my_tab"), + Some(new StructType().add("a", IntegerType).add("b", StringType)), + Seq.empty[Transform], + None, + Map.empty[String, String], + "parquet", + Map.empty[String, String], + Some("/tmp/file"), + None) + Seq(createSql, replaceSql).foreach { sql => + testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false) } } - test("create table - byte length literal table name") { - val sql = "CREATE TABLE 1m.2g(a INT) USING parquet" - - parsePlan(sql) match { - case create: CreateTableStatement => - assert(create.tableName == Seq("1m", "2g")) - assert(create.tableSchema == new StructType().add("a", IntegerType)) - assert(create.partitioning.isEmpty) - assert(create.bucketSpec.isEmpty) - assert(create.properties.isEmpty) - assert(create.provider == "parquet") - assert(create.options.isEmpty) - assert(create.location.isEmpty) - assert(create.comment.isEmpty) - assert(!create.ifNotExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") + test("create/replace table - byte length literal table name") { + val createSql = "CREATE TABLE 1m.2g(a INT) USING parquet" + val replaceSql = "REPLACE TABLE 1m.2g(a INT) USING parquet" + val expectedTableSpec = TableSpec( + Seq("1m", "2g"), + Some(new StructType().add("a", IntegerType)), + Seq.empty[Transform], + None, + Map.empty[String, String], + "parquet", + Map.empty[String, String], + None, + None) + Seq(createSql, replaceSql).foreach { sql => + testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false) } } - test("Duplicate clauses - create table") { + test("Duplicate clauses - create/replace table") { def createTableHeader(duplicateClause: String): String = { s"CREATE TABLE my_tab(a INT, b STRING) USING parquet $duplicateClause $duplicateClause" } + def replaceTableHeader(duplicateClause: String): String = { + s"CREATE TABLE my_tab(a INT, b STRING) USING parquet $duplicateClause $duplicateClause" + } + intercept(createTableHeader("TBLPROPERTIES('test' = 'test2')"), "Found duplicate clauses: TBLPROPERTIES") intercept(createTableHeader("LOCATION '/tmp/file'"), @@ -293,31 +279,44 @@ class DDLParserSuite extends AnalysisTest { "Found duplicate clauses: CLUSTERED BY") intercept(createTableHeader("PARTITIONED BY (b)"), "Found duplicate clauses: PARTITIONED BY") + + intercept(replaceTableHeader("TBLPROPERTIES('test' = 'test2')"), + "Found duplicate clauses: TBLPROPERTIES") + intercept(replaceTableHeader("LOCATION '/tmp/file'"), + "Found duplicate clauses: LOCATION") + intercept(replaceTableHeader("COMMENT 'a table'"), + "Found duplicate clauses: COMMENT") + intercept(replaceTableHeader("CLUSTERED BY(b) INTO 256 BUCKETS"), + "Found duplicate clauses: CLUSTERED BY") + intercept(replaceTableHeader("PARTITIONED BY (b)"), + "Found duplicate clauses: PARTITIONED BY") } test("support for other types in OPTIONS") { - val sql = + val createSql = """ |CREATE TABLE table_name USING json |OPTIONS (a 1, b 
0.1, c TRUE) """.stripMargin - - parsePlan(sql) match { - case create: CreateTableStatement => - assert(create.tableName == Seq("table_name")) - assert(create.tableSchema == new StructType) - assert(create.partitioning.isEmpty) - assert(create.bucketSpec.isEmpty) - assert(create.properties.isEmpty) - assert(create.provider == "json") - assert(create.options == Map("a" -> "1", "b" -> "0.1", "c" -> "true")) - assert(create.location.isEmpty) - assert(create.comment.isEmpty) - assert(!create.ifNotExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," + - s"got ${other.getClass.getName}: $sql") + val replaceSql = + """ + |REPLACE TABLE table_name USING json + |OPTIONS (a 1, b 0.1, c TRUE) + """.stripMargin + Seq(createSql, replaceSql).foreach { sql => + testCreateOrReplaceDdl( + sql, + TableSpec( + Seq("table_name"), + Some(new StructType), + Seq.empty[Transform], + Option.empty[BucketSpec], + Map.empty[String, String], + "json", + Map("a" -> "1", "b" -> "0.1", "c" -> "true"), + None, + None), + expectedIfNotExists = false) } } @@ -352,27 +351,28 @@ class DDLParserSuite extends AnalysisTest { |AS SELECT * FROM src """.stripMargin - checkParsing(s1) - checkParsing(s2) - checkParsing(s3) + val s4 = + """ + |REPLACE TABLE mydb.page_view + |USING parquet + |COMMENT 'This is the staging page view table' + |LOCATION '/user/external/page_view' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src + """.stripMargin - def checkParsing(sql: String): Unit = { - parsePlan(sql) match { - case create: CreateTableAsSelectStatement => - assert(create.tableName == Seq("mydb", "page_view")) - assert(create.partitioning.isEmpty) - assert(create.bucketSpec.isEmpty) - assert(create.properties == Map("p1" -> "v1", "p2" -> "v2")) - assert(create.provider == "parquet") - assert(create.options.isEmpty) - assert(create.location.contains("/user/external/page_view")) - assert(create.comment.contains("This is the staging page view table")) - assert(create.ifNotExists) - - case other => - fail(s"Expected to parse ${classOf[CreateTableAsSelectStatement].getClass.getName} " + - s"from query, got ${other.getClass.getName}: $sql") - } + val expectedTableSpec = TableSpec( + Seq("mydb", "page_view"), + None, + Seq.empty[Transform], + None, + Map("p1" -> "v1", "p2" -> "v2"), + "parquet", + Map.empty[String, String], + Some("/user/external/page_view"), + Some("This is the staging page view table")) + Seq(s1, s2, s3, s4).foreach { sql => + testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = true) } } @@ -403,6 +403,28 @@ class DDLParserSuite extends AnalysisTest { parseCompare(s"DROP VIEW IF EXISTS view", DropViewStatement(Seq("view"), ifExists = true)) } + private def testCreateOrReplaceDdl( + sqlStatement: String, + tableSpec: TableSpec, + expectedIfNotExists: Boolean) { + val parsedPlan = parsePlan(sqlStatement) + val newTableToken = sqlStatement.split(" ")(0).trim.toUpperCase(Locale.ROOT) + parsedPlan match { + case create: CreateTableStatement if newTableToken == "CREATE" => + assert(create.ifNotExists == expectedIfNotExists) + case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" => + assert(ctas.ifNotExists == expectedIfNotExists) + case replace: ReplaceTableStatement if newTableToken == "REPLACE" => + case replace: ReplaceTableAsSelectStatement if newTableToken == "REPLACE" => + case other => + fail("First token in statement does not match the expected parsed plan; CREATE TABLE" + + " should create a CreateTableStatement, 
and REPLACE TABLE should create a" + + s" ReplaceTableStatement. Statement: $sqlStatement, plan type:" + + s" ${parsedPlan.getClass.getName}.") + } + assert(TableSpec(parsedPlan) === tableSpec) + } + // ALTER VIEW view_name SET TBLPROPERTIES ('comment' = new_comment); // ALTER VIEW view_name UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key'); test("alter view: alter view properties") { @@ -593,4 +615,69 @@ class DDLParserSuite extends AnalysisTest { Seq(Seq("x"), Seq("y"), Seq("a", "b", "c")))) } } + + private case class TableSpec( + name: Seq[String], + schema: Option[StructType], + partitioning: Seq[Transform], + bucketSpec: Option[BucketSpec], + properties: Map[String, String], + provider: String, + options: Map[String, String], + location: Option[String], + comment: Option[String]) + + private object TableSpec { + def apply(plan: LogicalPlan): TableSpec = { + plan match { + case create: CreateTableStatement => + TableSpec( + create.tableName, + Some(create.tableSchema), + create.partitioning, + create.bucketSpec, + create.properties, + create.provider, + create.options, + create.location, + create.comment) + case replace: ReplaceTableStatement => + TableSpec( + replace.tableName, + Some(replace.tableSchema), + replace.partitioning, + replace.bucketSpec, + replace.properties, + replace.provider, + replace.options, + replace.location, + replace.comment) + case ctas: CreateTableAsSelectStatement => + TableSpec( + ctas.tableName, + Some(ctas.asSelect).filter(_.resolved).map(_.schema), + ctas.partitioning, + ctas.bucketSpec, + ctas.properties, + ctas.provider, + ctas.options, + ctas.location, + ctas.comment) + case rtas: ReplaceTableAsSelectStatement => + TableSpec( + rtas.tableName, + Some(rtas.asSelect).filter(_.resolved).map(_.schema), + rtas.partitioning, + rtas.bucketSpec, + rtas.properties, + rtas.provider, + rtas.options, + rtas.location, + rtas.comment) + case other => + fail(s"Expected to parse Create, CTAS, Replace, or RTAS plan" + + s" from query, got ${other.getClass.getName}.") + } + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index 1b7bb169b3..8685d2f7a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -27,8 +27,8 @@ import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan} -import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect} +import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, 
AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DropTableCommand} import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation} @@ -94,6 +94,38 @@ case class DataSourceResolution( convertCTAS(v2SessionCatalog.asTableCatalog, identifier, create) } + case ReplaceTableStatement( + AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties, + V1WriteProvider(provider), options, location, comment, orCreate) => + throw new AnalysisException( + s"Replacing tables is not supported using the legacy / v1 Spark external catalog" + + s" API. Write provider name: $provider, identifier: $table.") + + case ReplaceTableAsSelectStatement( + AsTableIdentifier(table), query, partitionCols, bucketSpec, properties, + V1WriteProvider(provider), options, location, comment, orCreate) => + throw new AnalysisException( + s"Replacing tables is not supported using the legacy / v1 Spark external catalog" + + s" API. Write provider name: $provider, identifier: $table.") + + case replace: ReplaceTableStatement => + // the provider was not a v1 source, convert to a v2 plan + val CatalogObjectIdentifier(maybeCatalog, identifier) = replace.tableName + val catalog = maybeCatalog.orElse(defaultCatalog) + .getOrElse(throw new AnalysisException( + s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) + .asTableCatalog + convertReplaceTable(catalog, identifier, replace) + + case rtas: ReplaceTableAsSelectStatement => + // the provider was not a v1 source, convert to a v2 plan + val CatalogObjectIdentifier(maybeCatalog, identifier) = rtas.tableName + val catalog = maybeCatalog.orElse(defaultCatalog) + .getOrElse(throw new AnalysisException( + s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) + .asTableCatalog + convertRTAS(catalog, identifier, rtas) + case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) => DropTable(catalog.asTableCatalog, ident, ifExists) @@ -226,6 +258,43 @@ case class DataSourceResolution( ignoreIfExists = create.ifNotExists) } + private def convertRTAS( + catalog: TableCatalog, + identifier: Identifier, + rtas: ReplaceTableAsSelectStatement): ReplaceTableAsSelect = { + // convert the bucket spec and add it as a transform + val partitioning = rtas.partitioning ++ rtas.bucketSpec.map(_.asTransform) + val properties = convertTableProperties( + rtas.properties, rtas.options, rtas.location, rtas.comment, rtas.provider) + + ReplaceTableAsSelect( + catalog, + identifier, + partitioning, + rtas.asSelect, + properties, + writeOptions = rtas.options.filterKeys(_ != "path"), + orCreate = rtas.orCreate) + } + + private def convertReplaceTable( + catalog: TableCatalog, + identifier: Identifier, + replace: ReplaceTableStatement): ReplaceTable = { + // convert the bucket spec and add it as a transform + val partitioning = replace.partitioning ++ replace.bucketSpec.map(_.asTransform) + val properties = convertTableProperties( + replace.properties, replace.options, replace.location, replace.comment, replace.provider) 
+ + ReplaceTable( + catalog, + identifier, + replace.tableSchema, + partitioning, + properties, + orCreate = replace.orCreate) + } + private def convertTableProperties( properties: Map[String, String], options: Map[String, String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 4f8507da39..52e2896536 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -21,9 +21,10 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, Strategy} +import org.apache.spark.sql.catalog.v2.StagingTableCatalog import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition} +import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} @@ -165,8 +166,45 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) => val writeOptions = new CaseInsensitiveStringMap(options.asJava) - CreateTableAsSelectExec( - catalog, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil + catalog match { + case staging: StagingTableCatalog => + AtomicCreateTableAsSelectExec( + staging, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil + case _ => + CreateTableAsSelectExec( + catalog, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil + } + + case ReplaceTable(catalog, ident, schema, parts, props, orCreate) => + catalog match { + case staging: StagingTableCatalog => + AtomicReplaceTableExec(staging, ident, schema, parts, props, orCreate = orCreate) :: Nil + case _ => + ReplaceTableExec(catalog, ident, schema, parts, props, orCreate = orCreate) :: Nil + } + + case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) => + val writeOptions = new CaseInsensitiveStringMap(options.asJava) + catalog match { + case staging: StagingTableCatalog => + AtomicReplaceTableAsSelectExec( + staging, + ident, + parts, + planLater(query), + props, + writeOptions, + orCreate = orCreate) :: Nil + case _ => + ReplaceTableAsSelectExec( + catalog, + ident, + parts, + planLater(query), + props, + writeOptions, + orCreate = orCreate) :: Nil + } case AppendData(r: DataSourceV2Relation, query, _) => AppendDataExec(r.table.asWritable, r.options, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala new file mode 100644 index 0000000000..35d86ee2ab --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import scala.collection.JavaConverters._ + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog, TableCatalog} +import org.apache.spark.sql.catalog.v2.expressions.Transform +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.sources.v2.StagedTable +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils + +case class ReplaceTableExec( + catalog: TableCatalog, + ident: Identifier, + tableSchema: StructType, + partitioning: Seq[Transform], + tableProperties: Map[String, String], + orCreate: Boolean) extends LeafExecNode { + + override protected def doExecute(): RDD[InternalRow] = { + if (catalog.tableExists(ident)) { + catalog.dropTable(ident) + } else if (!orCreate) { + throw new CannotReplaceMissingTableException(ident) + } + catalog.createTable(ident, tableSchema, partitioning.toArray, tableProperties.asJava) + sqlContext.sparkContext.parallelize(Seq.empty, 1) + } + + override def output: Seq[Attribute] = Seq.empty +} + +case class AtomicReplaceTableExec( + catalog: StagingTableCatalog, + identifier: Identifier, + tableSchema: StructType, + partitioning: Seq[Transform], + tableProperties: Map[String, String], + orCreate: Boolean) extends LeafExecNode { + + override protected def doExecute(): RDD[InternalRow] = { + val staged = if (orCreate) { + catalog.stageCreateOrReplace( + identifier, tableSchema, partitioning.toArray, tableProperties.asJava) + } else if (catalog.tableExists(identifier)) { + try { + catalog.stageReplace( + identifier, tableSchema, partitioning.toArray, tableProperties.asJava) + } catch { + case e: NoSuchTableException => + throw new CannotReplaceMissingTableException(identifier, Some(e)) + } + } else { + throw new CannotReplaceMissingTableException(identifier) + } + commitOrAbortStagedChanges(staged) + + sqlContext.sparkContext.parallelize(Seq.empty, 1) + } + + override def output: Seq[Attribute] = Seq.empty + + private def commitOrAbortStagedChanges(staged: StagedTable): Unit = { + Utils.tryWithSafeFinallyAndFailureCallbacks({ + staged.commitStagedChanges() + })(catchBlock = { + staged.abortStagedChanges() + }) + } +} diff 
--git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 6c771ea988..9f644de192 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -26,15 +26,15 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext} import org.apache.spark.executor.CommitDeniedException import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog} +import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog, TableCatalog} import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.sources.{AlwaysTrue, Filter} -import org.apache.spark.sql.sources.v2.SupportsWrite +import org.apache.spark.sql.sources.v2.{StagedTable, SupportsWrite} import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.{LongAccumulator, Utils} @@ -51,11 +51,13 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan) } /** - * Physical plan node for v2 create table as select. + * Physical plan node for v2 create table as select when the catalog does not support staging + * the table creation. * * A new table will be created using the schema of the query, and rows from the query are appended. - * If either table creation or the append fails, the table will be deleted. This implementation does - * not provide an atomic CTAS. + * If either table creation or the append fails, the table will be deleted. This implementation is + * not atomic; for an atomic variant for catalogs that support the appropriate features, see + * AtomicCreateTableAsSelectExec. */ case class CreateTableAsSelectExec( catalog: TableCatalog, @@ -78,7 +80,8 @@ case class CreateTableAsSelectExec( } Utils.tryWithSafeFinallyAndFailureCallbacks({ - catalog.createTable(ident, query.schema, partitioning.toArray, properties.asJava) match { + catalog.createTable( + ident, query.schema, partitioning.toArray, properties.asJava) match { case table: SupportsWrite => val batchWrite = table.newWriteBuilder(writeOptions) .withInputDataSchema(query.schema) @@ -89,15 +92,145 @@ case class CreateTableAsSelectExec( case _ => // table does not support writes - throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}") + throw new SparkException( + s"Table implementation does not support writes: ${ident.quoted}") } - })(catchBlock = { catalog.dropTable(ident) }) } } +/** + * Physical plan node for v2 create table as select, when the catalog is determined to support + * staging table creation.
+ * + * A new table will be created using the schema of the query, and rows from the query are appended. + * The CTAS operation is atomic. The creation of the table is staged and the commit of the write + * should bundle the commitment of the metadata and the table contents in a single unit. If the + * write fails, the table is instructed to roll back all staged changes. + */ +case class AtomicCreateTableAsSelectExec( + catalog: StagingTableCatalog, + ident: Identifier, + partitioning: Seq[Transform], + query: SparkPlan, + properties: Map[String, String], + writeOptions: CaseInsensitiveStringMap, + ifNotExists: Boolean) extends AtomicTableWriteExec { + + override protected def doExecute(): RDD[InternalRow] = { + if (catalog.tableExists(ident)) { + if (ifNotExists) { + return sparkContext.parallelize(Seq.empty, 1) + } + + throw new TableAlreadyExistsException(ident) + } + val stagedTable = catalog.stageCreate( + ident, query.schema, partitioning.toArray, properties.asJava) + writeToStagedTable(stagedTable, writeOptions, ident) + } +} + +/** + * Physical plan node for v2 replace table as select when the catalog does not support staging + * table replacement. + * + * A new table will be created using the schema of the query, and rows from the query are appended. + * If the table exists, its contents and schema should be replaced with the schema and the contents + * of the query. This is a non-atomic implementation that drops the table and then runs non-atomic + * CTAS. For an atomic implementation for catalogs with the appropriate support, see + * AtomicReplaceTableAsSelectExec. + */ +case class ReplaceTableAsSelectExec( + catalog: TableCatalog, + ident: Identifier, + partitioning: Seq[Transform], + query: SparkPlan, + properties: Map[String, String], + writeOptions: CaseInsensitiveStringMap, + orCreate: Boolean) extends AtomicTableWriteExec { + + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper + + override protected def doExecute(): RDD[InternalRow] = { + // Note that this operation is potentially unsafe, but these are the strict semantics of + // RTAS if the catalog does not support atomic operations. + // + // There are numerous cases in which the table will be dropped and left irrecoverable: + // + // 1. Creating the new table fails, + // 2. Writing to the new table fails, + // 3. The table returned by catalog.createTable doesn't support writing. + if (catalog.tableExists(ident)) { + catalog.dropTable(ident) + } else if (!orCreate) { + throw new CannotReplaceMissingTableException(ident) + } + val createdTable = catalog.createTable( + ident, query.schema, partitioning.toArray, properties.asJava) + Utils.tryWithSafeFinallyAndFailureCallbacks({ + createdTable match { + case table: SupportsWrite => + val batchWrite = table.newWriteBuilder(writeOptions) + .withInputDataSchema(query.schema) + .withQueryId(UUID.randomUUID().toString) + .buildForBatch() + + doWrite(batchWrite) + + case _ => + // table does not support writes + throw new SparkException( + s"Table implementation does not support writes: ${ident.quoted}") + } + })(catchBlock = { + catalog.dropTable(ident) + }) + } +} + +/** + * Physical plan node for v2 replace table as select when the catalog supports staging + * table replacement. + * + * A new table will be created using the schema of the query, and rows from the query are appended. + * If the table exists, its contents and schema should be replaced with the schema and the contents + * of the query. This implementation is atomic.
The table replacement is staged, and the commit + * operation at the end should perform the replacement of the table's metadata and contents. If the + * write fails, the table is instructed to roll back staged changes and any previously written table + * is left untouched. + */ +case class AtomicReplaceTableAsSelectExec( + catalog: StagingTableCatalog, + ident: Identifier, + partitioning: Seq[Transform], + query: SparkPlan, + properties: Map[String, String], + writeOptions: CaseInsensitiveStringMap, + orCreate: Boolean) extends AtomicTableWriteExec { + + override protected def doExecute(): RDD[InternalRow] = { + val staged = if (orCreate) { + catalog.stageCreateOrReplace( + ident, query.schema, partitioning.toArray, properties.asJava) + } else if (catalog.tableExists(ident)) { + try { + catalog.stageReplace( + ident, query.schema, partitioning.toArray, properties.asJava) + } catch { + case e: NoSuchTableException => + throw new CannotReplaceMissingTableException(ident, Some(e)) + } + } else { + throw new CannotReplaceMissingTableException(ident) + } + writeToStagedTable(staged, writeOptions, ident) + } +} + /** * Physical plan node for append into a v2 table. * @@ -330,6 +463,36 @@ object DataWritingSparkTask extends Logging { } } +private[v2] trait AtomicTableWriteExec extends V2TableWriteExec { + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper + + protected def writeToStagedTable( + stagedTable: StagedTable, + writeOptions: CaseInsensitiveStringMap, + ident: Identifier): RDD[InternalRow] = { + Utils.tryWithSafeFinallyAndFailureCallbacks({ + stagedTable match { + case table: SupportsWrite => + val batchWrite = table.newWriteBuilder(writeOptions) + .withInputDataSchema(query.schema) + .withQueryId(UUID.randomUUID().toString) + .buildForBatch() + + val writtenRows = doWrite(batchWrite) + stagedTable.commitStagedChanges() + writtenRows + case _ => + // Table does not support writes - staged changes are also rolled back below. + throw new SparkException( + s"Table implementation does not support writes: ${ident.quoted}") + } + })(catchBlock = { + // Failure rolls back the staged writes and metadata changes.
+ stagedTable.abortStagedChanges() + }) + } +} + private[v2] case class DataWritingSparkTaskResult( numRows: Long, writerCommitMessage: WriterCommitMessage) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index c90090aca3..c173bdb953 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, QueryTest} import org.apache.spark.sql.catalog.v2.Identifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG @@ -39,6 +39,8 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn before { spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName) + spark.conf.set( + "spark.sql.catalog.testcat_atomic", classOf[TestStagingInMemoryCatalog].getName) spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName) spark.conf.set(V2_SESSION_CATALOG.key, classOf[TestInMemoryTableCatalog].getName) @@ -50,6 +52,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn after { spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables() + spark.catalog("testcat_atomic").asInstanceOf[TestInMemoryTableCatalog].clearTables() spark.catalog("session").asInstanceOf[TestInMemoryTableCatalog].clearTables() } @@ -159,20 +162,168 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn } test("CreateTableAsSelect: use v2 plan because catalog is set") { + val basicCatalog = spark.catalog("testcat").asTableCatalog + val atomicCatalog = spark.catalog("testcat_atomic").asTableCatalog + val basicIdentifier = "testcat.table_name" + val atomicIdentifier = "testcat_atomic.table_name" + + Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach { + case (catalog, identifier) => + spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM source") + + val table = catalog.loadTable(Identifier.of(Array(), "table_name")) + + assert(table.name == identifier) + assert(table.partitioning.isEmpty) + assert(table.properties == Map("provider" -> "foo").asJava) + assert(table.schema == new StructType() + .add("id", LongType, nullable = false) + .add("data", StringType)) + + val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) + checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source")) + } + } + + test("ReplaceTableAsSelect: basic v2 implementation.") { + val basicCatalog = spark.catalog("testcat").asTableCatalog + val atomicCatalog = spark.catalog("testcat_atomic").asTableCatalog + val basicIdentifier = "testcat.table_name" + val atomicIdentifier = "testcat_atomic.table_name" + + Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach { + case (catalog, identifier) => + spark.sql(s"CREATE TABLE 
$identifier USING foo AS SELECT id, data FROM source") + val originalTable = catalog.loadTable(Identifier.of(Array(), "table_name")) + + spark.sql(s"REPLACE TABLE $identifier USING foo AS SELECT id FROM source") + val replacedTable = catalog.loadTable(Identifier.of(Array(), "table_name")) + + assert(replacedTable != originalTable, "Table should have been replaced.") + assert(replacedTable.name == identifier) + assert(replacedTable.partitioning.isEmpty) + assert(replacedTable.properties == Map("provider" -> "foo").asJava) + assert(replacedTable.schema == new StructType() + .add("id", LongType, nullable = false)) + + val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows) + checkAnswer( + spark.internalCreateDataFrame(rdd, replacedTable.schema), + spark.table("source").select("id")) + } + } + + test("ReplaceTableAsSelect: Non-atomic catalog drops the table if the write fails.") { spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source") + val testCatalog = spark.catalog("testcat").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty) + + intercept[Exception] { + spark.sql("REPLACE TABLE testcat.table_name" + + s" USING foo OPTIONS (`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}`=true)" + + s" AS SELECT id FROM source") + } + + assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")), + "Table should have been dropped as a result of the replace.") + } + + test("ReplaceTableAsSelect: Non-atomic catalog drops the table permanently if the" + + " subsequent table creation fails.") { + spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source") + val testCatalog = spark.catalog("testcat").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty) + + intercept[Exception] { + spark.sql("REPLACE TABLE testcat.table_name" + + s" USING foo" + + s" TBLPROPERTIES (`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" + + s" AS SELECT id FROM source") + } + + assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")), + "Table should have been dropped and failed to be created.") + } + + test("ReplaceTableAsSelect: Atomic catalog does not drop the table when replace fails.") { + spark.sql("CREATE TABLE testcat_atomic.table_name USING foo AS SELECT id, data FROM source") + val testCatalog = spark.catalog("testcat_atomic").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + + intercept[Exception] { + spark.sql("REPLACE TABLE testcat_atomic.table_name" + + s" USING foo OPTIONS (`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}`=true)" + + s" AS SELECT id FROM source") + } + + var maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + assert(maybeReplacedTable === table, "Table should not have changed.") + + intercept[Exception] { + spark.sql("REPLACE TABLE testcat_atomic.table_name" + + s" USING foo" + + s" TBLPROPERTIES (`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" + + s" AS SELECT id FROM source") + } + + maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + assert(maybeReplacedTable === table, "Table should not have changed.") + } + + test("ReplaceTable: Erases the table contents and changes the metadata.") { + spark.sql(s"CREATE TABLE testcat.table_name USING $orc2 AS
SELECT id, data FROM source") val testCatalog = spark.catalog("testcat").asTableCatalog val table = testCatalog.loadTable(Identifier.of(Array(), "table_name")) + assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty) - assert(table.name == "testcat.table_name") - assert(table.partitioning.isEmpty) - assert(table.properties == Map("provider" -> "foo").asJava) - assert(table.schema == new StructType() - .add("id", LongType, nullable = false) - .add("data", StringType)) + spark.sql("REPLACE TABLE testcat.table_name (id bigint) USING foo") + val replaced = testCatalog.loadTable(Identifier.of(Array(), "table_name")) - val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows) - checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source")) + assert(replaced.asInstanceOf[InMemoryTable].rows.isEmpty, + "Replaced table should have no rows after committing.") + assert(replaced.schema().fields.length === 1, + "Replaced table should have new schema.") + assert(replaced.schema().fields(0).name === "id", + "Replaced table should have new schema.") + } + + test("ReplaceTableAsSelect: CREATE OR REPLACE new table has same behavior as CTAS.") { + Seq("testcat", "testcat_atomic").foreach { catalog => + spark.sql(s"CREATE TABLE $catalog.created USING $orc2 AS SELECT id, data FROM source") + spark.sql( + s"CREATE OR REPLACE TABLE $catalog.replaced USING $orc2 AS SELECT id, data FROM source") + + val testCatalog = spark.catalog(catalog).asTableCatalog + val createdTable = testCatalog.loadTable(Identifier.of(Array(), "created")) + val replacedTable = testCatalog.loadTable(Identifier.of(Array(), "replaced")) + + assert(createdTable.asInstanceOf[InMemoryTable].rows === + replacedTable.asInstanceOf[InMemoryTable].rows) + assert(createdTable.schema === replacedTable.schema) + } + } + + test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table does not exist.") { + Seq("testcat", "testcat_atomic").foreach { catalog => + spark.sql(s"CREATE TABLE $catalog.created USING $orc2 AS SELECT id, data FROM source") + intercept[CannotReplaceMissingTableException] { + spark.sql(s"REPLACE TABLE $catalog.replaced USING $orc2 AS SELECT id, data FROM source") + } + } + } + + test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table is dropped before commit.") { + import TestInMemoryTableCatalog._ + spark.sql(s"CREATE TABLE testcat_atomic.created USING $orc2 AS SELECT id, data FROM source") + intercept[CannotReplaceMissingTableException] { + spark.sql(s"REPLACE TABLE testcat_atomic.replaced" + + s" USING $orc2" + + s" TBLPROPERTIES (`$SIMULATE_DROP_BEFORE_REPLACE_PROPERTY`=true)" + + s" AS SELECT id, data FROM source") + } } test("CreateTableAsSelect: use v2 plan and session catalog when provider is v2") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala index 380df7a365..95398082b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -23,11 +23,11 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable -import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, TableCatalog, TableChange} +import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, StagingTableCatalog, TableCatalog, TableChange} import 
org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder} import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, SupportsTruncate, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.types.StructType @@ -38,7 +38,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap class TestInMemoryTableCatalog extends TableCatalog { import CatalogV2Implicits._ - private val tables: util.Map[Identifier, InMemoryTable] = + protected val tables: util.Map[Identifier, InMemoryTable] = new ConcurrentHashMap[Identifier, InMemoryTable]() private var _name: Option[String] = None @@ -66,11 +66,10 @@ class TestInMemoryTableCatalog extends TableCatalog { schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = { - if (tables.containsKey(ident)) { throw new TableAlreadyExistsException(ident) } - + TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties) if (partitions.nonEmpty) { throw new UnsupportedOperationException( s"Catalog $name: Partitioned tables are not supported") @@ -104,7 +103,9 @@ class TestInMemoryTableCatalog extends TableCatalog { } } - override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined + override def dropTable(ident: Identifier): Boolean = { + Option(tables.remove(ident)).isDefined + } def clearTables(): Unit = { tables.clear() @@ -114,7 +115,7 @@ class TestInMemoryTableCatalog extends TableCatalog { /** * A simple in-memory table. Rows are stored as a buffered group produced by each output task. 
*/ -private class InMemoryTable( +class InMemoryTable( val name: String, val schema: StructType, override val properties: util.Map[String, String]) @@ -155,6 +156,7 @@ private class InMemoryTable( } override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { + TestInMemoryTableCatalog.maybeSimulateFailedTableWrite(options) new WriteBuilder with SupportsTruncate { private var shouldTruncate: Boolean = false @@ -196,7 +198,142 @@ private class InMemoryTable( } } -private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable { +object TestInMemoryTableCatalog { + val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite" + val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate" + val SIMULATE_DROP_BEFORE_REPLACE_PROPERTY = "spark.sql.test.simulateDropBeforeReplace" + + def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = { + if ("true".equalsIgnoreCase( + tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))) { + throw new IllegalStateException("Manual create table failure.") + } + } + + def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): Unit = { + if (tableOptions.getBoolean( + TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) { + throw new IllegalStateException("Manual write to table failure.") + } + } +} + +class TestStagingInMemoryCatalog + extends TestInMemoryTableCatalog with StagingTableCatalog { + import CatalogV2Implicits.IdentifierHelper + import org.apache.spark.sql.sources.v2.TestInMemoryTableCatalog._ + + override def stageCreate( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { + validateStagedTable(partitions, properties) + new TestStagedCreateTable( + ident, + new InMemoryTable(s"$name.${ident.quoted}", schema, properties)) + } + + override def stageReplace( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { + validateStagedTable(partitions, properties) + new TestStagedReplaceTable( + ident, + new InMemoryTable(s"$name.${ident.quoted}", schema, properties)) + } + + override def stageCreateOrReplace( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): StagedTable = { + validateStagedTable(partitions, properties) + new TestStagedCreateOrReplaceTable( + ident, + new InMemoryTable(s"$name.${ident.quoted}", schema, properties)) + } + + private def validateStagedTable( + partitions: Array[Transform], + properties: util.Map[String, String]): Unit = { + if (partitions.nonEmpty) { + throw new UnsupportedOperationException( + s"Catalog $name: Partitioned tables are not supported") + } + + maybeSimulateFailedTableCreation(properties) + } + + private abstract class TestStagedTable( + ident: Identifier, + delegateTable: InMemoryTable) + extends StagedTable with SupportsWrite with SupportsRead { + + override def abortStagedChanges(): Unit = {} + + override def name(): String = delegateTable.name + + override def schema(): StructType = delegateTable.schema + + override def capabilities(): util.Set[TableCapability] = delegateTable.capabilities + + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { + delegateTable.newWriteBuilder(options) + } + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + 
delegateTable.newScanBuilder(options) + } + } + + private class TestStagedCreateTable( + ident: Identifier, + delegateTable: InMemoryTable) extends TestStagedTable(ident, delegateTable) { + + override def commitStagedChanges(): Unit = { + val maybePreCommittedTable = tables.putIfAbsent(ident, delegateTable) + if (maybePreCommittedTable != null) { + throw new TableAlreadyExistsException( + s"Table with identifier $ident and name $name was already created.") + } + } + } + + private class TestStagedReplaceTable( + ident: Identifier, + delegateTable: InMemoryTable) extends TestStagedTable(ident, delegateTable) { + + override def commitStagedChanges(): Unit = { + maybeSimulateDropBeforeCommit() + val maybePreCommittedTable = tables.replace(ident, delegateTable) + if (maybePreCommittedTable == null) { + throw new CannotReplaceMissingTableException(ident) + } + } + + private def maybeSimulateDropBeforeCommit(): Unit = { + if ("true".equalsIgnoreCase( + delegateTable.properties.get(SIMULATE_DROP_BEFORE_REPLACE_PROPERTY))) { + tables.remove(ident) + } + } + } + + private class TestStagedCreateOrReplaceTable( + ident: Identifier, + delegateTable: InMemoryTable) extends TestStagedTable(ident, delegateTable) { + + override def commitStagedChanges(): Unit = { + tables.put(ident, delegateTable) + } + } +} + + +class BufferedRows extends WriterCommitMessage with InputPartition with Serializable { val rows = new mutable.ArrayBuffer[InternalRow]() }
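
Usage sketch. The following is a minimal end-to-end example of the new statements from the Scala API, assuming a V2 catalog registered under `spark.sql.catalog.mycat`; the catalog name, the `db` namespace, and `com.example.MyCatalog` are placeholders rather than anything added by this patch. Catalogs that also implement `StagingTableCatalog` get the staged, atomic behavior; all other catalogs fall back to the non-atomic drop-and-recreate path described above.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("replace-table-sketch")
      // com.example.MyCatalog is a placeholder for any V2 TableCatalog implementation;
      // TestInMemoryTableCatalog / TestStagingInMemoryCatalog play this role in the tests above.
      .config("spark.sql.catalog.mycat", "com.example.MyCatalog")
      .getOrCreate()

    // Seed a table first; plain REPLACE TABLE on a missing table throws
    // CannotReplaceMissingTableException.
    spark.sql("CREATE TABLE mycat.db.events (id BIGINT, data STRING) USING parquet")

    // Replace the definition: the existing table's schema and contents are discarded. With a
    // StagingTableCatalog the swap is staged and committed atomically; otherwise the table is
    // dropped and re-created, so a failure can leave it missing.
    spark.sql("REPLACE TABLE mycat.db.events (id BIGINT, ts TIMESTAMP) USING parquet")

    // CREATE OR REPLACE TABLE AS SELECT behaves like CTAS when the table is missing and like
    // RTAS when it already exists.
    spark.sql(
      """CREATE OR REPLACE TABLE mycat.db.events_copy
        |USING parquet
        |AS SELECT id, ts FROM mycat.db.events""".stripMargin)

The same statements are accepted by the spark-sql shell, provided the target identifier resolves to a V2 catalog; identifiers that resolve to the legacy v1 session catalog are rejected by DataSourceResolution as shown above.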
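For catalog implementers, the shape of an atomic-capable catalog mirrors `TestStagingInMemoryCatalog` above. Below is a condensed, hypothetical skeleton; `MyAtomicCatalog` and the `newStagedTable` helper are illustrative names only and not part of this patch. A real implementation must also provide the inherited `TableCatalog` methods and return `StagedTable` instances whose `commitStagedChanges()` publishes metadata and data together and whose `abortStagedChanges()` discards everything that was staged.

    import java.util

    import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog, TableCatalog}
    import org.apache.spark.sql.catalog.v2.expressions.Transform
    import org.apache.spark.sql.sources.v2.StagedTable
    import org.apache.spark.sql.types.StructType

    // Hypothetical skeleton of an atomic-capable catalog. The three stage* entry points only
    // differ in how the returned StagedTable must behave at commit time when the table already
    // exists (or does not); the shared details are left to the abstract newStagedTable hook.
    abstract class MyAtomicCatalog extends TableCatalog with StagingTableCatalog {

      // Produces a staged table that buffers the write and publishes it on commitStagedChanges().
      protected def newStagedTable(
          ident: Identifier,
          schema: StructType,
          partitions: Array[Transform],
          properties: util.Map[String, String],
          replaceExisting: Boolean,
          createIfMissing: Boolean): StagedTable

      override def stageCreate(
          ident: Identifier,
          schema: StructType,
          partitions: Array[Transform],
          properties: util.Map[String, String]): StagedTable =
        newStagedTable(ident, schema, partitions, properties,
          replaceExisting = false, createIfMissing = true)

      override def stageReplace(
          ident: Identifier,
          schema: StructType,
          partitions: Array[Transform],
          properties: util.Map[String, String]): StagedTable =
        newStagedTable(ident, schema, partitions, properties,
          replaceExisting = true, createIfMissing = false)

      override def stageCreateOrReplace(
          ident: Identifier,
          schema: StructType,
          partitions: Array[Transform],
          properties: util.Map[String, String]): StagedTable =
        newStagedTable(ident, schema, partitions, properties,
          replaceExisting = true, createIfMissing = true)
    }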