diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 5cee2b9af6..644e5d65d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -77,12 +77,12 @@ class ListingFileCatalog(
     if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext)
     } else {
+      // Dummy jobconf to get to the pathFilter defined in configuration
+      val jobConf = new JobConf(hadoopConf, this.getClass)
+      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-        // Dummy jobconf to get to the pathFilter defined in configuration
-        val jobConf = new JobConf(hadoopConf, this.getClass)
-        val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
 
         val statuses = {
           val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
@@ -101,7 +101,8 @@ class ListingFileCatalog(
           // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
           //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
           //   exceeds threshold.
-          case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
+          case f =>
+            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
         }
       }.filterNot { status =>
         val name = status.getPath.getName
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index b516297115..8d332df029 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -348,28 +348,40 @@ private[sql] object HadoopFsRelation extends Logging {
     pathName == "_SUCCESS" || pathName == "_temporary" || pathName.startsWith(".")
   }
 
+  /**
+   * Create a LocatedFileStatus using FileStatus and block locations.
+   */
+  def createLocatedFileStatus(f: FileStatus, locations: Array[BlockLocation]): LocatedFileStatus = {
+    // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), which is
+    // very slow on some file systems (e.g. RawLocalFileSystem, which launches a subprocess and
+    // parses the stdout).
+    val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+      f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+    if (f.isSymlink) {
+      lfs.setSymlink(f.getSymlink)
+    }
+    lfs
+  }
+
   // We don't filter files/directories whose name start with "_" except "_temporary" here, as
   // specific data sources may take advantages over them (e.g. Parquet _metadata and
   // _common_metadata files). "_temporary" directories are explicitly ignored since failed
   // tasks/jobs may leave partial/corrupted data files there. Files and directories whose name
   // start with "." are also ignored.
-  def listLeafFiles(fs: FileSystem, status: FileStatus): Array[FileStatus] = {
+  def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
     logInfo(s"Listing ${status.getPath}")
     val name = status.getPath.getName.toLowerCase
     if (shouldFilterOut(name)) {
       Array.empty
     } else {
-      // Dummy jobconf to get to the pathFilter defined in configuration
-      val jobConf = new JobConf(fs.getConf, this.getClass())
-      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       val statuses = {
         val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
-        val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+        val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))
+        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
       }
       statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
         case f: LocatedFileStatus => f
-        case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
+        case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
       }
     }
   }
@@ -403,9 +415,15 @@ private[sql] object HadoopFsRelation extends Logging {
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val serializedPaths = paths.map(_.toString)
 
-    val fakeStatuses = sparkContext.parallelize(serializedPaths).map(new Path(_)).flatMap { path =>
-      val fs = path.getFileSystem(serializableConfiguration.value)
-      Try(listLeafFiles(fs, fs.getFileStatus(path))).getOrElse(Array.empty)
+    val fakeStatuses = sparkContext.parallelize(serializedPaths).mapPartitions { paths =>
+      // Dummy jobconf to get to the pathFilter defined in configuration
+      // It's very expensive to create a JobConf (ClassUtil.findContainingJar() is slow)
+      val jobConf = new JobConf(serializableConfiguration.value, this.getClass)
+      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+      paths.map(new Path(_)).flatMap { path =>
+        val fs = path.getFileSystem(serializableConfiguration.value)
+        Try(listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).getOrElse(Array.empty)
+      }
     }.map { status =>
       val blockLocations = status match {
         case f: LocatedFileStatus =>
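As a side note on the JobConf hoisting above: the sketch below is not part of the patch (TmpFileFilter and PathFilterLookup are hypothetical names) and only illustrates how a PathFilter registered in the Hadoop configuration is resolved through a dummy JobConf via FileInputFormat.getInputPathFilter. This is the lookup that the patch now performs once on the driver, or once per partition inside listLeafFilesInParallel, instead of once per path, because constructing a JobConf is expensive.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

// Hypothetical filter: skip files whose name ends in ".tmp".
class TmpFileFilter extends PathFilter {
  override def accept(path: Path): Boolean = !path.getName.endsWith(".tmp")
}

object PathFilterLookup {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()

    // Registering a filter stores its class name in the JobConf.
    val jobConf = new JobConf(hadoopConf, getClass)
    FileInputFormat.setInputPathFilter(jobConf, classOf[TmpFileFilter])

    // This is the lookup the patch hoists out of the per-path loop: build the JobConf
    // and resolve the filter once, then reuse the PathFilter instance for every path.
    val pathFilter: PathFilter = FileInputFormat.getInputPathFilter(jobConf)
    if (pathFilter != null) {
      println(pathFilter.accept(new Path("/data/part-00000")))      // true
      println(pathFilter.accept(new Path("/data/part-00000.tmp")))  // false
    }
  }
}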