[SPARK-27136][SQL] Remove data source option check_files_exist
## What changes were proposed in this pull request? The data source option check_files_exist was introduced in #23383 when the file source V2 framework was implemented. In that PR, FileIndex was created as a member of FileTable, so that we could implement partition pruning like 0f9fcab in the future. At that time, `FileIndex`es would always be created for file writes, so we needed the option to decide whether to check file existence. After https://github.com/apache/spark/pull/23774, the option is no longer needed, since DataFrame writes won't create an unnecessary FileIndex. This PR removes the option. ## How was this patch tested? Unit test. Closes #24069 from gengliangwang/removeOptionCheckFilesExist. Authored-by: Gengliang Wang <gengliang.wang@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
8819eaba4d
commit
6d22ee3969
|
@ -213,9 +213,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
|
|||
val objectMapper = new ObjectMapper()
|
||||
Some("paths" -> objectMapper.writeValueAsString(paths.toArray))
|
||||
}
|
||||
// TODO SPARK-27113: remove this option.
|
||||
val checkFilesExistsOpt = "check_files_exist" -> "true"
|
||||
val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption + checkFilesExistsOpt
|
||||
|
||||
val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption
|
||||
val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
|
||||
val table = userSpecifiedSchema match {
|
||||
case Some(schema) => provider.getTable(dsOptions, schema)
|
||||
|
|
|
@ -261,10 +261,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
|
|||
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
|
||||
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
|
||||
provider, session.sessionState.conf)
|
||||
// TODO SPARK-27113: remove this option.
|
||||
val checkFilesExistsOption = "check_files_exist" -> "false"
|
||||
val options = sessionOptions ++ extraOptions + checkFilesExistsOption
|
||||
val options = sessionOptions ++ extraOptions
|
||||
val dsOptions = new CaseInsensitiveStringMap(options.asJava)
|
||||
|
||||
provider.getTable(dsOptions) match {
|
||||
case table: SupportsBatchWrite =>
|
||||
lazy val relation = DataSourceV2Relation.create(table, dsOptions)
|
||||
|
|
|
@ -36,10 +36,8 @@ abstract class FileTable(
|
|||
lazy val fileIndex: PartitioningAwareFileIndex = {
|
||||
val scalaMap = options.asScala.toMap
|
||||
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(scalaMap)
|
||||
// This is an internal config so must be present.
|
||||
val checkFilesExist = options.get("check_files_exist").toBoolean
|
||||
val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
|
||||
checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
|
||||
checkEmptyGlobPath = true, checkFilesExist = true)
|
||||
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
|
||||
new InMemoryFileIndex(
|
||||
sparkSession, rootPathsSpecified, scalaMap, userSpecifiedSchema, fileStatusCache)
|
||||
|
|
Loading…
Reference in a new issue