[SPARK-26368][SQL] Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex

## What changes were proposed in this pull request?
I was looking at the code and it was a bit difficult to see the life cycle of InMemoryFileIndex passed into getOrInferFileFormatSchema, because once it is passed in, and another time it was created in getOrInferFileFormatSchema. It'd be easier to understand the life cycle if we move the creation of it out.

## How was this patch tested?
This is a simple code move and should be covered by existing tests.

Closes #23317 from rxin/SPARK-26368.

Authored-by: Reynold Xin <rxin@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
This commit is contained in:
Reynold Xin 2018-12-13 20:55:12 -08:00 committed by gatorsmile
parent 93139afb07
commit 2d8838dccd

View file

@ -122,21 +122,14 @@ case class DataSource(
* be any further inference in any triggers.
*
* @param format the file format object for this DataSource
* @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list
* @param getFileIndex [[InMemoryFileIndex]] for getting partition schema and file list
* @return A pair of the data schema (excluding partition columns) and the schema of the partition
* columns.
*/
private def getOrInferFileFormatSchema(
format: FileFormat,
fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
// The operations below are expensive therefore try not to do them if we don't need to, e.g.,
// in streaming mode, we have already inferred and registered partition columns, we will
// never have to materialize the lazy val below
lazy val tempFileIndex = fileIndex.getOrElse {
val globbedPaths =
checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
createInMemoryFileIndex(globbedPaths)
}
getFileIndex: () => InMemoryFileIndex): (StructType, StructType) = {
lazy val tempFileIndex = getFileIndex()
val partitionSchema = if (partitionColumns.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path provides the partitioning
@ -236,7 +229,15 @@ case class DataSource(
"you may be able to create a static DataFrame on that directory with " +
"'spark.read.load(directory)' and infer schema from it.")
}
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, () => {
// The operations below are expensive therefore try not to do them if we don't need to,
// e.g., in streaming mode, we have already inferred and registered partition columns,
// we will never have to materialize the lazy val below
val globbedPaths =
checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
createInMemoryFileIndex(globbedPaths)
})
SourceInfo(
s"FileSource[$path]",
StructType(dataSchema ++ partitionSchema),
@ -370,7 +371,7 @@ case class DataSource(
} else {
val index = createInMemoryFileIndex(globbedPaths)
val (resultDataSchema, resultPartitionSchema) =
getOrInferFileFormatSchema(format, Some(index))
getOrInferFileFormatSchema(format, () => index)
(index, resultDataSchema, resultPartitionSchema)
}