[SPARK-29197][SQL] Remove saveModeForDSV2 from DataFrameWriter

### What changes were proposed in this pull request?

It is very confusing that the default save mode depends on the internal implementation of a data source. The reason we had to have `saveModeForDSV2` was that there was no easy way to check the existence of a table in Data Source V2. Now we have catalogs for that, so we should be able to remove the diverging save modes. We also have a plan forward for `save`, where we can't really check the existence of a table and therefore need to create one; that will come in a future PR.
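At the implementation level this collapses the two mode defaults in `DataFrameWriter` into one; a condensed before/after sketch, abridged from the diff below:

```scala
// Before: the effective default depended on which write path (DSV1 or DSV2) was taken.
private var mode: Option[SaveMode] = None
private def modeForDSV1 = mode.getOrElse(SaveMode.ErrorIfExists)
private def modeForDSV2 = mode.getOrElse(SaveMode.Append)

// After: a single default that does not depend on the data source implementation.
private var mode: SaveMode = SaveMode.ErrorIfExists
```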

### Why are the changes needed?

Because it is confusing that the internal implementation of a data source (which is generally non-obvious to users) decides which default save mode is used within Spark.

### Does this PR introduce any user-facing change?

Yes. It changes the default save mode for V2 tables in the `DataFrameWriter` APIs from `Append` to `ErrorIfExists`, matching the V1 default. In addition, for V2 tables `insertInto` now treats `SaveMode.ErrorIfExists` and `SaveMode.Ignore` as `SaveMode.Append` instead of throwing an exception, since `insertInto` is not a table-creating operation.
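For example, a minimal sketch of the new behavior, assuming a spark-shell session (`spark` in scope) and using the built-in `noop` source to stand in for any V2 source:

```scala
// Before this PR, a V2 write without an explicit mode silently appended.
// It now fails with an AnalysisException whose message mentions ErrorIfExists:
// spark.range(10).write.format("noop").save()

// Callers that relied on the old DSV2 default must opt in explicitly:
spark.range(10).write.format("noop").mode("append").save()
```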

### How was this patch tested?

Existing tests

Closes #25876 from brkyvz/removeSM.

Lead-authored-by: Burak Yavuz <brkyvz@gmail.com>
Co-authored-by: Burak Yavuz <burak@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Burak Yavuz 2019-09-26 15:20:04 +08:00 committed by Wenchen Fan
parent b8b59d6fa3
commit c8159c7941
7 changed files with 51 additions and 45 deletions

View file

@@ -400,6 +400,7 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", topic)
.mode("append")
.save()
checkAnswer(
createKafkaReader(topic, includeHeaders = true).selectExpr(
@@ -423,6 +424,7 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
df.write
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.mode("append")
.save()
}
TestUtils.assertExceptionMsg(ex, "null topic present in the data")
@@ -457,6 +459,7 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", topic)
.mode("append")
.save()
}
}

View file

@@ -1643,7 +1643,7 @@ object SQLConf {
"implementation class names for which Data Source V2 code path is disabled. These data " +
"sources will fallback to Data Source V1 code path.")
.stringConf
.createWithDefault("")
.createWithDefault("kafka")
val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
.doc("A comma-separated list of fully qualified data source register class names for which" +

View file

@@ -67,7 +67,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def mode(saveMode: SaveMode): DataFrameWriter[T] = {
this.mode = Some(saveMode)
this.mode = saveMode
this
}
@@ -267,7 +267,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
"if partition columns are specified.")
}
lazy val relation = DataSourceV2Relation.create(table, dsOptions)
modeForDSV2 match {
mode match {
case SaveMode.Append =>
runCommand(df.sparkSession, "save") {
AppendData.byName(relation, df.logicalPlan, extraOptions.toMap)
@@ -308,7 +308,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
sparkSession = df.sparkSession,
className = source,
partitionColumns = partitioningColumns.getOrElse(Nil),
options = extraOptions.toMap).planForWriting(modeForDSV1, df.logicalPlan)
options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
}
}
@@ -319,6 +319,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @note Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based
* resolution. For example:
*
* @note SaveMode.ErrorIfExists and SaveMode.Ignore behave as SaveMode.Append in `insertInto` as
* `insertInto` is not a table creating operation.
*
* {{{
* scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
* scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
@@ -380,8 +383,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
DataSourceV2Relation.create(t)
}
val command = modeForDSV2 match {
case SaveMode.Append =>
val command = mode match {
case SaveMode.Append | SaveMode.ErrorIfExists | SaveMode.Ignore =>
AppendData.byPosition(table, df.logicalPlan, extraOptions.toMap)
case SaveMode.Overwrite =>
@@ -394,10 +397,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
} else {
OverwriteByExpression.byPosition(table, df.logicalPlan, Literal(true), extraOptions.toMap)
}
case other =>
throw new AnalysisException(s"insertInto does not support $other mode, " +
s"please use Append or Overwrite mode instead.")
}
runCommand(df.sparkSession, "insertInto") {
@@ -411,7 +410,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
table = UnresolvedRelation(tableIdent),
partitionSpec = Map.empty[String, Option[String]],
query = df.logicalPlan,
overwrite = modeForDSV1 == SaveMode.Overwrite,
overwrite = mode == SaveMode.Overwrite,
ifPartitionNotExists = false)
}
}
@@ -490,12 +489,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case CatalogObjectIdentifier(Some(catalog), ident) =>
saveAsTable(catalog.asTableCatalog, ident, modeForDSV2)
saveAsTable(catalog.asTableCatalog, ident)
case CatalogObjectIdentifier(None, ident) if canUseV2 && ident.namespace().length <= 1 =>
// We pass in the modeForDSV1, as using the V2 session catalog should maintain compatibility
// for now.
saveAsTable(sessionCatalog.asTableCatalog, ident, modeForDSV1)
saveAsTable(sessionCatalog.asTableCatalog, ident)
case AsTableIdentifier(tableIdentifier) =>
saveAsTable(tableIdentifier)
@@ -507,7 +504,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
private def saveAsTable(catalog: TableCatalog, ident: Identifier, mode: SaveMode): Unit = {
private def saveAsTable(catalog: TableCatalog, ident: Identifier): Unit = {
val partitioning = partitioningColumns.map { colNames =>
colNames.map(name => IdentityTransform(FieldReference(name)))
}.getOrElse(Seq.empty[Transform])
@@ -568,7 +565,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val tableIdentWithDB = tableIdent.copy(database = Some(db))
val tableName = tableIdentWithDB.unquotedString
(tableExists, modeForDSV1) match {
(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
// Do nothing
@@ -624,7 +621,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
bucketSpec = getBucketSpec)
runCommand(df.sparkSession, "saveAsTable")(
CreateTable(tableDesc, modeForDSV1, Some(df.logicalPlan)))
CreateTable(tableDesc, mode, Some(df.logicalPlan)))
}
/**
@@ -830,10 +827,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
SQLExecution.withNewExecutionId(session, qe, Some(name))(qe.toRdd)
}
private def modeForDSV1 = mode.getOrElse(SaveMode.ErrorIfExists)
private def modeForDSV2 = mode.getOrElse(SaveMode.Append)
private def lookupV2Provider(): Option[TableProvider] = {
DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
// TODO(SPARK-28396): File source v2 write path is currently broken.
@@ -848,7 +841,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
private var source: String = df.sparkSession.sessionState.conf.defaultDataSourceName
private var mode: Option[SaveMode] = None
private var mode: SaveMode = SaveMode.ErrorIfExists
private val extraOptions = new scala.collection.mutable.HashMap[String, String]

View file

@@ -18,6 +18,7 @@
package org.apache.spark.sql.connector
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
class DataSourceV2DataFrameSuite
extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false) {
@@ -75,13 +76,15 @@ class DataSourceV2DataFrameSuite
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo")
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
// Default saveMode is append, therefore this doesn't throw a table already exists exception
// Default saveMode is ErrorIfExists
intercept[TableAlreadyExistsException] {
df.write.saveAsTable(t1)
checkAnswer(spark.table(t1), df)
}
assert(spark.table(t1).count() === 0)
// also appends are by name not by position
df.select('data, 'id).write.saveAsTable(t1)
checkAnswer(spark.table(t1), df.union(df))
// appends are by name not by position
df.select('data, 'id).write.mode("append").saveAsTable(t1)
checkAnswer(spark.table(t1), df)
}
}

View file

@@ -225,9 +225,13 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession {
spark.read.format(cls.getName).option("path", path).load(),
spark.range(10).select('id, -'id))
// default save mode is append
// default save mode is ErrorIfExists
intercept[AnalysisException] {
spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).save()
}
spark.range(10).select('id as 'i, -'id as 'j).write.mode("append").format(cls.getName)
.option("path", path).save()
checkAnswer(
spark.read.format(cls.getName).option("path", path).load(),
spark.range(10).union(spark.range(10)).select('id, -'id))
@@ -281,7 +285,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession {
val numPartition = 6
spark.range(0, 10, 1, numPartition).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).save()
.mode("append").option("path", path).save()
checkAnswer(
spark.read.format(cls.getName).option("path", path).load(),
spark.range(10).select('id, -'id))
@@ -368,7 +372,7 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession {
val format = classOf[SimpleWritableDataSource].getName
val df = Seq((1L, 2L)).toDF("i", "j")
df.write.format(format).option("path", optionPath).save()
df.write.format(format).mode("append").option("path", optionPath).save()
assert(!new File(sessionPath).exists)
checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
}

View file

@@ -32,6 +32,7 @@ class NoopSuite extends SharedSparkSession {
}
.write
.format("noop")
.mode("append")
.save()
assert(accum.value == numElems)
}
@@ -54,7 +55,7 @@ class NoopSuite extends SharedSparkSession {
accum.add(1)
x
}
.write.format("noop").save()
.write.mode("append").format("noop").save()
assert(accum.value == numElems)
}
}

View file

@@ -289,18 +289,20 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
assert(plan.isInstanceOf[OverwriteByExpression])
// By default the save mode is `ErrorIfExists` for data source v2.
val e = intercept[AnalysisException] {
spark.range(10).write
.format(classOf[NoopDataSource].getName)
.save()
sparkContext.listenerBus.waitUntilEmpty()
assert(plan.isInstanceOf[AppendData])
}
assert(e.getMessage.contains("ErrorIfExists"))
val e2 = intercept[AnalysisException] {
spark.range(10).write
.format(classOf[NoopDataSource].getName)
.mode("default")
.save()
sparkContext.listenerBus.waitUntilEmpty()
assert(plan.isInstanceOf[AppendData])
}
assert(e2.getMessage.contains("ErrorIfExists"))
} finally {
spark.listenerManager.unregister(listener)
}