[SPARK-23896][SQL] Improve PartitioningAwareFileIndex

## What changes were proposed in this pull request?

Currently `PartitioningAwareFileIndex` accepts an optional parameter `userPartitionSchema`. If provided, it will combine the inferred partition schema with the parameter.

However,
1. to get `userPartitionSchema`, we need to  combine inferred partition schema with `userSpecifiedSchema`
2. to get the inferred partition schema, we have to create a temporary file index.

Only after that, a final version of `PartitioningAwareFileIndex` can be created.

This can be improved by passing `userSpecifiedSchema` to `PartitioningAwareFileIndex`.

With the improvement, we can reduce redundant code and avoid parsing the file partition twice.
## How was this patch tested?
Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21004 from gengliangwang/PartitioningAwareFileIndex.
This commit is contained in:
Gengliang Wang 2018-04-14 00:22:38 +08:00 committed by Wenchen Fan
parent a83ae0d9bc
commit 4dfd746de3
7 changed files with 103 additions and 108 deletions

View file

@ -85,7 +85,7 @@ class CatalogFileIndex(
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
} else {
new InMemoryFileIndex(
sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None)
}
}

View file

@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
import scala.language.{existentials, implicitConversions}
import scala.util.{Failure, Success, Try}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
@ -103,24 +102,6 @@ case class DataSource(
bucket.sortColumnNames, "in the sort definition", equality)
}
/**
* In the read path, only managed tables by Hive provide the partition columns properly when
* initializing this class. All other file based data sources will try to infer the partitioning,
* and then cast the inferred types to user specified dataTypes if the partition columns exist
* inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or
* inconsistent data types as reported in SPARK-21463.
* @param fileIndex A FileIndex that will perform partition inference
* @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema`
*/
private def combineInferredAndUserSpecifiedPartitionSchema(fileIndex: FileIndex): StructType = {
val resolved = fileIndex.partitionSchema.map { partitionField =>
// SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
partitionField)
}
StructType(resolved)
}
/**
* Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
* it. In the read path, only managed tables by Hive provide the partition columns properly when
@ -140,31 +121,26 @@ case class DataSource(
* be any further inference in any triggers.
*
* @param format the file format object for this DataSource
* @param fileStatusCache the shared cache for file statuses to speed up listing
* @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list
* @return A pair of the data schema (excluding partition columns) and the schema of the partition
* columns.
*/
private def getOrInferFileFormatSchema(
format: FileFormat,
fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
// the operations below are expensive therefore try not to do them if we don't need to, e.g.,
fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
// The operations below are expensive therefore try not to do them if we don't need to, e.g.,
// in streaming mode, we have already inferred and registered partition columns, we will
// never have to materialize the lazy val below
lazy val tempFileIndex = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val globbedPaths = allPaths.toSeq.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
}.toArray
new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
lazy val tempFileIndex = fileIndex.getOrElse {
val globbedPaths =
checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
createInMemoryFileIndex(globbedPaths)
}
val partitionSchema = if (partitionColumns.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path provides the partitioning
// columns properly unless it is a Hive DataSource
combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex)
tempFileIndex.partitionSchema
} else {
// maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
// partitioning
@ -356,13 +332,7 @@ case class DataSource(
caseInsensitiveOptions.get("path").toSeq ++ paths,
sparkSession.sessionState.newHadoopConf()) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val tempFileCatalog = new MetadataLogFileIndex(sparkSession, basePath, None)
val fileCatalog = if (userSpecifiedSchema.nonEmpty) {
val partitionSchema = combineInferredAndUserSpecifiedPartitionSchema(tempFileCatalog)
new MetadataLogFileIndex(sparkSession, basePath, Option(partitionSchema))
} else {
tempFileCatalog
}
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sparkSession,
@ -384,24 +354,23 @@ case class DataSource(
// This is a non-streaming file based datasource.
case (format: FileFormat, _) =>
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val globbedPaths = allPaths.flatMap(
DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
val globbedPaths =
checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
catalogTable.get.partitionColumnNames.nonEmpty
val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
new CatalogFileIndex(
val index = new CatalogFileIndex(
sparkSession,
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
(index, catalogTable.get.dataSchema, catalogTable.get.partitionSchema)
} else {
new InMemoryFileIndex(
sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
val index = createInMemoryFileIndex(globbedPaths)
val (resultDataSchema, resultPartitionSchema) =
getOrInferFileFormatSchema(format, Some(index))
(index, resultDataSchema, resultPartitionSchema)
}
HadoopFsRelation(
@ -552,6 +521,40 @@ case class DataSource(
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}
/** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(
sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
}
/**
* Checks and returns files in all the paths.
*/
private def checkAndGlobPathIfNecessary(
checkEmptyGlobPath: Boolean,
checkFilesExist: Boolean): Seq[Path] = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
allPaths.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
if (checkEmptyGlobPath && globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
}
// Sufficient to check head of the globPath seq for non-glob scenario
// Don't need to check once again if files exist in streaming mode
if (checkFilesExist && !fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
}.toSeq
}
}
object DataSource extends Logging {
@ -699,30 +702,6 @@ object DataSource extends Logging {
locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath)
}
/**
* If `path` is a file pattern, return all the files that match it. Otherwise, return itself.
* If `checkFilesExist` is `true`, also check the file existence.
*/
private def checkAndGlobPathIfNecessary(
hadoopConf: Configuration,
path: String,
checkFilesExist: Boolean): Seq[Path] = {
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
if (globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
}
// Sufficient to check head of the globPath seq for non-glob scenario
// Don't need to check once again if files exist in streaming mode
if (checkFilesExist && !fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
}
/**
* Called before writing into a FileFormat based data source to make sure the
* supplied schema is not empty.

View file

@ -41,17 +41,17 @@ import org.apache.spark.util.SerializableConfiguration
* @param rootPathsSpecified the list of root table paths to scan (some of which might be
* filtered out later)
* @param parameters as set of options to control discovery
* @param partitionSchema an optional partition schema that will be use to provide types for the
* discovered partitions
* @param userSpecifiedSchema an optional user specified schema that will be use to provide
* types for the discovered partitions
*/
class InMemoryFileIndex(
sparkSession: SparkSession,
rootPathsSpecified: Seq[Path],
parameters: Map[String, String],
partitionSchema: Option[StructType],
userSpecifiedSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache)
extends PartitioningAwareFileIndex(
sparkSession, parameters, partitionSchema, fileStatusCache) {
sparkSession, parameters, userSpecifiedSchema, fileStatusCache) {
// Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
// or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain

View file

@ -34,13 +34,13 @@ import org.apache.spark.sql.types.{StringType, StructType}
* It provides the necessary methods to parse partition data based on a set of files.
*
* @param parameters as set of options to control partition discovery
* @param userPartitionSchema an optional partition schema that will be use to provide types for
* the discovered partitions
* @param userSpecifiedSchema an optional user specified schema that will be use to provide
* types for the discovered partitions
*/
abstract class PartitioningAwareFileIndex(
sparkSession: SparkSession,
parameters: Map[String, String],
userPartitionSchema: Option[StructType],
userSpecifiedSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging {
import PartitioningAwareFileIndex.BASE_PATH_PARAM
@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex(
val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
userPartitionSchema match {
val inferredPartitionSpec = PartitioningUtils.parsePartitions(
leafDirs,
typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
basePaths = basePaths,
timeZoneId = timeZoneId)
userSpecifiedSchema match {
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
val spec = PartitioningUtils.parsePartitions(
leafDirs,
typeInference = false,
basePaths = basePaths,
timeZoneId = timeZoneId)
val userPartitionSchema =
combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec)
// Without auto inference, all of value in the `row` should be null or in StringType,
// we need to cast into the data type that user specified.
def castPartitionValuesToUserSchema(row: InternalRow) = {
InternalRow((0 until row.numFields).map { i =>
val dt = inferredPartitionSpec.partitionColumns.fields(i).dataType
Cast(
Literal.create(row.getUTF8String(i), StringType),
userProvidedSchema.fields(i).dataType,
Literal.create(row.get(i, dt), dt),
userPartitionSchema.fields(i).dataType,
Option(timeZoneId)).eval()
}: _*)
}
PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
PartitionSpec(userPartitionSchema, inferredPartitionSpec.partitions.map { part =>
part.copy(values = castPartitionValuesToUserSchema(part.values))
})
case _ =>
PartitioningUtils.parsePartitions(
leafDirs,
typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
basePaths = basePaths,
timeZoneId = timeZoneId)
inferredPartitionSpec
}
}
@ -236,6 +233,25 @@ abstract class PartitioningAwareFileIndex(
val name = path.getName
!((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
}
/**
* In the read path, only managed tables by Hive provide the partition columns properly when
* initializing this class. All other file based data sources will try to infer the partitioning,
* and then cast the inferred types to user specified dataTypes if the partition columns exist
* inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or
* inconsistent data types as reported in SPARK-21463.
* @param spec A partition inference result
* @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema`
*/
private def combineInferredAndUserSpecifiedPartitionSchema(spec: PartitionSpec): StructType = {
val equality = sparkSession.sessionState.conf.resolver
val resolved = spec.partitionColumns.map { partitionField =>
// SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
partitionField)
}
StructType(resolved)
}
}
object PartitioningAwareFileIndex {

View file

@ -30,14 +30,14 @@ import org.apache.spark.sql.types.StructType
* A [[FileIndex]] that generates the list of files to processing by reading them from the
* metadata log files generated by the [[FileStreamSink]].
*
* @param userPartitionSchema an optional partition schema that will be use to provide types for
* the discovered partitions
* @param userSpecifiedSchema an optional user specified schema that will be use to provide
* types for the discovered partitions
*/
class MetadataLogFileIndex(
sparkSession: SparkSession,
path: Path,
userPartitionSchema: Option[StructType])
extends PartitioningAwareFileIndex(sparkSession, Map.empty, userPartitionSchema) {
userSpecifiedSchema: Option[StructType])
extends PartitioningAwareFileIndex(sparkSession, Map.empty, userSpecifiedSchema) {
private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
logInfo(s"Reading streaming file log from $metadataDirectory")
@ -51,7 +51,7 @@ class MetadataLogFileIndex(
}
override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
allFilesFromLog.toArray.groupBy(_.getPath.getParent)
allFilesFromLog.groupBy(_.getPath.getParent)
}
override def rootPaths: Seq[Path] = path :: Nil

View file

@ -401,7 +401,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
sparkSession = spark,
rootPathsSpecified = Seq(new Path(tempDir)),
parameters = Map.empty[String, String],
partitionSchema = None)
userSpecifiedSchema = None)
// This should not fail.
fileCatalog.listLeafFiles(Seq(new Path(tempDir)))

View file

@ -419,7 +419,7 @@ class PartitionedTablePerfStatsSuite
HiveCatalogMetrics.reset()
spark.read.load(dir.getAbsolutePath)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
}
}