[SPARK-15307][SQL] speed up listing files for data source

## What changes were proposed in this pull request?

Currently, listing files is very slow when there are thousands of files, especially on a local file system, because:
1) FileStatus.getPermission() is very slow on the local file system: it launches a subprocess and parses its stdout.
2) Creating a JobConf is very expensive (ClassUtil.findContainingJar() is slow).

This PR improves both (see the sketch below):
1) Use another constructor of LocatedFileStatus to avoid calling FileStatus.getPermission(); the permissions are not used for data sources.
2) Create the JobConf only once within one task.
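
Roughly, the optimization boils down to the following minimal Scala sketch (not the actual patch; the helper names `fastLocatedStatus` and `listAll` are made up for illustration, while the Hadoop APIs are the same ones used in the diff below):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, FileSystem, LocatedFileStatus, Path}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

// 1) Build a LocatedFileStatus from an existing FileStatus without touching
//    FileStatus.getPermission(), which forks a subprocess on RawLocalFileSystem.
def fastLocatedStatus(f: FileStatus, locations: Array[BlockLocation]): LocatedFileStatus =
  new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
    f.getModificationTime, 0 /* access time */, null, null, null, null, f.getPath, locations)

// 2) Create the expensive JobConf (and the PathFilter derived from it) once per task,
//    then reuse it for every path instead of rebuilding it per path.
def listAll(fs: FileSystem, paths: Seq[Path], conf: Configuration): Seq[LocatedFileStatus] = {
  val jobConf = new JobConf(conf, getClass)                     // built once
  val pathFilter = FileInputFormat.getInputPathFilter(jobConf)  // may be null
  paths.flatMap { path =>
    val children = fs.listStatus(path)
    val accepted =
      if (pathFilter != null) children.filter(c => pathFilter.accept(c.getPath)) else children
    accepted.map(c => fastLocatedStatus(c, fs.getFileBlockLocations(c, 0, c.getLen)))
  }
}
```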

## How was this patch tested?

Manually tested on a partitioned table with 1828 partitions; this decreased the time to load the table from 22 seconds to 1.6 seconds (most of the time is now spent merging schemas).
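
For reference, a measurement like this can be reproduced along the following lines (a hypothetical spark-shell sketch; the table path and the mergeSchema option are illustrative, not taken from the PR):

```scala
// Hypothetical timing sketch for spark-shell; "/tmp/partitioned_table" is a made-up path.
val start = System.nanoTime()
// Resolving the DataFrame triggers file listing (and, for Parquet, schema merging).
val df = spark.read.option("mergeSchema", "true").parquet("/tmp/partitioned_table")
println(f"Table loaded in ${(System.nanoTime() - start) / 1e9}%.1f s")
```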

Author: Davies Liu <davies@databricks.com>

Closes #13094 from davies/listing.
Commit 33814f887a (parent 6e02aec44b), authored by Davies Liu on 2016-05-18 18:46:57 +08:00 and committed by Cheng Lian.
2 changed files with 33 additions and 14 deletions.

```diff
@@ -77,12 +77,12 @@ class ListingFileCatalog(
     if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext)
     } else {
+      // Dummy jobconf to get to the pathFilter defined in configuration
+      val jobConf = new JobConf(hadoopConf, this.getClass)
+      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-        // Dummy jobconf to get to the pathFilter defined in configuration
-        val jobConf = new JobConf(hadoopConf, this.getClass)
-        val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
         val statuses = {
           val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
@@ -101,7 +101,8 @@ class ListingFileCatalog(
           // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should
           //   not be a big deal since we always use `listLeafFilesInParallel` when the number of
           //   paths exceeds the threshold.
-          case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
+          case f =>
+            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
         }
       }.filterNot { status =>
         val name = status.getPath.getName
```
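
Note that this serial branch only runs when the number of paths stays below the parallel-discovery threshold; larger path sets go through `HadoopFsRelation.listLeafFilesInParallel`, shown in the second file below. A hedged example of tuning that threshold (assuming the usual SQLConf key behind `parallelPartitionDiscoveryThreshold`; the value 64 is arbitrary):

```scala
// Raise the cutoff so more tables are listed on the driver instead of in a Spark job.
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "64")
```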

```diff
@@ -348,28 +348,40 @@ private[sql] object HadoopFsRelation extends Logging {
     pathName == "_SUCCESS" || pathName == "_temporary" || pathName.startsWith(".")
   }
 
+  /**
+   * Create a LocatedFileStatus using FileStatus and block locations.
+   */
+  def createLocatedFileStatus(f: FileStatus, locations: Array[BlockLocation]): LocatedFileStatus = {
+    // The other constructor of LocatedFileStatus calls FileStatus.getPermission(), which is very
+    // slow on some file systems (RawLocalFileSystem launches a subprocess and parses its stdout).
+    val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+      f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+    if (f.isSymlink) {
+      lfs.setSymlink(f.getSymlink)
+    }
+    lfs
+  }
+
   // We don't filter files/directories whose name start with "_" except "_temporary" here, as
   // specific data sources may take advantages over them (e.g. Parquet _metadata and
   // _common_metadata files). "_temporary" directories are explicitly ignored since failed
   // tasks/jobs may leave partial/corrupted data files there. Files and directories whose name
   // start with "." are also ignored.
-  def listLeafFiles(fs: FileSystem, status: FileStatus): Array[FileStatus] = {
+  def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
     logInfo(s"Listing ${status.getPath}")
     val name = status.getPath.getName.toLowerCase
     if (shouldFilterOut(name)) {
       Array.empty
     } else {
-      // Dummy jobconf to get to the pathFilter defined in configuration
-      val jobConf = new JobConf(fs.getConf, this.getClass())
-      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       val statuses = {
         val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
-        val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+        val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))
+        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
       }
       statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
         case f: LocatedFileStatus => f
-        case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
+        case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
       }
     }
   }
```
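
The parallel-listing path below applies the same idea on executors: `mapPartitions` lets each task build the expensive JobConf once per partition instead of once per path, and the resulting `PathFilter` is threaded through the new `listLeafFiles` signature. A hedged caller-side sketch (illustrative only; `HadoopFsRelation` is `private[sql]`, and the path is made up):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

val hadoopConf = new Configuration()
val rootPath = new Path("/tmp/some_table")  // made-up path
val fs = rootPath.getFileSystem(hadoopConf)

// Build the expensive JobConf (ClassUtil.findContainingJar() is slow) just once...
val jobConf = new JobConf(hadoopConf, getClass)
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)  // may be null

// ...then pass the resulting PathFilter through the recursive listing.
val leaves: Array[FileStatus] =
  HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(rootPath), pathFilter)
```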
```diff
@@ -403,9 +415,15 @@ private[sql] object HadoopFsRelation extends Logging {
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val serializedPaths = paths.map(_.toString)
 
-    val fakeStatuses = sparkContext.parallelize(serializedPaths).map(new Path(_)).flatMap { path =>
-      val fs = path.getFileSystem(serializableConfiguration.value)
-      Try(listLeafFiles(fs, fs.getFileStatus(path))).getOrElse(Array.empty)
+    val fakeStatuses = sparkContext.parallelize(serializedPaths).mapPartitions { paths =>
+      // Dummy jobconf to get to the pathFilter defined in configuration.
+      // It's very expensive to create a JobConf (ClassUtil.findContainingJar() is slow).
+      val jobConf = new JobConf(serializableConfiguration.value, this.getClass)
+      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+      paths.map(new Path(_)).flatMap { path =>
+        val fs = path.getFileSystem(serializableConfiguration.value)
+        Try(listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).getOrElse(Array.empty)
+      }
     }.map { status =>
       val blockLocations = status match {
         case f: LocatedFileStatus =>
```