[SPARK-26871][SQL] File Source V2: avoid creating unnecessary FileIndex in the write path

## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/23383, the file source V2 framework was implemented. In that PR, `FileIndex` is created as a member of `FileTable`, so that we can implement partition pruning like 0f9fcabb4a in the future (since the data source V2 catalog is still under development, partition pruning was removed from that PR).

However, after the write path of file source V2 was implemented, I found that a simple write creates an unnecessary `FileIndex`, which is required by `FileTable`. This is a sort of regression, and we can see a warning message when writing to ORC files:
```
WARN InMemoryFileIndex: The directory file:/tmp/foo was not found. Was it deleted very recently?
```
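For example, a pure write to a fresh path along these lines is enough to trigger the listing. This is a hypothetical repro sketch, not taken from the PR: the `spark` session, the exact write, and the target path are assumptions, and it presumes the ORC file source V2 path is in use.

```scala
// Hypothetical repro sketch; `spark` is an assumed active SparkSession and /tmp/foo an assumed target.
// Before this change, constructing the OrcTable for the write eagerly built an InMemoryFileIndex
// over file:/tmp/foo, which does not exist yet, hence the warning above.
spark.range(10)
  .write
  .format("orc")
  .mode("overwrite")
  .save("/tmp/foo")
```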
This PR makes `FileIndex` a lazy value in `FileTable`, so that we avoid creating an unnecessary `FileIndex` in the write path.
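The fix relies on ordinary Scala `lazy val` semantics: the listing logic moves from the caller into `FileTable` and only runs on first access, so a write that never reads the index never triggers it. Below is a minimal sketch of that idea with made-up names (`EagerTable`, `LazyTable`, `LazyIndexDemo`), not the actual Spark classes.

```scala
// Simplified sketch, not the real Spark classes; it only illustrates why the lazy val helps.

// Before: callers had to build the index up front and pass it in, even for pure writes.
class EagerTable(val fileIndex: Seq[String])

// After: the table builds its own index lazily, so writes that never read it skip the listing.
class LazyTable(paths: Seq[String]) {
  lazy val fileIndex: Seq[String] = {
    println(s"listing $paths")  // stand-in for InMemoryFileIndex's directory scan
    paths
  }
}

object LazyIndexDemo extends App {
  val table = new LazyTable(Seq("/tmp/foo"))
  println("table created, nothing listed yet")
  table.fileIndex               // the listing happens only on first access
}
```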

## How was this patch tested?

Existing unit test

Closes #23774 from gengliangwang/moveFileIndexInV2.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit 71170e74df (parent c406472970)
Gengliang Wang, 2019-02-15 14:57:23 +08:00, committed by Wenchen Fan
5 changed files with 21 additions and 29 deletions

FallbackOrcDataSourceV2.scala:

```diff
@@ -35,7 +35,7 @@ class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoTable(d @DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) =>
       val v1FileFormat = new OrcFileFormat
-      val relation = HadoopFsRelation(table.getFileIndex, table.getFileIndex.partitionSchema,
+      val relation = HadoopFsRelation(table.fileIndex, table.fileIndex.partitionSchema,
         table.schema(), None, v1FileFormat, d.options)(sparkSession)
       i.copy(table = LogicalRelation(relation))
   }
```

FileDataSourceV2.scala:

```diff
@@ -16,13 +16,10 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, SupportsBatchRead, TableProvider}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.sources.v2.TableProvider
 
 /**
  * A base interface for data source v2 implementations of the built-in file-based data sources.
@@ -38,17 +35,4 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
   def fallBackFileFormat: Class[_ <: FileFormat]
 
   lazy val sparkSession = SparkSession.active
-
-  def getFileIndex(
-      options: DataSourceOptions,
-      userSpecifiedSchema: Option[StructType]): PartitioningAwareFileIndex = {
-    val filePaths = options.paths()
-    val hadoopConf =
-      sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
-    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
-      checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
-    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-    new InMemoryFileIndex(sparkSession, rootPathsSpecified,
-      options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
-  }
 }
```

FileTable.scala:

```diff
@@ -16,19 +16,30 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, SupportsBatchWrite, Table}
 import org.apache.spark.sql.types.StructType
 
 abstract class FileTable(
     sparkSession: SparkSession,
-    fileIndex: PartitioningAwareFileIndex,
+    options: DataSourceOptions,
     userSpecifiedSchema: Option[StructType])
   extends Table with SupportsBatchRead with SupportsBatchWrite {
-  def getFileIndex: PartitioningAwareFileIndex = this.fileIndex
+  lazy val fileIndex: PartitioningAwareFileIndex = {
+    val filePaths = options.paths()
+    val hadoopConf =
+      sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
+    val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
+      checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    new InMemoryFileIndex(sparkSession, rootPathsSpecified,
+      options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
+  }
 
   lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
     inferSchema(fileIndex.allFiles())
```

OrcDataSourceV2.scala:

```diff
@@ -34,13 +34,11 @@ class OrcDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: DataSourceOptions): Table = {
     val tableName = getTableName(options)
-    val fileIndex = getFileIndex(options, None)
-    OrcTable(tableName, sparkSession, fileIndex, None)
+    OrcTable(tableName, sparkSession, options, None)
   }
 
   override def getTable(options: DataSourceOptions, schema: StructType): Table = {
     val tableName = getTableName(options)
-    val fileIndex = getFileIndex(options, Some(schema))
-    OrcTable(tableName, sparkSession, fileIndex, Some(schema))
+    OrcTable(tableName, sparkSession, options, Some(schema))
   }
 }
```

OrcTable.scala:

```diff
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.v2.orc
 import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.orc.OrcUtils
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.sources.v2.DataSourceOptions
@@ -29,9 +28,9 @@ import org.apache.spark.sql.types.StructType
 case class OrcTable(
     name: String,
     sparkSession: SparkSession,
-    fileIndex: PartitioningAwareFileIndex,
+    options: DataSourceOptions,
     userSpecifiedSchema: Option[StructType])
-  extends FileTable(sparkSession, fileIndex, userSpecifiedSchema) {
+  extends FileTable(sparkSession, options, userSpecifiedSchema) {
 
   override def newScanBuilder(options: DataSourceOptions): OrcScanBuilder =
     new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
```