[SPARK-22366] Support ignoring missing files
## What changes were proposed in this pull request? Add a flag "spark.sql.files.ignoreMissingFiles" to parallel the existing flag "spark.sql.files.ignoreCorruptFiles". ## How was this patch tested? New unit test. Author: Jose Torres <jose@databricks.com> Closes #19581 from joseph-torres/SPARK-22366.
This commit is contained in:
parent
5415963d2c
commit
8e9863531b
|
@ -614,6 +614,12 @@ object SQLConf {
|
|||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
val IGNORE_MISSING_FILES = buildConf("spark.sql.files.ignoreMissingFiles")
|
||||
.doc("Whether to ignore missing files. If true, the Spark jobs will continue to run when " +
|
||||
"encountering missing files and the contents that have been read will still be returned.")
|
||||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
val MAX_RECORDS_PER_FILE = buildConf("spark.sql.files.maxRecordsPerFile")
|
||||
.doc("Maximum number of records to write out to a single file. " +
|
||||
"If this value is zero or negative, there is no limit.")
|
||||
|
@ -1014,6 +1020,8 @@ class SQLConf extends Serializable with Logging {
|
|||
|
||||
def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
|
||||
|
||||
def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)
|
||||
|
||||
def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE)
|
||||
|
||||
def useCompression: Boolean = getConf(COMPRESS_CACHED)
|
||||
|
|
|
@ -66,6 +66,7 @@ class FileScanRDD(
|
|||
extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
|
||||
|
||||
private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
|
||||
private val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
|
||||
|
||||
override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
|
||||
val iterator = new Iterator[Object] with AutoCloseable {
|
||||
|
@ -142,7 +143,7 @@ class FileScanRDD(
|
|||
// Sets InputFileBlockHolder for the file block's information
|
||||
InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
|
||||
|
||||
if (ignoreCorruptFiles) {
|
||||
if (ignoreMissingFiles || ignoreCorruptFiles) {
|
||||
currentIterator = new NextIterator[Object] {
|
||||
// The readFunction may read some bytes before consuming the iterator, e.g.,
|
||||
// vectorized Parquet reader. Here we use lazy val to delay the creation of
|
||||
|
@ -158,9 +159,13 @@ class FileScanRDD(
|
|||
null
|
||||
}
|
||||
} catch {
|
||||
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
|
||||
case e: FileNotFoundException => throw e
|
||||
case e @ (_: RuntimeException | _: IOException) =>
|
||||
case e: FileNotFoundException if ignoreMissingFiles =>
|
||||
logWarning(s"Skipped missing file: $currentFile", e)
|
||||
finished = true
|
||||
null
|
||||
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
|
||||
case e: FileNotFoundException if !ignoreMissingFiles => throw e
|
||||
case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
|
||||
logWarning(
|
||||
s"Skipped the rest of the content in the corrupted file: $currentFile", e)
|
||||
finished = true
|
||||
|
|
|
@ -316,6 +316,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
|
|||
}
|
||||
}
|
||||
|
||||
testQuietly("Enabling/disabling ignoreMissingFiles") {
|
||||
def testIgnoreMissingFiles(): Unit = {
|
||||
withTempDir { dir =>
|
||||
val basePath = dir.getCanonicalPath
|
||||
spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
|
||||
spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
|
||||
val thirdPath = new Path(basePath, "third")
|
||||
spark.range(2, 3).toDF("a").write.parquet(thirdPath.toString)
|
||||
val df = spark.read.parquet(
|
||||
new Path(basePath, "first").toString,
|
||||
new Path(basePath, "second").toString,
|
||||
new Path(basePath, "third").toString)
|
||||
|
||||
val fs = thirdPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
|
||||
fs.delete(thirdPath, true)
|
||||
checkAnswer(
|
||||
df,
|
||||
Seq(Row(0), Row(1)))
|
||||
}
|
||||
}
|
||||
|
||||
withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
|
||||
testIgnoreMissingFiles()
|
||||
}
|
||||
|
||||
withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
|
||||
val exception = intercept[SparkException] {
|
||||
testIgnoreMissingFiles()
|
||||
}
|
||||
assert(exception.getMessage().contains("does not exist"))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop
|
||||
* to increase the chance of failure
|
||||
|
|
Loading…
Reference in a new issue