[SPARK-29089][SQL] Parallelize blocking FileSystem calls in DataSource#checkAndGlobPathIfNecessary

### What changes were proposed in this pull request?
See JIRA: https://issues.apache.org/jira/browse/SPARK-29089
Mailing List: http://apache-spark-developers-list.1001551.n3.nabble.com/DataFrameReader-bottleneck-in-DataSource-checkAndGlobPathIfNecessary-when-reading-S3-files-td27828.html

When using `DataFrameReader#csv` to read many files on S3, the globbing and `fs.exists` calls in `DataSource#checkAndGlobPathIfNecessary` become a bottleneck.
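
For context, this is the user-level call pattern that funnels into `checkAndGlobPathIfNecessary` (a hypothetical sketch: the bucket, layout, and path count are made up, and `spark` is assumed to be an active `SparkSession`):

```scala
// Hypothetical example: every path below must be qualified, globbed, and
// existence-checked on the driver before the actual read job can start.
val paths = (0 until 1000).map(i => s"s3a://some-bucket/logs/day=$i/part.csv")
val df = spark.read.csv(paths: _*)  // DataFrameReader#csv(paths: String*)
```

Before this change, those driver-side FileSystem calls run sequentially, one blocking round trip at a time.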

From the mailing list discussions, an improvement that can be made is to parallelize the blocking FS calls:

> - have SparkHadoopUtils differentiate between files returned by globStatus(), and which therefore exist, and those which it didn't glob for - it will only need to check those.
> - add parallel execution to the glob and existence checks
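
The patch below follows exactly this plan, fanning the blocking calls out with Spark's internal `ThreadUtils.parmap`. One detail worth calling out: `parmap` (via `ThreadUtils.awaitResult`) wraps a worker's exception in a `SparkException`, so the caller must unwrap `getCause` to surface the original `AnalysisException`. A minimal sketch of that pattern (the paths are hypothetical stand-ins; `ThreadUtils` and this `AnalysisException` constructor are Spark-internal, so this only compiles inside Spark's own sql source tree, which is where the patch lives):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.util.ThreadUtils

val hadoopConf = new Configuration()
// Hypothetical stand-ins for the non-glob paths to be checked.
val nonGlobPaths = Seq(new Path("s3a://bucket/a.csv"), new Path("s3a://bucket/b.csv"))

try {
  // parmap runs the body on a dedicated thread pool (named with the given
  // prefix, here capped at 40 threads) and blocks until every element is done.
  ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", 40) { path =>
    val fs = path.getFileSystem(hadoopConf)
    if (!fs.exists(path)) {
      throw new AnalysisException(s"Path does not exist: $path")
    }
  }
} catch {
  // parmap surfaces worker failures wrapped in SparkException; rethrow the
  // cause so callers still see the original AnalysisException.
  case e: SparkException => throw e.getCause
}
```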

### Why are the changes needed?

Verifying and globbing paths happens on the driver, and if these operations take a long time (for example against S3), the entire cluster has to wait, potentially sitting idle. This change parallelizes the blocking calls to make that process faster.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

I added a test suite `DataSourceSuite` - open to suggestions for better naming.
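
The suite can be run on its own via the usual sbt flow, e.g. `build/sbt "sql/testOnly *DataSourceSuite"` (the exact invocation is assumed from the standard Spark dev workflow, not specified in this PR).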

See [here](https://github.com/apache/spark/pull/25899#issuecomment-534380034) and [here](https://github.com/apache/spark/pull/25899#issuecomment-534069194) for some performance measurements.

Closes #25899 from cozos/master.

Lead-authored-by: Arwin Tio <Arwin.tio@adroll.com>
Co-authored-by: Arwin Tio <arwin.tio@hotmail.com>
Co-authored-by: Arwin Tio <arwin.tio@adroll.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
Commit 25e9156bc0 (parent 06217cfded), authored by Arwin Tio on 2020-02-17 09:30:35 -06:00 and committed by Sean Owen.
2 changed files with 238 additions and 21 deletions.

#### DataSource.scala

```diff
@@ -26,6 +26,7 @@ import scala.util.{Failure, Success, Try}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
@@ -50,7 +51,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
 import org.apache.spark.sql.util.SchemaUtils
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
@@ -739,30 +740,53 @@ object DataSource extends Logging {
    * Checks and returns files in all the paths.
    */
   private[sql] def checkAndGlobPathIfNecessary(
-      paths: Seq[String],
+      pathStrings: Seq[String],
       hadoopConf: Configuration,
       checkEmptyGlobPath: Boolean,
-      checkFilesExist: Boolean): Seq[Path] = {
-    val allGlobPath = paths.flatMap { path =>
-      val hdfsPath = new Path(path)
-      val fs = hdfsPath.getFileSystem(hadoopConf)
-      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-
-      if (checkEmptyGlobPath && globPath.isEmpty) {
-        throw new AnalysisException(s"Path does not exist: $qualified")
-      }
-
-      // Sufficient to check head of the globPath seq for non-glob scenario
-      // Don't need to check once again if files exist in streaming mode
-      if (checkFilesExist && !fs.exists(globPath.head)) {
-        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
-      }
-      globPath
+      checkFilesExist: Boolean,
+      numThreads: Integer = 40): Seq[Path] = {
+    val qualifiedPaths = pathStrings.map { pathString =>
+      val path = new Path(pathString)
+      val fs = path.getFileSystem(hadoopConf)
+      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
     }
 
+    // Split the paths into glob and non glob paths, because we don't need to do an existence check
+    // for globbed paths.
+    val (globPaths, nonGlobPaths) = qualifiedPaths.partition(SparkHadoopUtil.get.isGlobPath)
+
+    val globbedPaths =
+      try {
+        ThreadUtils.parmap(globPaths, "globPath", numThreads) { globPath =>
+          val fs = globPath.getFileSystem(hadoopConf)
+          val globResult = SparkHadoopUtil.get.globPath(fs, globPath)
+
+          if (checkEmptyGlobPath && globResult.isEmpty) {
+            throw new AnalysisException(s"Path does not exist: $globPath")
+          }
+
+          globResult
+        }.flatten
+      } catch {
+        case e: SparkException => throw e.getCause
+      }
+
     if (checkFilesExist) {
-      val (filteredOut, filteredIn) = allGlobPath.partition { path =>
+      try {
+        ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", numThreads) { path =>
+          val fs = path.getFileSystem(hadoopConf)
+          if (!fs.exists(path)) {
+            throw new AnalysisException(s"Path does not exist: $path")
+          }
+        }
+      } catch {
+        case e: SparkException => throw e.getCause
+      }
+    }
+
+    val allPaths = globbedPaths ++ nonGlobPaths
+    if (checkFilesExist) {
+      val (filteredOut, filteredIn) = allPaths.partition { path =>
         InMemoryFileIndex.shouldFilterOut(path.getName)
       }
       if (filteredIn.isEmpty) {
@@ -774,7 +798,7 @@ object DataSource extends Logging {
       }
     }
 
-    allGlobPath
+    allPaths.toSeq
   }
 
   /**
```

#### DataSourceSuite.scala (new file, 193 lines)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.test.SharedSparkSession

class DataSourceSuite extends SharedSparkSession {
  import TestPaths._

  test("test glob and non glob paths") {
    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
      Seq(
        path1.toString,
        path2.toString,
        globPath1.toString,
        globPath2.toString
      ),
      hadoopConf,
      checkEmptyGlobPath = true,
      checkFilesExist = true
    )

    assert(resultPaths.toSet === allPathsInFs.toSet)
  }

  test("test glob paths") {
    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
      Seq(
        globPath1.toString,
        globPath2.toString
      ),
      hadoopConf,
      checkEmptyGlobPath = true,
      checkFilesExist = true
    )

    assert(
      resultPaths.toSet === Set(
        globPath1Result1,
        globPath1Result2,
        globPath2Result1,
        globPath2Result2
      )
    )
  }

  test("test non glob paths") {
    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
      Seq(
        path1.toString,
        path2.toString
      ),
      hadoopConf,
      checkEmptyGlobPath = true,
      checkFilesExist = true
    )

    assert(
      resultPaths.toSet === Set(
        path1,
        path2
      )
    )
  }

  test("test non glob paths checkFilesExist=false") {
    val resultPaths = DataSource.checkAndGlobPathIfNecessary(
      Seq(
        path1.toString,
        path2.toString,
        nonExistentPath.toString
      ),
      hadoopConf,
      checkEmptyGlobPath = true,
      checkFilesExist = false
    )

    assert(
      resultPaths.toSet === Set(
        path1,
        path2,
        nonExistentPath
      )
    )
  }

  test("test non existent paths") {
    assertThrows[AnalysisException](
      DataSource.checkAndGlobPathIfNecessary(
        Seq(
          path1.toString,
          path2.toString,
          nonExistentPath.toString
        ),
        hadoopConf,
        checkEmptyGlobPath = true,
        checkFilesExist = true
      )
    )
  }

  test("test non existent glob paths") {
    assertThrows[AnalysisException](
      DataSource.checkAndGlobPathIfNecessary(
        Seq(
          globPath1.toString,
          globPath2.toString,
          nonExistentGlobPath.toString
        ),
        hadoopConf,
        checkEmptyGlobPath = true,
        checkFilesExist = true
      )
    )
  }
}

object TestPaths {
  val hadoopConf = new Configuration()
  hadoopConf.set("fs.mockFs.impl", classOf[MockFileSystem].getName)

  val path1 = new Path("mockFs://mockFs/somepath1")
  val path2 = new Path("mockFs://mockFs/somepath2")
  val globPath1 = new Path("mockFs://mockFs/globpath1*")
  val globPath2 = new Path("mockFs://mockFs/globpath2*")

  val nonExistentPath = new Path("mockFs://mockFs/nonexistentpath")
  val nonExistentGlobPath = new Path("mockFs://mockFs/nonexistentpath*")

  val globPath1Result1 = new Path("mockFs://mockFs/globpath1/path1")
  val globPath1Result2 = new Path("mockFs://mockFs/globpath1/path2")
  val globPath2Result1 = new Path("mockFs://mockFs/globpath2/path1")
  val globPath2Result2 = new Path("mockFs://mockFs/globpath2/path2")

  val allPathsInFs = Seq(
    path1,
    path2,
    globPath1Result1,
    globPath1Result2,
    globPath2Result1,
    globPath2Result2
  )

  val mockGlobResults: Map[Path, Array[FileStatus]] = Map(
    globPath1 ->
      Array(
        createMockFileStatus(globPath1Result1.toString),
        createMockFileStatus(globPath1Result2.toString)
      ),
    globPath2 ->
      Array(
        createMockFileStatus(globPath2Result1.toString),
        createMockFileStatus(globPath2Result2.toString)
      )
  )

  def createMockFileStatus(path: String): FileStatus = {
    val fileStatus = new FileStatus()
    fileStatus.setPath(new Path(path))
    fileStatus
  }
}

class MockFileSystem extends RawLocalFileSystem {
  import TestPaths._

  override def exists(f: Path): Boolean = {
    allPathsInFs.contains(f)
  }

  override def globStatus(pathPattern: Path): Array[FileStatus] = {
    mockGlobResults.getOrElse(pathPattern, Array())
  }
}
```