[SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

## What changes were proposed in this pull request?

Implements the `REPLACE TABLE` and `REPLACE TABLE AS SELECT` logical plans. `REPLACE TABLE` is now a valid operation in spark-sql provided that the tables being modified are managed by V2 catalogs.

This also introduces an atomic mix-in that table catalogs can choose to implement: `StagingTableCatalog`. The semantics of this API are that table creation and replacement can be "staged" and then "committed".

When `REPLACE TABLE AS SELECT`, `REPLACE TABLE`, or `CREATE TABLE AS SELECT` is executed, the physical plan uses the staging operations if the catalog implements them; otherwise these operations fall back on non-atomic variants. For `REPLACE TABLE` in particular, the non-atomic path drops the existing table before creating its replacement, so a failure in between can unfortunately leave the catalog in an inconsistent state.
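
For example, against a V2 catalog (the catalog name, namespace, and `TestInMemoryTableCatalog` class below are illustrative placeholders in the spirit of the new `DataSourceV2SQLSuite` tests, not a fixed API):

```scala
// Register a V2 catalog named "testcat"; any TableCatalog implementation works, and one
// that also mixes in StagingTableCatalog gets the atomic code paths described above.
spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)

spark.sql("CREATE TABLE testcat.ns.t (id BIGINT, data STRING) USING foo")

// Replaces the table's schema and contents; fails if testcat.ns.t does not exist.
spark.sql("REPLACE TABLE testcat.ns.t (id BIGINT) USING foo")

// CREATE OR REPLACE creates the table instead of failing when it is missing.
spark.sql("CREATE OR REPLACE TABLE testcat.ns.t USING foo AS SELECT 1L AS id")
```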

## How was this patch tested?

Unit tests: multiple additions to `DataSourceV2SQLSuite`, plus parser coverage for the new statements in `DDLParserSuite`.

Closes #24798 from mccheah/spark-27724.

Authored-by: mcheah <mcheah@palantir.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Authored by mcheah, committed by Wenchen Fan on 2019-07-22 12:08:46 +08:00. Commit 7ed0088539 (parent a783690d8a): 14 changed files with 1401 additions and 257 deletions.


@ -113,6 +113,14 @@ statement
(AS? query)? #createHiveTable
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier locationSpec? #createTableLike
| replaceTableHeader ('(' colTypeList ')')? tableProvider
((OPTIONS options=tablePropertyList) |
(PARTITIONED BY partitioning=transformList) |
bucketSpec |
locationSpec |
(COMMENT comment=STRING) |
(TBLPROPERTIES tableProps=tablePropertyList))*
(AS? query)? #replaceTable
| ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
(identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)? #analyze
| ALTER TABLE multipartIdentifier
@ -261,6 +269,10 @@ createTableHeader
: CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? multipartIdentifier
;
replaceTableHeader
: (CREATE OR)? REPLACE TABLE multipartIdentifier
;
bucketSpec
: CLUSTERED BY identifierList
(SORTED BY orderedIdentifierList)?


@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalog.v2;
import java.util.Map;
import org.apache.spark.sql.catalog.v2.expressions.Transform;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.sources.v2.StagedTable;
import org.apache.spark.sql.sources.v2.SupportsWrite;
import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* An optional mix-in for implementations of {@link TableCatalog} that support staging the creation
* of a table before committing the table's metadata along with its contents in CREATE TABLE AS
* SELECT or REPLACE TABLE AS SELECT operations.
* <p>
* It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS
* SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE
* TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first
* drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via
* {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform
* the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the
* write operation fails, the catalog will have already dropped the table, and the planner cannot
* roll back the dropping of the table.
* <p>
* If the catalog implements this plugin, the catalog can implement the methods to "stage" the
* creation and the replacement of a table. After the table's
* {@link BatchWrite#commit(WriterCommitMessage[])} is called,
* {@link StagedTable#commitStagedChanges()} is called, at which point the staged table can
* complete both the data write and the metadata swap operation atomically.
*/
public interface StagingTableCatalog extends TableCatalog {
/**
* Stage the creation of a table, preparing it to be committed into the metastore.
* <p>
* When the table is committed, the contents of any writes performed by the Spark planner are
* committed along with the metadata about the table passed into this method's arguments. If the
* table exists when this method is called, the method should throw an exception accordingly. If
* another process concurrently creates the table before this table's staged changes are
* committed, an exception should be thrown by {@link StagedTable#commitStagedChanges()}.
*
* @param ident a table identifier
* @param schema the schema of the new table, as a struct type
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
* @throws TableAlreadyExistsException If a table or view already exists for the identifier
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
*/
StagedTable stageCreate(
Identifier ident,
StructType schema,
Transform[] partitions,
Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException;
/**
* Stage the replacement of a table, preparing it to be committed into the metastore when the
* returned table's {@link StagedTable#commitStagedChanges()} is called.
* <p>
* When the table is committed, the contents of any writes performed by the Spark planner are
* committed along with the metadata about the table passed into this method's arguments. If the
* table exists, the metadata and the contents of this table replace the metadata and contents of
* the existing table. If a concurrent process commits changes to the table's data or metadata
* while the write is being performed but before the staged changes are committed, the catalog
* can decide whether to move forward with the table replacement anyways or abort the commit
* operation.
* <p>
* If the table does not exist, committing the staged changes should fail with
* {@link NoSuchTableException}. This differs from the semantics of
* {@link #stageCreateOrReplace(Identifier, StructType, Transform[], Map)}, which should create
* the table in the data source if the table does not exist at the time of committing the
* operation.
*
* @param ident a table identifier
* @param schema the schema of the new table, as a struct type
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
* @throws NoSuchTableException If the table does not exist
*/
StagedTable stageReplace(
Identifier ident,
StructType schema,
Transform[] partitions,
Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;
/**
* Stage the creation or replacement of a table, preparing it to be committed into the metastore
* when the returned table's {@link StagedTable#commitStagedChanges()} is called.
* <p>
* When the table is committed, the contents of any writes performed by the Spark planner are
* committed along with the metadata about the table passed into this method's arguments. If the
* table exists, the metadata and the contents of this table replace the metadata and contents of
* the existing table. If a concurrent process commits changes to the table's data or metadata
* while the write is being performed but before the staged changes are committed, the catalog
* can decide whether to move forward with the table replacement anyways or abort the commit
* operation.
* <p>
* If the table does not exist when the changes are committed, the table should be created in the
* backing data source. This differs from the expected semantics of
* {@link #stageReplace(Identifier, StructType, Transform[], Map)}, which should fail when
* the staged changes are committed but the table doesn't exist at commit time.
*
* @param ident a table identifier
* @param schema the schema of the new table, as a struct type
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
*/
StagedTable stageCreateOrReplace(
Identifier ident,
StructType schema,
Transform[] partitions,
Map<String, String> properties) throws NoSuchNamespaceException;
}
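
A rough sketch of an implementation (the `InMemoryTableCatalog` base class and `InMemoryStagedTable` helper are hypothetical, not part of this patch):

```scala
import java.util

import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.sources.v2.StagedTable
import org.apache.spark.sql.types.StructType

// Hypothetical in-memory catalog: the base class tracks published tables, and
// InMemoryStagedTable buffers staged metadata and data until commit.
class StagingInMemoryCatalog extends InMemoryTableCatalog with StagingTableCatalog {

  override def stageCreate(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): StagedTable = {
    if (tableExists(ident)) {
      throw new TableAlreadyExistsException(ident)
    }
    // Nothing is visible to readers until StagedTable.commitStagedChanges() runs.
    new InMemoryStagedTable(ident, schema, partitions, properties, replace = false)
  }

  override def stageReplace(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): StagedTable = {
    // Per the contract above, commit must fail with NoSuchTableException if the
    // table no longer exists when commitStagedChanges() is called.
    new InMemoryStagedTable(ident, schema, partitions, properties, replace = true)
  }

  override def stageCreateOrReplace(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): StagedTable =
    new InMemoryStagedTable(ident, schema, partitions, properties, replace = true)
}
```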


@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2;
import java.util.Map;
import org.apache.spark.sql.catalog.v2.Identifier;
import org.apache.spark.sql.catalog.v2.StagingTableCatalog;
import org.apache.spark.sql.catalog.v2.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* Represents a table which is staged for being committed to the metastore.
* <p>
* This is used to implement atomic CREATE TABLE AS SELECT and REPLACE TABLE AS SELECT queries. The
* planner will create one of these via
* {@link StagingTableCatalog#stageCreate(Identifier, StructType, Transform[], Map)} or
* {@link StagingTableCatalog#stageReplace(Identifier, StructType, Transform[], Map)} to prepare the
* table for being written to. This table should usually implement {@link SupportsWrite}. A new
* writer will be constructed via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)},
* and the write will be committed. The job concludes with a call to {@link #commitStagedChanges()},
* at which point implementations are expected to commit the table's metadata into the metastore
* along with the data that was written by the writes from the write builder this table created.
*/
public interface StagedTable extends Table {
/**
* Finalize the creation or replacement of this table.
*/
void commitStagedChanges();
/**
* Abort the changes that were staged, both in metadata and from temporary outputs of this
* table's writers.
*/
void abortStagedChanges();
}
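
Pairing with the catalog sketch above, a minimal sketch of the commit/abort contract (the buffering and the `publish`/`discardBuffer` helpers are hypothetical):

```scala
import java.util

import org.apache.spark.sql.catalog.v2.Identifier
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.sources.v2.{StagedTable, SupportsWrite}
import org.apache.spark.sql.types.StructType

// Sketch: rows committed through this table's WriteBuilder land in a private buffer;
// only commitStagedChanges() makes the metadata and buffered data visible to readers.
class InMemoryStagedTable(
    ident: Identifier,
    override val schema: StructType,
    partitions: Array[Transform],
    props: util.Map[String, String],
    replace: Boolean)
  extends StagedTable with SupportsWrite {

  override def name(): String = ident.toString

  // newWriteBuilder(...) and the remaining Table methods are elided; they behave like a
  // normal SupportsWrite table except that committed rows stay in the staging buffer.

  override def commitStagedChanges(): Unit = {
    // For a replacement, fail with NoSuchTableException here if the table is gone;
    // otherwise atomically swap in the new metadata and the buffered rows.
    publish(ident, schema, partitions, props) // hypothetical helper
  }

  override def abortStagedChanges(): Unit = {
    discardBuffer() // hypothetical helper
  }
}
```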


@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.v2.Identifier
class CannotReplaceMissingTableException(
tableIdentifier: Identifier,
cause: Option[Throwable] = None)
extends AnalysisException(
s"Table $tableIdentifier cannot be replaced as it did not exist." +
s" Use CREATE OR REPLACE TABLE to create the table.", cause = cause)


@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@ -2127,6 +2127,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
(multipartIdentifier, temporary, ifNotExists, ctx.EXTERNAL != null)
}
/**
* Validate a replace table statement and return the [[TableIdentifier]].
*/
override def visitReplaceTableHeader(
ctx: ReplaceTableHeaderContext): TableHeader = withOrigin(ctx) {
val multipartIdentifier = ctx.multipartIdentifier.parts.asScala.map(_.getText)
(multipartIdentifier, false, false, false)
}
/**
* Parse a qualified name to a multipart name.
*/
@ -2294,6 +2303,69 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
}
/**
* Replace a table, returning a [[ReplaceTableStatement]] logical plan.
*
* Expected format:
* {{{
* [CREATE OR] REPLACE TABLE [db_name.]table_name
* USING table_provider
* replace_table_clauses
* [[AS] select_statement];
*
* replace_table_clauses (order insensitive):
* [OPTIONS table_property_list]
* [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
* ]
* [LOCATION path]
* [COMMENT table_comment]
* [TBLPROPERTIES (property_name=property_value, ...)]
* }}}
*/
override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) {
val (table, _, ifNotExists, external) = visitReplaceTableHeader(ctx.replaceTableHeader)
if (external) {
operationNotAllowed("REPLACE EXTERNAL TABLE ... USING", ctx)
}
checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
val schema = Option(ctx.colTypeList()).map(createSchema)
val partitioning: Seq[Transform] =
Option(ctx.partitioning).map(visitTransformList).getOrElse(Nil)
val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)
val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
val provider = ctx.tableProvider.qualifiedName.getText
val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
val comment = Option(ctx.comment).map(string)
val orCreate = ctx.replaceTableHeader().CREATE() != null
Option(ctx.query).map(plan) match {
case Some(_) if schema.isDefined =>
operationNotAllowed(
"Schema may not be specified in a Replace Table As Select (RTAS) statement",
ctx)
case Some(query) =>
ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties,
provider, options, location, comment, orCreate = orCreate)
case _ =>
ReplaceTableStatement(table, schema.getOrElse(new StructType), partitioning,
bucketSpec, properties, provider, options, location, comment, orCreate = orCreate)
}
}
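
For illustration, a REPLACE TABLE AS SELECT statement in the format documented above parses into a `ReplaceTableAsSelectStatement` (using the `parsePlan` helper from the parser tests; the identifiers are made up):

```scala
val plan = parsePlan(
  """REPLACE TABLE testcat.ns.t
    |USING parquet
    |PARTITIONED BY (days(ts))
    |TBLPROPERTIES ('p1' = 'v1')
    |AS SELECT id, ts FROM src
  """.stripMargin)

// Expected shape, following the fields of ReplaceTableAsSelectStatement:
//   tableName    = Seq("testcat", "ns", "t")
//   partitioning = Seq(DaysTransform(FieldReference("ts")))
//   properties   = Map("p1" -> "v1")
//   provider     = "parquet"
//   orCreate     = false   // no leading CREATE OR
```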
/**
* Create a [[DropTableStatement]] command.
*/


@ -441,6 +441,47 @@ case class CreateTableAsSelect(
}
}
/**
* Replace a table with a v2 catalog.
*
* If the table does not exist, and orCreate is true, then it will be created.
* If the table does not exist, and orCreate is false, then an exception will be thrown.
*
* The persisted table will have no contents as a result of this operation.
*/
case class ReplaceTable(
catalog: TableCatalog,
tableName: Identifier,
tableSchema: StructType,
partitioning: Seq[Transform],
properties: Map[String, String],
orCreate: Boolean) extends Command
/**
* Replaces a table from a select query with a v2 catalog.
*
* If the table does not exist, and orCreate is true, then it will be created.
* If the table does not exist, and orCreate is false, then an exception will be thrown.
*/
case class ReplaceTableAsSelect(
catalog: TableCatalog,
tableName: Identifier,
partitioning: Seq[Transform],
query: LogicalPlan,
properties: Map[String, String],
writeOptions: Map[String, String],
orCreate: Boolean) extends Command {
override def children: Seq[LogicalPlan] = Seq(query)
override lazy val resolved: Boolean = {
// the table schema is created from the query schema, so the only resolution needed is to check
// that the columns referenced by the table's partitioning exist in the query schema
val references = partitioning.flatMap(_.references).toSet
references.map(_.fieldNames).forall(query.schema.findNestedField(_).isDefined)
}
}
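
As a concrete illustration of the resolution check above (identifiers are illustrative): a replacement whose partitioning references a column the query does not produce never resolves, so the statement fails analysis instead of writing a mis-partitioned table.

```scala
// PARTITIONED BY (days(ts)) requires a `ts` column in the query's output schema.
spark.sql(
  """CREATE OR REPLACE TABLE testcat.ns.t
    |USING foo
    |PARTITIONED BY (days(ts))
    |AS SELECT id FROM src
  """.stripMargin) // fails analysis: the SELECT does not produce `ts`
```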
/**
* Append data to an existing table.
*/


@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.plans.logical.sql
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.StructType
/**
* A REPLACE TABLE command, as parsed from SQL.
*
* If the table exists prior to running this command, executing this statement
* will replace the table's metadata and clear the underlying rows from the table.
*/
case class ReplaceTableStatement(
tableName: Seq[String],
tableSchema: StructType,
partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: String,
options: Map[String, String],
location: Option[String],
comment: Option[String],
orCreate: Boolean) extends ParsedStatement
/**
* A REPLACE TABLE AS SELECT command, as parsed from SQL.
*/
case class ReplaceTableAsSelectStatement(
tableName: Seq[String],
asSelect: LogicalPlan,
partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: String,
options: Map[String, String],
location: Option[String],
comment: Option[String],
orCreate: Boolean) extends ParsedStatement {
override def children: Seq[LogicalPlan] = Seq(asSelect)
}


@ -19,11 +19,11 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale
import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.catalyst.analysis.AnalysisTest
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
@ -47,82 +47,71 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(parsePlan(sql), expected, checkAnalysis = false)
}
test("create table using - schema") {
val sql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
test("create/replace table using - schema") {
val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
val replaceSql = "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
val expectedTableSpec = TableSpec(
Seq("my_tab"),
Some(new StructType()
.add("a", IntegerType, nullable = true, "test")
.add("b", StringType)),
Seq.empty[Transform],
None,
Map.empty[String, String],
"parquet",
Map.empty[String, String],
None,
None)
parsePlan(sql) match {
case create: CreateTableStatement =>
assert(create.tableName == Seq("my_tab"))
assert(create.tableSchema == new StructType()
.add("a", IntegerType, nullable = true, "test")
.add("b", StringType))
assert(create.partitioning.isEmpty)
assert(create.bucketSpec.isEmpty)
assert(create.properties.isEmpty)
assert(create.provider == "parquet")
assert(create.options.isEmpty)
assert(create.location.isEmpty)
assert(create.comment.isEmpty)
assert(!create.ifNotExists)
case other =>
fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
s"got ${other.getClass.getName}: $sql")
Seq(createSql, replaceSql).foreach { sql =>
testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING) USING parquet",
"no viable alternative at input")
}
test("create table - with IF NOT EXISTS") {
test("create/replace table - with IF NOT EXISTS") {
val sql = "CREATE TABLE IF NOT EXISTS my_tab(a INT, b STRING) USING parquet"
parsePlan(sql) match {
case create: CreateTableStatement =>
assert(create.tableName == Seq("my_tab"))
assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
assert(create.partitioning.isEmpty)
assert(create.bucketSpec.isEmpty)
assert(create.properties.isEmpty)
assert(create.provider == "parquet")
assert(create.options.isEmpty)
assert(create.location.isEmpty)
assert(create.comment.isEmpty)
assert(create.ifNotExists)
case other =>
fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
s"got ${other.getClass.getName}: $sql")
}
testCreateOrReplaceDdl(
sql,
TableSpec(
Seq("my_tab"),
Some(new StructType().add("a", IntegerType).add("b", StringType)),
Seq.empty[Transform],
None,
Map.empty[String, String],
"parquet",
Map.empty[String, String],
None,
None),
expectedIfNotExists = true)
}
test("create table - with partitioned by") {
val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
test("create/replace table - with partitioned by") {
val createSql = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
"USING parquet PARTITIONED BY (a)"
parsePlan(query) match {
case create: CreateTableStatement =>
assert(create.tableName == Seq("my_tab"))
assert(create.tableSchema == new StructType()
.add("a", IntegerType, nullable = true, "test")
.add("b", StringType))
assert(create.partitioning == Seq(IdentityTransform(FieldReference("a"))))
assert(create.bucketSpec.isEmpty)
assert(create.properties.isEmpty)
assert(create.provider == "parquet")
assert(create.options.isEmpty)
assert(create.location.isEmpty)
assert(create.comment.isEmpty)
assert(!create.ifNotExists)
case other =>
fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
s"got ${other.getClass.getName}: $query")
val replaceSql = "REPLACE TABLE my_tab(a INT comment 'test', b STRING) " +
"USING parquet PARTITIONED BY (a)"
val expectedTableSpec = TableSpec(
Seq("my_tab"),
Some(new StructType()
.add("a", IntegerType, nullable = true, "test")
.add("b", StringType)),
Seq(IdentityTransform(FieldReference("a"))),
None,
Map.empty[String, String],
"parquet",
Map.empty[String, String],
None,
None)
Seq(createSql, replaceSql).foreach { sql =>
testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
test("create table - partitioned by transforms") {
val sql =
test("create/replace table - partitioned by transforms") {
val createSql =
"""
|CREATE TABLE my_tab (a INT, b STRING, ts TIMESTAMP) USING parquet
|PARTITIONED BY (
@ -135,154 +124,151 @@ class DDLParserSuite extends AnalysisTest {
| foo(a, "bar", 34))
""".stripMargin
parsePlan(sql) match {
case create: CreateTableStatement =>
assert(create.tableName == Seq("my_tab"))
assert(create.tableSchema == new StructType()
.add("a", IntegerType)
.add("b", StringType)
.add("ts", TimestampType))
assert(create.partitioning == Seq(
IdentityTransform(FieldReference("a")),
BucketTransform(LiteralValue(16, IntegerType), Seq(FieldReference("b"))),
YearsTransform(FieldReference("ts")),
MonthsTransform(FieldReference("ts")),
DaysTransform(FieldReference("ts")),
HoursTransform(FieldReference("ts")),
ApplyTransform("foo", Seq(
FieldReference("a"),
LiteralValue(UTF8String.fromString("bar"), StringType),
LiteralValue(34, IntegerType)))))
assert(create.bucketSpec.isEmpty)
assert(create.properties.isEmpty)
assert(create.provider == "parquet")
assert(create.options.isEmpty)
assert(create.location.isEmpty)
assert(create.comment.isEmpty)
assert(!create.ifNotExists)
case other =>
fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
s"got ${other.getClass.getName}: $sql")
val replaceSql =
"""
|REPLACE TABLE my_tab (a INT, b STRING, ts TIMESTAMP) USING parquet
|PARTITIONED BY (
| a,
| bucket(16, b),
| years(ts),
| months(ts),
| days(ts),
| hours(ts),
| foo(a, "bar", 34))
""".stripMargin
val expectedTableSpec = TableSpec(
Seq("my_tab"),
Some(new StructType()
.add("a", IntegerType)
.add("b", StringType)
.add("ts", TimestampType)),
Seq(
IdentityTransform(FieldReference("a")),
BucketTransform(LiteralValue(16, IntegerType), Seq(FieldReference("b"))),
YearsTransform(FieldReference("ts")),
MonthsTransform(FieldReference("ts")),
DaysTransform(FieldReference("ts")),
HoursTransform(FieldReference("ts")),
ApplyTransform("foo", Seq(
FieldReference("a"),
LiteralValue(UTF8String.fromString("bar"), StringType),
LiteralValue(34, IntegerType)))),
None,
Map.empty[String, String],
"parquet",
Map.empty[String, String],
None,
None)
Seq(createSql, replaceSql).foreach { sql =>
testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
test("create table - with bucket") {
val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
test("create/replace table - with bucket") {
val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
"CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
parsePlan(query) match {
case create: CreateTableStatement =>
assert(create.tableName == Seq("my_tab"))
assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
assert(create.partitioning.isEmpty)
assert(create.bucketSpec.contains(BucketSpec(5, Seq("a"), Seq("b"))))
assert(create.properties.isEmpty)
assert(create.provider == "parquet")
assert(create.options.isEmpty)
assert(create.location.isEmpty)
assert(create.comment.isEmpty)
assert(!create.ifNotExists)
val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet " +
"CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
case other =>
fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
s"got ${other.getClass.getName}: $query")
val expectedTableSpec = TableSpec(
Seq("my_tab"),
Some(new StructType().add("a", IntegerType).add("b", StringType)),
Seq.empty[Transform],
Some(BucketSpec(5, Seq("a"), Seq("b"))),
Map.empty[String, String],
"parquet",
Map.empty[String, String],
None,
None)
Seq(createSql, replaceSql).foreach { sql =>
testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
test("create table - with comment") {
val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
parsePlan(sql) match {
case create: CreateTableStatement =>
assert(create.tableName == Seq("my_tab"))
assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
assert(create.partitioning.isEmpty)
assert(create.bucketSpec.isEmpty)
assert(create.properties.isEmpty)
assert(create.provider == "parquet")
assert(create.options.isEmpty)
assert(create.location.isEmpty)
assert(create.comment.contains("abc"))
assert(!create.ifNotExists)
case other =>
fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
s"got ${other.getClass.getName}: $sql")
test("create/replace table - with comment") {
val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
val expectedTableSpec = TableSpec(
Seq("my_tab"),
Some(new StructType().add("a", IntegerType).add("b", StringType)),
Seq.empty[Transform],
None,
Map.empty[String, String],
"parquet",
Map.empty[String, String],
None,
Some("abc"))
Seq(createSql, replaceSql).foreach{ sql =>
testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
test("create table - with table properties") {
val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet TBLPROPERTIES('test' = 'test')"
parsePlan(sql) match {
case create: CreateTableStatement =>
assert(create.tableName == Seq("my_tab"))
assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
assert(create.partitioning.isEmpty)
assert(create.bucketSpec.isEmpty)
assert(create.properties == Map("test" -> "test"))
assert(create.provider == "parquet")
assert(create.options.isEmpty)
assert(create.location.isEmpty)
assert(create.comment.isEmpty)
assert(!create.ifNotExists)
case other =>
fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
s"got ${other.getClass.getName}: $sql")
test("create/replace table - with table properties") {
val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet" +
" TBLPROPERTIES('test' = 'test')"
val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet" +
" TBLPROPERTIES('test' = 'test')"
val expectedTableSpec = TableSpec(
Seq("my_tab"),
Some(new StructType().add("a", IntegerType).add("b", StringType)),
Seq.empty[Transform],
None,
Map("test" -> "test"),
"parquet",
Map.empty[String, String],
None,
None)
Seq(createSql, replaceSql).foreach { sql =>
testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
test("create table - with location") {
val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'"
parsePlan(sql) match {
case create: CreateTableStatement =>
assert(create.tableName == Seq("my_tab"))
assert(create.tableSchema == new StructType().add("a", IntegerType).add("b", StringType))
assert(create.partitioning.isEmpty)
assert(create.bucketSpec.isEmpty)
assert(create.properties.isEmpty)
assert(create.provider == "parquet")
assert(create.options.isEmpty)
assert(create.location.contains("/tmp/file"))
assert(create.comment.isEmpty)
assert(!create.ifNotExists)
case other =>
fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
s"got ${other.getClass.getName}: $sql")
test("create/replace table - with location") {
val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'"
val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet LOCATION '/tmp/file'"
val expectedTableSpec = TableSpec(
Seq("my_tab"),
Some(new StructType().add("a", IntegerType).add("b", StringType)),
Seq.empty[Transform],
None,
Map.empty[String, String],
"parquet",
Map.empty[String, String],
Some("/tmp/file"),
None)
Seq(createSql, replaceSql).foreach { sql =>
testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
test("create table - byte length literal table name") {
val sql = "CREATE TABLE 1m.2g(a INT) USING parquet"
parsePlan(sql) match {
case create: CreateTableStatement =>
assert(create.tableName == Seq("1m", "2g"))
assert(create.tableSchema == new StructType().add("a", IntegerType))
assert(create.partitioning.isEmpty)
assert(create.bucketSpec.isEmpty)
assert(create.properties.isEmpty)
assert(create.provider == "parquet")
assert(create.options.isEmpty)
assert(create.location.isEmpty)
assert(create.comment.isEmpty)
assert(!create.ifNotExists)
case other =>
fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
s"got ${other.getClass.getName}: $sql")
test("create/replace table - byte length literal table name") {
val createSql = "CREATE TABLE 1m.2g(a INT) USING parquet"
val replaceSql = "REPLACE TABLE 1m.2g(a INT) USING parquet"
val expectedTableSpec = TableSpec(
Seq("1m", "2g"),
Some(new StructType().add("a", IntegerType)),
Seq.empty[Transform],
None,
Map.empty[String, String],
"parquet",
Map.empty[String, String],
None,
None)
Seq(createSql, replaceSql).foreach { sql =>
testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
test("Duplicate clauses - create table") {
test("Duplicate clauses - create/replace table") {
def createTableHeader(duplicateClause: String): String = {
s"CREATE TABLE my_tab(a INT, b STRING) USING parquet $duplicateClause $duplicateClause"
}
def replaceTableHeader(duplicateClause: String): String = {
s"CREATE TABLE my_tab(a INT, b STRING) USING parquet $duplicateClause $duplicateClause"
}
intercept(createTableHeader("TBLPROPERTIES('test' = 'test2')"),
"Found duplicate clauses: TBLPROPERTIES")
intercept(createTableHeader("LOCATION '/tmp/file'"),
@ -293,31 +279,44 @@ class DDLParserSuite extends AnalysisTest {
"Found duplicate clauses: CLUSTERED BY")
intercept(createTableHeader("PARTITIONED BY (b)"),
"Found duplicate clauses: PARTITIONED BY")
intercept(replaceTableHeader("TBLPROPERTIES('test' = 'test2')"),
"Found duplicate clauses: TBLPROPERTIES")
intercept(replaceTableHeader("LOCATION '/tmp/file'"),
"Found duplicate clauses: LOCATION")
intercept(replaceTableHeader("COMMENT 'a table'"),
"Found duplicate clauses: COMMENT")
intercept(replaceTableHeader("CLUSTERED BY(b) INTO 256 BUCKETS"),
"Found duplicate clauses: CLUSTERED BY")
intercept(replaceTableHeader("PARTITIONED BY (b)"),
"Found duplicate clauses: PARTITIONED BY")
}
test("support for other types in OPTIONS") {
val sql =
val createSql =
"""
|CREATE TABLE table_name USING json
|OPTIONS (a 1, b 0.1, c TRUE)
""".stripMargin
parsePlan(sql) match {
case create: CreateTableStatement =>
assert(create.tableName == Seq("table_name"))
assert(create.tableSchema == new StructType)
assert(create.partitioning.isEmpty)
assert(create.bucketSpec.isEmpty)
assert(create.properties.isEmpty)
assert(create.provider == "json")
assert(create.options == Map("a" -> "1", "b" -> "0.1", "c" -> "true"))
assert(create.location.isEmpty)
assert(create.comment.isEmpty)
assert(!create.ifNotExists)
case other =>
fail(s"Expected to parse ${classOf[CreateTableStatement].getClass.getName} from query," +
s"got ${other.getClass.getName}: $sql")
val replaceSql =
"""
|REPLACE TABLE table_name USING json
|OPTIONS (a 1, b 0.1, c TRUE)
""".stripMargin
Seq(createSql, replaceSql).foreach { sql =>
testCreateOrReplaceDdl(
sql,
TableSpec(
Seq("table_name"),
Some(new StructType),
Seq.empty[Transform],
Option.empty[BucketSpec],
Map.empty[String, String],
"json",
Map("a" -> "1", "b" -> "0.1", "c" -> "true"),
None,
None),
expectedIfNotExists = false)
}
}
@ -352,27 +351,28 @@ class DDLParserSuite extends AnalysisTest {
|AS SELECT * FROM src
""".stripMargin
checkParsing(s1)
checkParsing(s2)
checkParsing(s3)
val s4 =
"""
|REPLACE TABLE mydb.page_view
|USING parquet
|COMMENT 'This is the staging page view table'
|LOCATION '/user/external/page_view'
|TBLPROPERTIES ('p1'='v1', 'p2'='v2')
|AS SELECT * FROM src
""".stripMargin
def checkParsing(sql: String): Unit = {
parsePlan(sql) match {
case create: CreateTableAsSelectStatement =>
assert(create.tableName == Seq("mydb", "page_view"))
assert(create.partitioning.isEmpty)
assert(create.bucketSpec.isEmpty)
assert(create.properties == Map("p1" -> "v1", "p2" -> "v2"))
assert(create.provider == "parquet")
assert(create.options.isEmpty)
assert(create.location.contains("/user/external/page_view"))
assert(create.comment.contains("This is the staging page view table"))
assert(create.ifNotExists)
case other =>
fail(s"Expected to parse ${classOf[CreateTableAsSelectStatement].getClass.getName} " +
s"from query, got ${other.getClass.getName}: $sql")
}
val expectedTableSpec = TableSpec(
Seq("mydb", "page_view"),
None,
Seq.empty[Transform],
None,
Map("p1" -> "v1", "p2" -> "v2"),
"parquet",
Map.empty[String, String],
Some("/user/external/page_view"),
Some("This is the staging page view table"))
Seq(s1, s2, s3, s4).foreach { sql =>
testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = true)
}
}
@ -403,6 +403,28 @@ class DDLParserSuite extends AnalysisTest {
parseCompare(s"DROP VIEW IF EXISTS view", DropViewStatement(Seq("view"), ifExists = true))
}
private def testCreateOrReplaceDdl(
sqlStatement: String,
tableSpec: TableSpec,
expectedIfNotExists: Boolean): Unit = {
val parsedPlan = parsePlan(sqlStatement)
val newTableToken = sqlStatement.split(" ")(0).trim.toUpperCase(Locale.ROOT)
parsedPlan match {
case create: CreateTableStatement if newTableToken == "CREATE" =>
assert(create.ifNotExists == expectedIfNotExists)
case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" =>
assert(ctas.ifNotExists == expectedIfNotExists)
case replace: ReplaceTableStatement if newTableToken == "REPLACE" =>
case replace: ReplaceTableAsSelectStatement if newTableToken == "REPLACE" =>
case other =>
fail("First token in statement does not match the expected parsed plan; CREATE TABLE" +
" should create a CreateTableStatement, and REPLACE TABLE should create a" +
s" ReplaceTableStatement. Statement: $sqlStatement, plan type:" +
s" ${parsedPlan.getClass.getName}.")
}
assert(TableSpec(parsedPlan) === tableSpec)
}
// ALTER VIEW view_name SET TBLPROPERTIES ('comment' = new_comment);
// ALTER VIEW view_name UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
test("alter view: alter view properties") {
@ -593,4 +615,69 @@ class DDLParserSuite extends AnalysisTest {
Seq(Seq("x"), Seq("y"), Seq("a", "b", "c"))))
}
}
private case class TableSpec(
name: Seq[String],
schema: Option[StructType],
partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: String,
options: Map[String, String],
location: Option[String],
comment: Option[String])
private object TableSpec {
def apply(plan: LogicalPlan): TableSpec = {
plan match {
case create: CreateTableStatement =>
TableSpec(
create.tableName,
Some(create.tableSchema),
create.partitioning,
create.bucketSpec,
create.properties,
create.provider,
create.options,
create.location,
create.comment)
case replace: ReplaceTableStatement =>
TableSpec(
replace.tableName,
Some(replace.tableSchema),
replace.partitioning,
replace.bucketSpec,
replace.properties,
replace.provider,
replace.options,
replace.location,
replace.comment)
case ctas: CreateTableAsSelectStatement =>
TableSpec(
ctas.tableName,
Some(ctas.asSelect).filter(_.resolved).map(_.schema),
ctas.partitioning,
ctas.bucketSpec,
ctas.properties,
ctas.provider,
ctas.options,
ctas.location,
ctas.comment)
case rtas: ReplaceTableAsSelectStatement =>
TableSpec(
rtas.tableName,
Some(rtas.asSelect).filter(_.resolved).map(_.schema),
rtas.partitioning,
rtas.bucketSpec,
rtas.properties,
rtas.provider,
rtas.options,
rtas.location,
rtas.comment)
case other =>
fail(s"Expected to parse Create, CTAS, Replace, or RTAS plan" +
s" from query, got ${other.getClass.getName}.")
}
}
}
}


@ -27,8 +27,8 @@ import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DropTableCommand}
import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation}
@ -94,6 +94,38 @@ case class DataSourceResolution(
convertCTAS(v2SessionCatalog.asTableCatalog, identifier, create)
}
case ReplaceTableStatement(
AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties,
V1WriteProvider(provider), options, location, comment, orCreate) =>
throw new AnalysisException(
s"Replacing tables is not supported using the legacy / v1 Spark external catalog" +
s" API. Write provider name: $provider, identifier: $table.")
case ReplaceTableAsSelectStatement(
AsTableIdentifier(table), query, partitionCols, bucketSpec, properties,
V1WriteProvider(provider), options, location, comment, orCreate) =>
throw new AnalysisException(
s"Replacing tables is not supported using the legacy / v1 Spark external catalog" +
s" API. Write provider name: $provider, identifier: $table.")
case replace: ReplaceTableStatement =>
// the provider was not a v1 source, convert to a v2 plan
val CatalogObjectIdentifier(maybeCatalog, identifier) = replace.tableName
val catalog = maybeCatalog.orElse(defaultCatalog)
.getOrElse(throw new AnalysisException(
s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
.asTableCatalog
convertReplaceTable(catalog, identifier, replace)
case rtas: ReplaceTableAsSelectStatement =>
// the provider was not a v1 source, convert to a v2 plan
val CatalogObjectIdentifier(maybeCatalog, identifier) = rtas.tableName
val catalog = maybeCatalog.orElse(defaultCatalog)
.getOrElse(throw new AnalysisException(
s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
.asTableCatalog
convertRTAS(catalog, identifier, rtas)
case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) =>
DropTable(catalog.asTableCatalog, ident, ifExists)
@ -226,6 +258,43 @@ case class DataSourceResolution(
ignoreIfExists = create.ifNotExists)
}
private def convertRTAS(
catalog: TableCatalog,
identifier: Identifier,
rtas: ReplaceTableAsSelectStatement): ReplaceTableAsSelect = {
// convert the bucket spec and add it as a transform
val partitioning = rtas.partitioning ++ rtas.bucketSpec.map(_.asTransform)
val properties = convertTableProperties(
rtas.properties, rtas.options, rtas.location, rtas.comment, rtas.provider)
ReplaceTableAsSelect(
catalog,
identifier,
partitioning,
rtas.asSelect,
properties,
writeOptions = rtas.options.filterKeys(_ != "path"),
orCreate = rtas.orCreate)
}
private def convertReplaceTable(
catalog: TableCatalog,
identifier: Identifier,
replace: ReplaceTableStatement): ReplaceTable = {
// convert the bucket spec and add it as a transform
val partitioning = replace.partitioning ++ replace.bucketSpec.map(_.asTransform)
val properties = convertTableProperties(
replace.properties, replace.options, replace.location, replace.comment, replace.provider)
ReplaceTable(
catalog,
identifier,
replace.tableSchema,
partitioning,
properties,
orCreate = replace.orCreate)
}
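
For example (a sketch of the bucket-spec folding done above via the implicit `asTransform` conversion; the values are illustrative), a `CLUSTERED BY (a) INTO 5 BUCKETS` clause with no SORTED BY columns becomes one more transform in the v2 partitioning:

```scala
val spec = BucketSpec(numBuckets = 5, bucketColumnNames = Seq("a"), sortColumnNames = Nil)
val partitioning = replace.partitioning :+ spec.asTransform
// partitioning now ends with bucket(5, a), i.e.
// BucketTransform(LiteralValue(5, IntegerType), Seq(FieldReference("a")))
```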
private def convertTableProperties(
properties: Map[String, String],
options: Map[String, String],


@ -21,9 +21,10 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalog.v2.StagingTableCatalog
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@ -165,8 +166,45 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
CreateTableAsSelectExec(
catalog, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil
catalog match {
case staging: StagingTableCatalog =>
AtomicCreateTableAsSelectExec(
staging, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil
case _ =>
CreateTableAsSelectExec(
catalog, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil
}
case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableExec(staging, ident, schema, parts, props, orCreate = orCreate) :: Nil
case _ =>
ReplaceTableExec(catalog, ident, schema, parts, props, orCreate = orCreate) :: Nil
}
case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableAsSelectExec(
staging,
ident,
parts,
planLater(query),
props,
writeOptions,
orCreate = orCreate) :: Nil
case _ =>
ReplaceTableAsSelectExec(
catalog,
ident,
parts,
planLater(query),
props,
writeOptions,
orCreate = orCreate) :: Nil
}
case AppendData(r: DataSourceV2Relation, query, _) =>
AppendDataExec(r.table.asWritable, r.options, planLater(query)) :: Nil


@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2
import scala.collection.JavaConverters._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.sources.v2.StagedTable
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
case class ReplaceTableExec(
catalog: TableCatalog,
ident: Identifier,
tableSchema: StructType,
partitioning: Seq[Transform],
tableProperties: Map[String, String],
orCreate: Boolean) extends LeafExecNode {
override protected def doExecute(): RDD[InternalRow] = {
if (catalog.tableExists(ident)) {
catalog.dropTable(ident)
} else if (!orCreate) {
throw new CannotReplaceMissingTableException(ident)
}
catalog.createTable(ident, tableSchema, partitioning.toArray, tableProperties.asJava)
sqlContext.sparkContext.parallelize(Seq.empty, 1)
}
override def output: Seq[Attribute] = Seq.empty
}
case class AtomicReplaceTableExec(
catalog: StagingTableCatalog,
identifier: Identifier,
tableSchema: StructType,
partitioning: Seq[Transform],
tableProperties: Map[String, String],
orCreate: Boolean) extends LeafExecNode {
override protected def doExecute(): RDD[InternalRow] = {
val staged = if (orCreate) {
catalog.stageCreateOrReplace(
identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
} else if (catalog.tableExists(identifier)) {
try {
catalog.stageReplace(
identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
} catch {
case e: NoSuchTableException =>
throw new CannotReplaceMissingTableException(identifier, Some(e))
}
} else {
throw new CannotReplaceMissingTableException(identifier)
}
commitOrAbortStagedChanges(staged)
sqlContext.sparkContext.parallelize(Seq.empty, 1)
}
override def output: Seq[Attribute] = Seq.empty
private def commitOrAbortStagedChanges(staged: StagedTable): Unit = {
Utils.tryWithSafeFinallyAndFailureCallbacks({
staged.commitStagedChanges()
})(catchBlock = {
staged.abortStagedChanges()
})
}
}


@ -26,15 +26,15 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
import org.apache.spark.sql.sources.v2.SupportsWrite
import org.apache.spark.sql.sources.v2.{StagedTable, SupportsWrite}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{LongAccumulator, Utils}
@ -51,11 +51,13 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan)
}
/**
* Physical plan node for v2 create table as select.
* Physical plan node for v2 create table as select when the catalog does not support staging
* the table creation.
*
* A new table will be created using the schema of the query, and rows from the query are appended.
* If either table creation or the append fails, the table will be deleted. This implementation does
* not provide an atomic CTAS.
* If either table creation or the append fails, the table will be deleted. This implementation is
* not atomic; for an atomic variant for catalogs that support the appropriate features, see
* AtomicCreateTableAsSelectExec.
*/
case class CreateTableAsSelectExec(
catalog: TableCatalog,
@ -78,7 +80,8 @@ case class CreateTableAsSelectExec(
}
Utils.tryWithSafeFinallyAndFailureCallbacks({
catalog.createTable(ident, query.schema, partitioning.toArray, properties.asJava) match {
catalog.createTable(
ident, query.schema, partitioning.toArray, properties.asJava) match {
case table: SupportsWrite =>
val batchWrite = table.newWriteBuilder(writeOptions)
.withInputDataSchema(query.schema)
@ -89,15 +92,145 @@ case class CreateTableAsSelectExec(
case _ =>
// table does not support writes
throw new SparkException(s"Table implementation does not support writes: ${ident.quoted}")
throw new SparkException(
s"Table implementation does not support writes: ${ident.quoted}")
}
})(catchBlock = {
catalog.dropTable(ident)
})
}
}
/**
* Physical plan node for v2 create table as select, when the catalog is determined to support
* staging table creation.
*
* A new table will be created using the schema of the query, and rows from the query are appended.
* The CTAS operation is atomic. The creation of the table is staged and the commit of the write
* should bundle the commitment of the metadata and the table contents in a single unit. If the
* write fails, the table is instructed to roll back all staged changes.
*/
case class AtomicCreateTableAsSelectExec(
catalog: StagingTableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
ifNotExists: Boolean) extends AtomicTableWriteExec {
override protected def doExecute(): RDD[InternalRow] = {
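    // If the table already exists, either no-op (for IF NOT EXISTS) or fail before staging anything.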
if (catalog.tableExists(ident)) {
if (ifNotExists) {
return sparkContext.parallelize(Seq.empty, 1)
}
throw new TableAlreadyExistsException(ident)
}
val stagedTable = catalog.stageCreate(
ident, query.schema, partitioning.toArray, properties.asJava)
writeToStagedTable(stagedTable, writeOptions, ident)
}
}
/**
* Physical plan node for v2 replace table as select when the catalog does not support staging
* table replacement.
*
* A new table will be created using the schema of the query, and rows from the query are appended.
* If the table exists, its contents and schema should be replaced with the schema and the contents
* of the query. This is a non-atomic implementation that drops the table and then runs non-atomic
* CTAS. For catalogs with the appropriate support, see AtomicReplaceTableAsSelectExec for an
* atomic implementation.
*/
case class ReplaceTableAsSelectExec(
catalog: TableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
orCreate: Boolean) extends AtomicTableWriteExec {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper
override protected def doExecute(): RDD[InternalRow] = {
// Note that this operation is potentially unsafe, but these are the strict semantics of
// RTAS if the catalog does not support atomic operations.
//
// There are several cases where we accept that the table will be dropped and cannot be recovered:
//
// 1. Creating the new table fails,
// 2. Writing to the new table fails,
// 3. The table returned by catalog.createTable doesn't support writing.
if (catalog.tableExists(ident)) {
catalog.dropTable(ident)
} else if (!orCreate) {
throw new CannotReplaceMissingTableException(ident)
}
val createdTable = catalog.createTable(
ident, query.schema, partitioning.toArray, properties.asJava)
Utils.tryWithSafeFinallyAndFailureCallbacks({
createdTable match {
case table: SupportsWrite =>
val batchWrite = table.newWriteBuilder(writeOptions)
.withInputDataSchema(query.schema)
.withQueryId(UUID.randomUUID().toString)
.buildForBatch()
doWrite(batchWrite)
case _ =>
// table does not support writes
throw new SparkException(
s"Table implementation does not support writes: ${ident.quoted}")
}
})(catchBlock = {
catalog.dropTable(ident)
})
}
}
/**
* Physical plan node for v2 replace table as select when the catalog supports staging
* table replacement.
*
* A new table will be created using the schema of the query, and rows from the query are appended.
* If the table exists, its contents and schema should be replaced with the schema and the contents
* of the query. This implementation is atomic. The table replacement is staged, and the commit
* operation at the end should perform the replacement of the table's metadata and contents. If the
* write fails, the table is instructed to roll back staged changes and any previously written table
* is left untouched.
*/
case class AtomicReplaceTableAsSelectExec(
catalog: StagingTableCatalog,
ident: Identifier,
partitioning: Seq[Transform],
query: SparkPlan,
properties: Map[String, String],
writeOptions: CaseInsensitiveStringMap,
orCreate: Boolean) extends AtomicTableWriteExec {
override protected def doExecute(): RDD[InternalRow] = {
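    // Stage the replacement first: CREATE OR REPLACE stages unconditionally, while a plain
    // REPLACE requires the table to exist when staging begins.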
val staged = if (orCreate) {
catalog.stageCreateOrReplace(
ident, query.schema, partitioning.toArray, properties.asJava)
} else if (catalog.tableExists(ident)) {
try {
catalog.stageReplace(
ident, query.schema, partitioning.toArray, properties.asJava)
} catch {
case e: NoSuchTableException =>
throw new CannotReplaceMissingTableException(ident, Some(e))
}
} else {
throw new CannotReplaceMissingTableException(ident)
}
writeToStagedTable(staged, writeOptions, ident)
}
}
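Both atomic nodes drive the same staged-table lifecycle: stage, write, then commit or abort. A minimal sketch of that contract for a replace, with the write step left abstract; the object, method, and parameter names are illustrative only:
import java.util
import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.sources.v2.StagedTable
import org.apache.spark.sql.types.StructType
object StagedReplaceSketch {
  // Stages a replacement, then either commits it (publishing metadata and contents as one
  // unit) or aborts it, leaving any previously committed table untouched.
  def replaceAtomically(catalog: StagingTableCatalog, ident: Identifier, schema: StructType)
      (write: StagedTable => Unit): Unit = {
    val staged = catalog.stageReplace(
      ident, schema, Array.empty[Transform], new util.HashMap[String, String]())
    try {
      write(staged) // e.g. build a WriteBuilder and append the query output
      staged.commitStagedChanges()
    } catch {
      case e: Throwable =>
        staged.abortStagedChanges()
        throw e
    }
  }
}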
/**
* Physical plan node for append into a v2 table.
*
@@ -330,6 +463,36 @@ object DataWritingSparkTask extends Logging {
}
}
private[v2] trait AtomicTableWriteExec extends V2TableWriteExec {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.IdentifierHelper
protected def writeToStagedTable(
stagedTable: StagedTable,
writeOptions: CaseInsensitiveStringMap,
ident: Identifier): RDD[InternalRow] = {
Utils.tryWithSafeFinallyAndFailureCallbacks({
stagedTable match {
case table: SupportsWrite =>
val batchWrite = table.newWriteBuilder(writeOptions)
.withInputDataSchema(query.schema)
.withQueryId(UUID.randomUUID().toString)
.buildForBatch()
val writtenRows = doWrite(batchWrite)
stagedTable.commitStagedChanges()
writtenRows
case _ =>
// Table does not support writes - staged changes are also rolled back below.
throw new SparkException(
s"Table implementation does not support writes: ${ident.quoted}")
}
})(catchBlock = {
// Failure rolls back the staged writes and metadata changes.
stagedTable.abortStagedChanges()
})
}
}
private[v2] case class DataWritingSparkTaskResult(
numRows: Long,
writerCommitMessage: WriterCommitMessage)

View file

@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.catalog.v2.Identifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG
@@ -39,6 +39,8 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter
before {
spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
spark.conf.set(
"spark.sql.catalog.testcat_atomic", classOf[TestStagingInMemoryCatalog].getName)
spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName)
spark.conf.set(V2_SESSION_CATALOG.key, classOf[TestInMemoryTableCatalog].getName)
@@ -50,6 +52,7 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter
after {
spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables()
spark.catalog("testcat_atomic").asInstanceOf[TestInMemoryTableCatalog].clearTables()
spark.catalog("session").asInstanceOf[TestInMemoryTableCatalog].clearTables()
}
@@ -159,20 +162,168 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAndAfter
}
test("CreateTableAsSelect: use v2 plan because catalog is set") {
val basicCatalog = spark.catalog("testcat").asTableCatalog
val atomicCatalog = spark.catalog("testcat_atomic").asTableCatalog
val basicIdentifier = "testcat.table_name"
val atomicIdentifier = "testcat_atomic.table_name"
Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
case (catalog, identifier) =>
spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM source")
val table = catalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.name == identifier)
assert(table.partitioning.isEmpty)
assert(table.properties == Map("provider" -> "foo").asJava)
assert(table.schema == new StructType()
.add("id", LongType, nullable = false)
.add("data", StringType))
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
}
}
test("ReplaceTableAsSelect: basic v2 implementation.") {
val basicCatalog = spark.catalog("testcat").asTableCatalog
val atomicCatalog = spark.catalog("testcat_atomic").asTableCatalog
val basicIdentifier = "testcat.table_name"
val atomicIdentifier = "testcat_atomic.table_name"
Seq((basicCatalog, basicIdentifier), (atomicCatalog, atomicIdentifier)).foreach {
case (catalog, identifier) =>
spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM source")
val originalTable = catalog.loadTable(Identifier.of(Array(), "table_name"))
spark.sql(s"REPLACE TABLE $identifier USING foo AS SELECT id FROM source")
val replacedTable = catalog.loadTable(Identifier.of(Array(), "table_name"))
assert(replacedTable != originalTable, "Table should have been replaced.")
assert(replacedTable.name == identifier)
assert(replacedTable.partitioning.isEmpty)
assert(replacedTable.properties == Map("provider" -> "foo").asJava)
assert(replacedTable.schema == new StructType()
.add("id", LongType, nullable = false))
val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
checkAnswer(
spark.internalCreateDataFrame(rdd, replacedTable.schema),
spark.table("source").select("id"))
}
}
test("ReplaceTableAsSelect: Non-atomic catalog drops the table if the write fails.") {
spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source")
val testCatalog = spark.catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
intercept[Exception] {
spark.sql("REPLACE TABLE testcat.table_name" +
s" USING foo OPTIONS (`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}`=true)" +
s" AS SELECT id FROM source")
}
assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")),
"Table should have been dropped as a result of the replace.")
}
test("ReplaceTableAsSelect: Non-atomic catalog drops the table permanently if the" +
" subsequent table creation fails.") {
spark.sql("CREATE TABLE testcat.table_name USING foo AS SELECT id, data FROM source")
val testCatalog = spark.catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
intercept[Exception] {
spark.sql("REPLACE TABLE testcat.table_name" +
s" USING foo" +
s" TBLPROPERTIES (`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" +
s" AS SELECT id FROM source")
}
assert(!testCatalog.tableExists(Identifier.of(Array(), "table_name")),
"Table should have been dropped and failed to be created.")
}
test("ReplaceTableAsSelect: Atomic catalog does not drop the table when replace fails.") {
spark.sql("CREATE TABLE testcat_atomic.table_name USING foo AS SELECT id, data FROM source")
val testCatalog = spark.catalog("testcat_atomic").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
intercept[Exception] {
spark.sql("REPLACE TABLE testcat_atomic.table_name" +
s" USING foo OPTIONS (`${TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION}=true)" +
s" AS SELECT id FROM source")
}
var maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(maybeReplacedTable === table, "Table should not have changed.")
intercept[Exception] {
spark.sql("REPLACE TABLE testcat_atomic.table_name" +
s" USING foo" +
s" TBLPROPERTIES (`${TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY}`=true)" +
s" AS SELECT id FROM source")
}
maybeReplacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(maybeReplacedTable === table, "Table should not have changed.")
}
test("ReplaceTable: Erases the table contents and changes the metadata.") {
spark.sql(s"CREATE TABLE testcat.table_name USING $orc2 AS SELECT id, data FROM source")
val testCatalog = spark.catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
assert(table.name == "testcat.table_name")
assert(table.partitioning.isEmpty)
assert(table.properties == Map("provider" -> "foo").asJava)
assert(table.schema == new StructType()
.add("id", LongType, nullable = false)
.add("data", StringType))
spark.sql("REPLACE TABLE testcat.table_name (id bigint) USING foo")
val replaced = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
assert(replaced.asInstanceOf[InMemoryTable].rows.isEmpty,
"Replaced table should have no rows after committing.")
assert(replaced.schema().fields.length === 1,
"Replaced table should have new schema.")
assert(replaced.schema().fields(0).name === "id",
"Replaced table should have new schema.")
}
test("ReplaceTableAsSelect: CREATE OR REPLACE new table has same behavior as CTAS.") {
Seq("testcat", "testcat_atomic").foreach { catalog =>
spark.sql(s"CREATE TABLE $catalog.created USING $orc2 AS SELECT id, data FROM source")
spark.sql(
s"CREATE OR REPLACE TABLE $catalog.replaced USING $orc2 AS SELECT id, data FROM source")
val testCatalog = spark.catalog(catalog).asTableCatalog
val createdTable = testCatalog.loadTable(Identifier.of(Array(), "created"))
val replacedTable = testCatalog.loadTable(Identifier.of(Array(), "replaced"))
assert(createdTable.asInstanceOf[InMemoryTable].rows ===
replacedTable.asInstanceOf[InMemoryTable].rows)
assert(createdTable.schema === replacedTable.schema)
}
}
test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table does not exist.") {
Seq("testcat", "testcat_atomic").foreach { catalog =>
spark.sql(s"CREATE TABLE $catalog.created USING $orc2 AS SELECT id, data FROM source")
intercept[CannotReplaceMissingTableException] {
spark.sql(s"REPLACE TABLE $catalog.replaced USING $orc2 AS SELECT id, data FROM source")
}
}
}
test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table is dropped before commit.") {
import TestInMemoryTableCatalog._
spark.sql(s"CREATE TABLE testcat_atomic.created USING $orc2 AS SELECT id, data FROM source")
intercept[CannotReplaceMissingTableException] {
spark.sql(s"REPLACE TABLE testcat_atomic.replaced" +
s" USING $orc2" +
s" TBLPROPERTIES (`$SIMULATE_DROP_BEFORE_REPLACE_PROPERTY`=true)" +
s" AS SELECT id, data FROM source")
}
}
test("CreateTableAsSelect: use v2 plan and session catalog when provider is v2") {

View file

@@ -23,11 +23,11 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, TableCatalog, TableChange}
import org.apache.spark.sql.catalog.v2.{CatalogV2Implicits, Identifier, StagingTableCatalog, TableCatalog, TableChange}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.types.StructType
@@ -38,7 +38,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
class TestInMemoryTableCatalog extends TableCatalog {
import CatalogV2Implicits._
private val tables: util.Map[Identifier, InMemoryTable] =
protected val tables: util.Map[Identifier, InMemoryTable] =
new ConcurrentHashMap[Identifier, InMemoryTable]()
private var _name: Option[String] = None
@@ -66,11 +66,10 @@ class TestInMemoryTableCatalog {
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
if (tables.containsKey(ident)) {
throw new TableAlreadyExistsException(ident)
}
TestInMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
if (partitions.nonEmpty) {
throw new UnsupportedOperationException(
s"Catalog $name: Partitioned tables are not supported")
@@ -104,7 +103,9 @@ class TestInMemoryTableCatalog {
}
}
override def dropTable(ident: Identifier): Boolean = Option(tables.remove(ident)).isDefined
override def dropTable(ident: Identifier): Boolean = {
Option(tables.remove(ident)).isDefined
}
def clearTables(): Unit = {
tables.clear()
@@ -114,7 +115,7 @@ class TestInMemoryTableCatalog {
/**
* A simple in-memory table. Rows are stored as a buffered group produced by each output task.
*/
private class InMemoryTable(
class InMemoryTable(
val name: String,
val schema: StructType,
override val properties: util.Map[String, String])
@@ -155,6 +156,7 @@ private class InMemoryTable(
}
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
TestInMemoryTableCatalog.maybeSimulateFailedTableWrite(options)
new WriteBuilder with SupportsTruncate {
private var shouldTruncate: Boolean = false
@@ -196,7 +198,142 @@ private class InMemoryTable(
}
}
private class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
object TestInMemoryTableCatalog {
val SIMULATE_FAILED_WRITE_OPTION = "spark.sql.test.simulateFailedWrite"
val SIMULATE_FAILED_CREATE_PROPERTY = "spark.sql.test.simulateFailedCreate"
val SIMULATE_DROP_BEFORE_REPLACE_PROPERTY = "spark.sql.test.simulateDropBeforeReplace"
def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = {
if ("true".equalsIgnoreCase(
tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))) {
throw new IllegalStateException("Manual create table failure.")
}
}
def maybeSimulateFailedTableWrite(tableOptions: CaseInsensitiveStringMap): Unit = {
if (tableOptions.getBoolean(
TestInMemoryTableCatalog.SIMULATE_FAILED_WRITE_OPTION, false)) {
throw new IllegalStateException("Manual write to table failure.")
}
}
}
class TestStagingInMemoryCatalog
extends TestInMemoryTableCatalog with StagingTableCatalog {
import CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.sources.v2.TestInMemoryTableCatalog._
override def stageCreate(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
validateStagedTable(partitions, properties)
new TestStagedCreateTable(
ident,
new InMemoryTable(s"$name.${ident.quoted}", schema, properties))
}
override def stageReplace(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
validateStagedTable(partitions, properties)
new TestStagedReplaceTable(
ident,
new InMemoryTable(s"$name.${ident.quoted}", schema, properties))
}
override def stageCreateOrReplace(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
validateStagedTable(partitions, properties)
new TestStagedCreateOrReplaceTable(
ident,
new InMemoryTable(s"$name.${ident.quoted}", schema, properties))
}
private def validateStagedTable(
partitions: Array[Transform],
properties: util.Map[String, String]): Unit = {
if (partitions.nonEmpty) {
throw new UnsupportedOperationException(
s"Catalog $name: Partitioned tables are not supported")
}
maybeSimulateFailedTableCreation(properties)
}
private abstract class TestStagedTable(
ident: Identifier,
delegateTable: InMemoryTable)
extends StagedTable with SupportsWrite with SupportsRead {
override def abortStagedChanges(): Unit = {}
override def name(): String = delegateTable.name
override def schema(): StructType = delegateTable.schema
override def capabilities(): util.Set[TableCapability] = delegateTable.capabilities
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
delegateTable.newWriteBuilder(options)
}
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
delegateTable.newScanBuilder(options)
}
}
private class TestStagedCreateTable(
ident: Identifier,
delegateTable: InMemoryTable) extends TestStagedTable(ident, delegateTable) {
override def commitStagedChanges(): Unit = {
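      // putIfAbsent returns a non-null value if another table was already committed under
      // this identifier, in which case the staged create must fail.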
val maybePreCommittedTable = tables.putIfAbsent(ident, delegateTable)
if (maybePreCommittedTable != null) {
throw new TableAlreadyExistsException(
s"Table with identifier $ident and name $name was already created.")
}
}
}
private class TestStagedReplaceTable(
ident: Identifier,
delegateTable: InMemoryTable) extends TestStagedTable(ident, delegateTable) {
override def commitStagedChanges(): Unit = {
maybeSimulateDropBeforeCommit()
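      // ConcurrentHashMap.replace returns null when no mapping exists, i.e. the table was
      // dropped between staging and commit, so surface the missing-table error.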
val maybePreCommittedTable = tables.replace(ident, delegateTable)
if (maybePreCommittedTable == null) {
throw new CannotReplaceMissingTableException(ident)
}
}
private def maybeSimulateDropBeforeCommit(): Unit = {
if ("true".equalsIgnoreCase(
delegateTable.properties.get(SIMULATE_DROP_BEFORE_REPLACE_PROPERTY))) {
tables.remove(ident)
}
}
}
private class TestStagedCreateOrReplaceTable(
ident: Identifier,
delegateTable: InMemoryTable) extends TestStagedTable(ident, delegateTable) {
override def commitStagedChanges(): Unit = {
tables.put(ident, delegateTable)
}
}
}
class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
val rows = new mutable.ArrayBuffer[InternalRow]()
}