[SPARK-31321][SQL] Remove SaveMode check in v2 FileWriteBuilder
### What changes were proposed in this pull request? The `SaveMode` is resolved before we create `FileWriteBuilder` to build `BatchWrite`. In https://github.com/apache/spark/pull/25876, we removed save mode for DSV2 from DataFrameWriter. So that the `mode` method is never used which makes `validateInputs` fail determinately without `mode` set. ### Why are the changes needed? rm dead code. ### Does this PR introduce any user-facing change? no ### How was this patch tested? existing tests. Closes #28090 from yaooqinn/SPARK-31321. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
50e535c431
commit
1ce584f6b7
|
@ -16,7 +16,6 @@
|
|||
*/
|
||||
package org.apache.spark.sql.execution.datasources.v2
|
||||
|
||||
import java.io.IOException
|
||||
import java.util.UUID
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
@ -27,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
|
|||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
|
||||
|
||||
import org.apache.spark.internal.io.FileCommitProtocol
|
||||
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
|
||||
import org.apache.spark.sql.{AnalysisException, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.InternalRow
|
||||
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
|
||||
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
|
||||
|
@ -46,12 +45,6 @@ abstract class FileWriteBuilder(
|
|||
private val schema = info.schema()
|
||||
private val queryId = info.queryId()
|
||||
private val options = info.options()
|
||||
private var mode: SaveMode = _
|
||||
|
||||
def mode(mode: SaveMode): WriteBuilder = {
|
||||
this.mode = mode
|
||||
this
|
||||
}
|
||||
|
||||
override def buildForBatch(): BatchWrite = {
|
||||
val sparkSession = SparkSession.active
|
||||
|
@ -68,26 +61,8 @@ abstract class FileWriteBuilder(
|
|||
lazy val description =
|
||||
createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap)
|
||||
|
||||
val fs = path.getFileSystem(hadoopConf)
|
||||
mode match {
|
||||
case SaveMode.ErrorIfExists if fs.exists(path) =>
|
||||
val qualifiedOutputPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
|
||||
throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
|
||||
|
||||
case SaveMode.Ignore if fs.exists(path) =>
|
||||
null
|
||||
|
||||
case SaveMode.Overwrite =>
|
||||
if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
|
||||
throw new IOException(s"Unable to clear directory $path prior to writing to it")
|
||||
}
|
||||
committer.setupJob(job)
|
||||
new FileBatchWrite(job, description, committer)
|
||||
|
||||
case _ =>
|
||||
committer.setupJob(job)
|
||||
new FileBatchWrite(job, description, committer)
|
||||
}
|
||||
committer.setupJob(job)
|
||||
new FileBatchWrite(job, description, committer)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -104,7 +79,6 @@ abstract class FileWriteBuilder(
|
|||
private def validateInputs(caseSensitiveAnalysis: Boolean): Unit = {
|
||||
assert(schema != null, "Missing input data schema")
|
||||
assert(queryId != null, "Missing query ID")
|
||||
assert(mode != null, "Missing save mode")
|
||||
|
||||
if (paths.length != 1) {
|
||||
throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
|
||||
|
|
Loading…
Reference in a new issue