From 604d6799df23329778bc384d429445cf52def4d4 Mon Sep 17 00:00:00 2001
From: Ximo Guanter
Date: Mon, 6 Jan 2020 23:53:45 +0800
Subject: [PATCH] [SPARK-30226][SQL] Remove withXXX functions in WriteBuilder

### What changes were proposed in this pull request?
Adding a `LogicalWriteInfo` interface as suggested by cloud-fan in https://github.com/apache/spark/pull/25990#issuecomment-555132991

### Why are the changes needed?
It provides compile-time guarantees where we previously had none, which will make it harder to introduce bugs in the future.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Compiles and passes tests

Closes #26678 from edrevo/add-logical-write-info.

Lead-authored-by: Ximo Guanter
Co-authored-by: Ximo Guanter
Signed-off-by: Wenchen Fan
---
 .../apache/spark/sql/v2/avro/AvroTable.scala  |  6 +--
 .../spark/sql/v2/avro/AvroWriteBuilder.scala  |  8 ++--
 .../sql/kafka010/KafkaSourceProvider.scala    | 12 ++---
 .../sql/connector/catalog/StagedTable.java    | 10 ++--
 .../catalog/StagingTableCatalog.java          |  8 ++--
 .../sql/connector/catalog/SupportsWrite.java  |  8 ++--
 .../sql/connector/write/LogicalWriteInfo.java | 46 +++++++++++++++++++
 .../sql/connector/write/WriteBuilder.java     | 23 ----------
 .../write/LogicalWriteInfoImpl.scala          | 26 +++++++++++
 .../spark/sql/connector/InMemoryTable.scala   |  4 +-
 .../StagingInMemoryTableCatalog.scala         |  6 +--
 .../datasources/noop/NoopDataSource.scala     |  4 +-
 .../datasources/v2/FileWriteBuilder.scala     | 22 +++------
 .../datasources/v2/V1FallbackWriters.scala    | 11 +++--
 .../v2/WriteToDataSourceV2Exec.scala          | 34 ++++++++------
 .../datasources/v2/csv/CSVTable.scala         |  6 +--
 .../datasources/v2/csv/CSVWriteBuilder.scala  |  8 ++--
 .../datasources/v2/json/JsonTable.scala       |  6 +--
 .../v2/json/JsonWriteBuilder.scala            |  8 ++--
 .../datasources/v2/orc/OrcTable.scala         |  6 +--
 .../datasources/v2/orc/OrcWriteBuilder.scala  |  8 ++--
 .../datasources/v2/parquet/ParquetTable.scala |  6 +--
 .../v2/parquet/ParquetWriteBuilder.scala      |  9 ++--
 .../datasources/v2/text/TextTable.scala       |  6 +--
 .../v2/text/TextWriteBuilder.scala            |  8 ++--
 .../execution/streaming/StreamExecution.scala | 10 ++--
 .../sql/execution/streaming/console.scala     | 13 ++----
 .../sources/ForeachWriterTable.scala          | 12 ++---
 .../execution/streaming/sources/memory.scala  | 12 ++---
 .../FileDataSourceV2FallBackSuite.scala       |  4 +-
 .../connector/SimpleWritableDataSource.scala  | 14 ++----
 .../sql/connector/V1WriteFallbackSuite.scala  | 14 +++---
 .../datasources/v2/FileTableSuite.scala       |  4 +-
 .../sources/StreamingDataSourceV2Suite.scala  |  4 +-
 34 files changed, 207 insertions(+), 179 deletions(-)
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala index 765e5727d9..2096f1a08a 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.avro.AvroUtils -import org.apache.spark.sql.connector.write.WriteBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import 
org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.{DataType, StructType} @@ -42,8 +42,8 @@ case class AvroTable( override def inferSchema(files: Seq[FileStatus]): Option[StructType] = AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files) - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = - new AvroWriteBuilder(options, paths, formatName, supportsDataType) + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = + new AvroWriteBuilder(paths, formatName, supportsDataType, info) override def supportsDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala index c2ddc4b191..c4defb9f06 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala @@ -19,18 +19,18 @@ package org.apache.spark.sql.v2.avro import org.apache.hadoop.mapreduce.Job import org.apache.spark.sql.avro.AvroUtils +import org.apache.spark.sql.connector.write.LogicalWriteInfo import org.apache.spark.sql.execution.datasources.OutputWriterFactory import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.sql.util.CaseInsensitiveStringMap class AvroWriteBuilder( - options: CaseInsensitiveStringMap, paths: Seq[String], formatName: String, - supportsDataType: DataType => Boolean) - extends FileWriteBuilder(options, paths, formatName, supportsDataType) { + supportsDataType: DataType => Boolean, + info: LogicalWriteInfo) + extends FileWriteBuilder(paths, formatName, supportsDataType, info) { override def prepareWrite( sqlConf: SQLConf, job: Job, diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index c15f08d787..4ffa70f9f3 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability, TableProvider} import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} -import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder} +import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.connector.write.streaming.StreamingWrite import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ @@ -392,18 +392,14 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new KafkaScan(options) - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { new WriteBuilder { - private var inputSchema: StructType = _ + private val options = info.options + private val 
inputSchema: StructType = info.schema() private val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim) private val producerParams = kafkaParamsForProducer(CaseInsensitiveMap(options.asScala.toMap)) - override def withInputDataSchema(schema: StructType): WriteBuilder = { - this.inputSchema = schema - this - } - override def buildForBatch(): BatchWrite = { assert(inputSchema != null) new KafkaBatchWrite(topic, producerParams, inputSchema) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagedTable.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagedTable.java index 9fd70cc977..84b24f204b 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagedTable.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagedTable.java @@ -21,8 +21,8 @@ import java.util.Map; import org.apache.spark.annotation.Experimental; import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * Represents a table which is staged for being committed to the metastore. @@ -32,10 +32,10 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap; * {@link StagingTableCatalog#stageCreate(Identifier, StructType, Transform[], Map)} or * {@link StagingTableCatalog#stageReplace(Identifier, StructType, Transform[], Map)} to prepare the * table for being written to. This table should usually implement {@link SupportsWrite}. A new - * writer will be constructed via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}, - * and the write will be committed. The job concludes with a call to {@link #commitStagedChanges()}, - * at which point implementations are expected to commit the table's metadata into the metastore - * along with the data that was written by the writes from the write builder this table created. + * writer will be constructed via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}, and the + * write will be committed. The job concludes with a call to {@link #commitStagedChanges()}, at + * which point implementations are expected to commit the table's metadata into the metastore along + * with the data that was written by the writes from the write builder this table created. 
*/ @Experimental public interface StagedTable extends Table { diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java index ca9160aa2f..1c8e9c5024 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java @@ -21,13 +21,13 @@ import java.util.Map; import org.apache.spark.annotation.Experimental; import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; import org.apache.spark.sql.connector.write.BatchWrite; import org.apache.spark.sql.connector.write.WriterCommitMessage; import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * An optional mix-in for implementations of {@link TableCatalog} that support staging creation of @@ -39,9 +39,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap; * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform - * the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the - * write operation fails, the catalog will have already dropped the table, and the planner cannot - * roll back the dropping of the table. + * the write via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}. + * However, if the write operation fails, the catalog will have already dropped the table, and the + * planner cannot roll back the dropping of the table. *

* If the catalog implements this plugin, the catalog can implement the methods to "stage" the * creation and the replacement of a table. After the table's diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsWrite.java index 5b648468c8..90d79ed492 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsWrite.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsWrite.java @@ -19,13 +19,13 @@ package org.apache.spark.sql.connector.catalog; import org.apache.spark.annotation.Experimental; import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; import org.apache.spark.sql.connector.write.WriteBuilder; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * A mix-in interface of {@link Table}, to indicate that it's writable. This adds - * {@link #newWriteBuilder(CaseInsensitiveStringMap)} that is used to create a write - * for batch or streaming. + * {@link #newWriteBuilder(LogicalWriteInfo)} that is used to create a + * write for batch or streaming. */ @Experimental public interface SupportsWrite extends Table { @@ -34,5 +34,5 @@ public interface SupportsWrite extends Table { * Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call * this method to configure each data source write. */ - WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options); + WriteBuilder newWriteBuilder(LogicalWriteInfo info); } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java new file mode 100644 index 0000000000..831f4e5aac --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.write; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * This interface contains logical write information that data sources can use when generating a + * {@link WriteBuilder}. + */ +@Evolving +public interface LogicalWriteInfo { + /** + * the options that the user specified when writing the dataset + */ + CaseInsensitiveStringMap options(); + + /** + * `queryId` is a unique string of the query. It's possible that there are many queries + * running at the same time, or a query is restarted and resumed. {@link BatchWrite} can use + * this id to identify the query. 
+ */ + String queryId(); + + /** + * the schema of the input data from Spark to data source. + */ + StructType schema(); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java index f26304e8db..a8d99a8f04 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteBuilder.java @@ -21,7 +21,6 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableCapability; import org.apache.spark.sql.connector.write.streaming.StreamingWrite; -import org.apache.spark.sql.types.StructType; /** * An interface for building the {@link BatchWrite}. Implementations can mix in some interfaces to @@ -33,28 +32,6 @@ import org.apache.spark.sql.types.StructType; @Evolving public interface WriteBuilder { - /** - * Passes the `queryId` from Spark to data source. `queryId` is a unique string of the query. It's - * possible that there are many queries running at the same time, or a query is restarted and - * resumed. {@link BatchWrite} can use this id to identify the query. - * - * @return a new builder with the `queryId`. By default it returns `this`, which means the given - * `queryId` is ignored. Please override this method to take the `queryId`. - */ - default WriteBuilder withQueryId(String queryId) { - return this; - } - - /** - * Passes the schema of the input data from Spark to data source. - * - * @return a new builder with the `schema`. By default it returns `this`, which means the given - * `schema` is ignored. Please override this method to take the `schema`. - */ - default WriteBuilder withInputDataSchema(StructType schema) { - return this; - } - /** * Returns a {@link BatchWrite} to write data to batch source. By default this method throws * exception, data sources must overwrite this method to provide an implementation, if the diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala new file mode 100644 index 0000000000..b1492e4298 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector.write + +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +private[sql] case class LogicalWriteInfoImpl( + queryId: String, + schema: StructType, + options: CaseInsensitiveStringMap) extends LogicalWriteInfo diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala index 09c4b9ab9d..c9e4e0aad5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala @@ -95,8 +95,8 @@ class InMemoryTable( override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory } - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { - InMemoryTable.maybeSimulateFailedTableWrite(options) + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { + InMemoryTable.maybeSimulateFailedTableWrite(info.options) new WriteBuilder with SupportsTruncate with SupportsOverwrite with SupportsDynamicOverwrite { private var writer: BatchWrite = Append diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/StagingInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/StagingInMemoryTableCatalog.scala index 513ea67b1f..6d4879d355 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/StagingInMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/StagingInMemoryTableCatalog.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.ScanBuilder -import org.apache.spark.sql.connector.write.WriteBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -88,8 +88,8 @@ class StagingInMemoryTableCatalog extends InMemoryTableCatalog with StagingTable override def capabilities(): util.Set[TableCapability] = delegateTable.capabilities - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { - delegateTable.newWriteBuilder(options) + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { + delegateTable.newWriteBuilder(info) } override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala index dd44651050..b6149ce729 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider} -import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, 
DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.types.StructType @@ -39,7 +39,7 @@ class NoopDataSource extends TableProvider with DataSourceRegister { } private[noop] object NoopTable extends Table with SupportsWrite { - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = NoopWriteBuilder override def name(): String = "noop-table" override def schema(): StructType = new StructType() override def capabilities(): util.Set[TableCapability] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala index 65a2c61ba0..d519832c57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala @@ -30,34 +30,24 @@ import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} -import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder} +import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.util.SchemaUtils import org.apache.spark.util.SerializableConfiguration abstract class FileWriteBuilder( - options: CaseInsensitiveStringMap, paths: Seq[String], formatName: String, - supportsDataType: DataType => Boolean) extends WriteBuilder { - private var schema: StructType = _ - private var queryId: String = _ + supportsDataType: DataType => Boolean, + info: LogicalWriteInfo) extends WriteBuilder { + private val schema = info.schema() + private val queryId = info.queryId() + private val options = info.options() private var mode: SaveMode = _ - override def withInputDataSchema(schema: StructType): WriteBuilder = { - this.schema = schema - this - } - - override def withQueryId(queryId: String): WriteBuilder = { - this.queryId = queryId - this - } - def mode(mode: SaveMode): WriteBuilder = { this.mode = mode this diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala index bf67e97297..f973000254 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V1FallbackWriters.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import 
org.apache.spark.sql.connector.catalog.SupportsWrite -import org.apache.spark.sql.connector.write.{SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -98,9 +98,12 @@ sealed trait V1FallbackWriters extends SupportsV1Write { } protected def newWriteBuilder(): V1WriteBuilder = { - val writeBuilder = table.newWriteBuilder(writeOptions) - .withInputDataSchema(plan.schema) - .withQueryId(UUID.randomUUID().toString) + val info = LogicalWriteInfoImpl( + queryId = UUID.randomUUID().toString, + schema = plan.schema, + options = writeOptions) + val writeBuilder = table.newWriteBuilder(info) + writeBuilder.asV1Builder } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index f4c70f7593..e360a9e656 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, SupportsWrite, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfoImpl, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, LogicalWriteInfoImpl, PhysicalWriteInfoImpl, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.sources.{AlwaysTrue, Filter} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -84,9 +84,11 @@ case class CreateTableAsSelectExec( catalog.createTable( ident, schema, partitioning.toArray, properties.asJava) match { case table: SupportsWrite => - val writeBuilder = table.newWriteBuilder(writeOptions) - .withInputDataSchema(schema) - .withQueryId(UUID.randomUUID().toString) + val info = LogicalWriteInfoImpl( + queryId = UUID.randomUUID().toString, + schema, + writeOptions) + val writeBuilder = table.newWriteBuilder(info) writeBuilder match { case v1: V1WriteBuilder => writeWithV1(v1.buildForV1Write()) @@ -179,9 +181,11 @@ case class ReplaceTableAsSelectExec( Utils.tryWithSafeFinallyAndFailureCallbacks({ createdTable match { case table: SupportsWrite => - val writeBuilder = table.newWriteBuilder(writeOptions) - .withInputDataSchema(schema) - .withQueryId(UUID.randomUUID().toString) + val info = LogicalWriteInfoImpl( + queryId = UUID.randomUUID().toString, + schema, + writeOptions) + val writeBuilder = table.newWriteBuilder(info) writeBuilder match { case v1: V1WriteBuilder => writeWithV1(v1.buildForV1Write()) @@ -335,9 +339,11 @@ trait BatchWriteHelper { def writeOptions: CaseInsensitiveStringMap def newWriteBuilder(): WriteBuilder = { - 
table.newWriteBuilder(writeOptions) - .withInputDataSchema(query.schema) - .withQueryId(UUID.randomUUID().toString) + val info = LogicalWriteInfoImpl( + queryId = UUID.randomUUID().toString, + query.schema, + writeOptions) + table.newWriteBuilder(info) } } @@ -483,9 +489,11 @@ private[v2] trait AtomicTableWriteExec extends V2TableWriteExec with SupportsV1W Utils.tryWithSafeFinallyAndFailureCallbacks({ stagedTable match { case table: SupportsWrite => - val writeBuilder = table.newWriteBuilder(writeOptions) - .withInputDataSchema(query.schema) - .withQueryId(UUID.randomUUID().toString) + val info = LogicalWriteInfoImpl( + queryId = UUID.randomUUID().toString, + query.schema, + writeOptions) + val writeBuilder = table.newWriteBuilder(info) val writtenRows = writeBuilder match { case v1: V1WriteBuilder => writeWithV1(v1.buildForV1Write()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala index 04beee0e3b..3cafe37b74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.csv.CSVOptions -import org.apache.spark.sql.connector.write.WriteBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.csv.CSVDataSource import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -49,8 +49,8 @@ case class CSVTable( CSVDataSource(parsedOptions).inferSchema(sparkSession, files, parsedOptions) } - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = - new CSVWriteBuilder(options, paths, formatName, supportsDataType) + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = + new CSVWriteBuilder(paths, formatName, supportsDataType, info) override def supportsDataType(dataType: DataType): Boolean = dataType match { case _: AtomicType => true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala index 92b47e4354..bfbb1831aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala @@ -20,19 +20,19 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.catalyst.util.CompressionCodecs +import org.apache.spark.sql.connector.write.LogicalWriteInfo import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory} import org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.sql.util.CaseInsensitiveStringMap class CSVWriteBuilder( - options: CaseInsensitiveStringMap, paths: Seq[String], formatName: String, - supportsDataType: DataType => Boolean) - extends FileWriteBuilder(options, paths, formatName, 
supportsDataType) { + supportsDataType: DataType => Boolean, + info: LogicalWriteInfo) + extends FileWriteBuilder(paths, formatName, supportsDataType, info) { override def prepareWrite( sqlConf: SQLConf, job: Job, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala index 9bb615528f..4b66aec6ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.json.JSONOptionsInRead -import org.apache.spark.sql.connector.write.WriteBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.json.JsonDataSource import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -49,8 +49,8 @@ case class JsonTable( sparkSession, files, parsedOptions) } - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = - new JsonWriteBuilder(options, paths, formatName, supportsDataType) + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = + new JsonWriteBuilder(paths, formatName, supportsDataType, info) override def supportsDataType(dataType: DataType): Boolean = dataType match { case _: AtomicType => true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala index 3c99e07489..19f472057e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala @@ -20,19 +20,19 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.sql.catalyst.json.JSONOptions import org.apache.spark.sql.catalyst.util.CompressionCodecs +import org.apache.spark.sql.connector.write.LogicalWriteInfo import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory} import org.apache.spark.sql.execution.datasources.json.JsonOutputWriter import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.sql.util.CaseInsensitiveStringMap class JsonWriteBuilder( - options: CaseInsensitiveStringMap, paths: Seq[String], formatName: String, - supportsDataType: DataType => Boolean) - extends FileWriteBuilder(options, paths, formatName, supportsDataType) { + supportsDataType: DataType => Boolean, + info: LogicalWriteInfo) + extends FileWriteBuilder(paths, formatName, supportsDataType, info) { override def prepareWrite( sqlConf: SQLConf, job: Job, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala index f2e4b88e9f..3ef41210de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala @@ -21,7 +21,7 @@ import 
scala.collection.JavaConverters._ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.write.WriteBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.orc.OrcUtils import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -43,8 +43,8 @@ case class OrcTable( override def inferSchema(files: Seq[FileStatus]): Option[StructType] = OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap) - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = - new OrcWriteBuilder(options, paths, formatName, supportsDataType) + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = + new OrcWriteBuilder(paths, formatName, supportsDataType, info) override def supportsDataType(dataType: DataType): Boolean = dataType match { case _: AtomicType => true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala index f5b06e11c8..4804474870 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala @@ -21,19 +21,19 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.orc.OrcConf.{COMPRESS, MAPRED_OUTPUT_SCHEMA} import org.apache.orc.mapred.OrcStruct +import org.apache.spark.sql.connector.write.LogicalWriteInfo import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcOutputWriter, OrcUtils} import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.sql.util.CaseInsensitiveStringMap class OrcWriteBuilder( - options: CaseInsensitiveStringMap, paths: Seq[String], formatName: String, - supportsDataType: DataType => Boolean) - extends FileWriteBuilder(options, paths, formatName, supportsDataType) { + supportsDataType: DataType => Boolean, + info: LogicalWriteInfo) + extends FileWriteBuilder(paths, formatName, supportsDataType, info) { override def prepareWrite( sqlConf: SQLConf, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala index 2ad64b1aa5..e9f9bf8df3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.write.WriteBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -43,8 +43,8 @@ case class ParquetTable( override def inferSchema(files: Seq[FileStatus]): Option[StructType] = 
ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files) - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = - new ParquetWriteBuilder(options, paths, formatName, supportsDataType) + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = + new ParquetWriteBuilder(paths, formatName, supportsDataType, info) override def supportsDataType(dataType: DataType): Boolean = dataType match { case _: AtomicType => true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala index bfe2084299..a4e22c21a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala @@ -16,7 +16,6 @@ */ package org.apache.spark.sql.execution.datasources.v2.parquet -import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext} import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat} import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel @@ -25,19 +24,19 @@ import org.apache.parquet.hadoop.util.ContextUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.Row +import org.apache.spark.sql.connector.write.LogicalWriteInfo import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} import org.apache.spark.sql.execution.datasources.parquet._ import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.sql.util.CaseInsensitiveStringMap class ParquetWriteBuilder( - options: CaseInsensitiveStringMap, paths: Seq[String], formatName: String, - supportsDataType: DataType => Boolean) - extends FileWriteBuilder(options, paths, formatName, supportsDataType) with Logging { + supportsDataType: DataType => Boolean, + info: LogicalWriteInfo) + extends FileWriteBuilder(paths, formatName, supportsDataType, info) with Logging { override def prepareWrite( sqlConf: SQLConf, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala index 87bfa84985..36304a9b17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2.text import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.write.WriteBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType} @@ -39,8 +39,8 @@ case class TextTable( override def inferSchema(files: Seq[FileStatus]): Option[StructType] = Some(StructType(Seq(StructField("value", StringType)))) - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = - new TextWriteBuilder(options, paths, formatName, supportsDataType) + override def 
newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = + new TextWriteBuilder(paths, formatName, supportsDataType, info) override def supportsDataType(dataType: DataType): Boolean = dataType == StringType diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala index c00dbc20be..a3bf4dcae3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala @@ -20,19 +20,19 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.util.CompressionCodecs +import org.apache.spark.sql.connector.write.LogicalWriteInfo import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory} import org.apache.spark.sql.execution.datasources.text.{TextOptions, TextOutputWriter} import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.sql.util.CaseInsensitiveStringMap class TextWriteBuilder( - options: CaseInsensitiveStringMap, paths: Seq[String], formatName: String, - supportsDataType: DataType => Boolean) - extends FileWriteBuilder(options, paths, formatName, supportsDataType) { + supportsDataType: DataType => Boolean, + info: LogicalWriteInfo) + extends FileWriteBuilder(paths, formatName, supportsDataType, info) { private def verifySchema(schema: StructType): Unit = { if (schema.size != 1) { throw new AnalysisException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 1cb395517e..6dff5c6f26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table} import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkDataStream} -import org.apache.spark.sql.connector.write.SupportsTruncate +import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate} import org.apache.spark.sql.connector.write.streaming.StreamingWrite import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.StreamingExplainCommand @@ -612,9 +612,11 @@ abstract class StreamExecution( table: SupportsWrite, options: Map[String, String], inputPlan: LogicalPlan): StreamingWrite = { - val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava)) - .withQueryId(id.toString) - .withInputDataSchema(inputPlan.schema) + val info = LogicalWriteInfoImpl( + queryId = id.toString, + inputPlan.schema, + new CaseInsensitiveStringMap(options.asJava)) + val writeBuilder = table.newWriteBuilder(info) outputMode match { case Append => writeBuilder.buildForStreaming() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index 20eb7ae5a6..63e4089194 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql._ import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider} -import org.apache.spark.sql.connector.write.{SupportsTruncate, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsTruncate, WriteBuilder} import org.apache.spark.sql.connector.write.streaming.StreamingWrite import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister} @@ -71,21 +71,16 @@ object ConsoleTable extends Table with SupportsWrite { Set(TableCapability.STREAMING_WRITE).asJava } - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { new WriteBuilder with SupportsTruncate { - private var inputSchema: StructType = _ - - override def withInputDataSchema(schema: StructType): WriteBuilder = { - this.inputSchema = schema - this - } + private val inputSchema: StructType = info.schema() // Do nothing for truncate. Console sink is special that it just prints all the records. override def truncate(): WriteBuilder = this override def buildForStreaming(): StreamingWrite = { assert(inputSchema != null) - new ConsoleWrite(inputSchema, options) + new ConsoleWrite(inputSchema, info.options) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala index 4793cb9a9b..6e4f40ad08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala @@ -27,11 +27,10 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability} -import org.apache.spark.sql.connector.write.{DataWriter, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.connector.write.{DataWriter, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.execution.python.PythonForeachWriter import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A write-only table for forwarding data into the specified [[ForeachWriter]]. 
@@ -54,14 +53,9 @@ case class ForeachWriterTable[T]( Set(TableCapability.STREAMING_WRITE).asJava } - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { new WriteBuilder with SupportsTruncate { - private var inputSchema: StructType = _ - - override def withInputDataSchema(schema: StructType): WriteBuilder = { - this.inputSchema = schema - this - } + private var inputSchema: StructType = info.schema() // Do nothing for truncate. Foreach sink is special that it just forwards all the records to // ForeachWriter. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala index 0cc067fc76..2b674070a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala @@ -33,11 +33,10 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability} -import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.execution.streaming.Sink import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A sink that stores the results in memory. 
This [[Sink]] is primarily intended for use in unit @@ -53,21 +52,16 @@ class MemorySink extends Table with SupportsWrite with Logging { Set(TableCapability.STREAMING_WRITE).asJava } - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { new WriteBuilder with SupportsTruncate { private var needTruncate: Boolean = false - private var inputSchema: StructType = _ + private val inputSchema: StructType = info.schema() override def truncate(): WriteBuilder = { this.needTruncate = true this } - override def withInputDataSchema(schema: StructType): WriteBuilder = { - this.inputSchema = schema - this - } - override def buildForStreaming(): StreamingWrite = { new MemoryStreamingWrite(MemorySink.this, inputSchema, needTruncate) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala index 2b3340527a..21938f301d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} import org.apache.spark.sql.connector.read.ScanBuilder -import org.apache.spark.sql.connector.write.WriteBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution} import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat @@ -74,7 +74,7 @@ class DummyWriteOnlyFileTable extends Table with SupportsWrite { override def schema(): StructType = StructType(Nil) - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = throw new AnalysisException("Dummy file writer") override def capabilities(): java.util.Set[TableCapability] = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala index a0f1a9f9f5..0070076459 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala @@ -70,15 +70,11 @@ class SimpleWritableDataSource extends TableProvider with SessionConfigSupport { override def readSchema(): StructType = tableSchema } - class MyWriteBuilder(path: String) extends WriteBuilder with SupportsTruncate { - private var queryId: String = _ + class MyWriteBuilder(path: String, info: LogicalWriteInfo) + extends WriteBuilder with SupportsTruncate { + private val queryId: String = info.queryId() private var needTruncate = false - override def withQueryId(queryId: String): WriteBuilder = { - this.queryId = queryId - this - } - override def truncate(): WriteBuilder = { this.needTruncate = true this @@ -143,8 +139,8 @@ class SimpleWritableDataSource extends TableProvider with SessionConfigSupport { new MyScanBuilder(new Path(path).toUri.toString, conf) } - override def 
newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { - new MyWriteBuilder(path) + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { + new MyWriteBuilder(path, info) } override def capabilities(): util.Set[TableCapability] = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala index de843ba437..a36e8dbdec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala @@ -27,11 +27,11 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode, SparkSession, SQLContext} import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider} import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform} -import org.apache.spark.sql.connector.write.{SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder} -import org.apache.spark.sql.execution.datasources.{DataSource, DataSourceUtils} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder} +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.sources._ import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{IntegerType, StringType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap class V1WriteFallbackSuite extends QueryTest with SharedSparkSession with BeforeAndAfter { @@ -233,7 +233,9 @@ class InMemoryV1Provider // do nothing return getRelation } - val writer = table.newWriteBuilder(new CaseInsensitiveStringMap(parameters.asJava)) + val writer = table.newWriteBuilder( + LogicalWriteInfoImpl( + "", StructType(Seq.empty), new CaseInsensitiveStringMap(parameters.asJava))) if (mode == SaveMode.Overwrite) { writer.asInstanceOf[SupportsTruncate].truncate() } @@ -267,8 +269,8 @@ class InMemoryTableWithV1Fallback( def getData: Seq[Row] = dataMap.values.flatten.toSeq - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { - new FallbackWriteBuilder(options) + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { + new FallbackWriteBuilder(info.options) } private class FallbackWriteBuilder(options: CaseInsensitiveStringMap) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala index 7f4bbcf97b..8f001e0e4d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.{QueryTest, SparkSession} import org.apache.spark.sql.connector.read.ScanBuilder -import org.apache.spark.sql.connector.write.WriteBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.text.TextFileFormat import org.apache.spark.sql.test.SharedSparkSession @@ -44,7 +44,7 @@ class DummyFileTable( 
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = null - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = null + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = null override def supportsDataType(dataType: DataType): Boolean = dataType == StringType diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index c4f0751a95..13bc811a8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, SupportsRea import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan, ScanBuilder} import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReaderFactory, ContinuousStream, MicroBatchStream, Offset, PartitionOffset} -import org.apache.spark.sql.connector.write.{PhysicalWriteInfo, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, PhysicalWriteInfo, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming.{ContinuousTrigger, RateStreamOffset, Sink, StreamingQueryWrapper} @@ -86,7 +86,7 @@ trait FakeStreamingWriteTable extends Table with SupportsWrite { override def capabilities(): util.Set[TableCapability] = { Set(STREAMING_WRITE).asJava } - override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { new FakeWriteBuilder } }
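
For connector authors migrating to the new API, here is a minimal, illustrative Scala sketch (not part of this patch; `ExampleWriteBuilder` and its field names are hypothetical) of how a `WriteBuilder` consumes the new `LogicalWriteInfo` instead of the removed `withQueryId`/`withInputDataSchema` setters:

```scala
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ExampleWriteBuilder(info: LogicalWriteInfo) extends WriteBuilder {
  // The query id, input schema and options all arrive together at construction
  // time, so they can be immutable vals instead of vars mutated by withXXX calls.
  private val queryId: String = info.queryId()
  private val schema: StructType = info.schema()
  private val options: CaseInsensitiveStringMap = info.options()

  // buildForBatch() / buildForStreaming() can assume the fields above are set,
  // which is the compile-time guarantee referred to in the PR description.
}
```

On the engine side, as the changes to `V1FallbackWriters`, `WriteToDataSourceV2Exec` and `StreamExecution` above show, Spark packages the query id, schema, and options into the `private[sql]` `LogicalWriteInfoImpl` case class before calling `Table#newWriteBuilder`.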