[SPARK-16335][SQL] Structured streaming should fail if source directory does not exist

## What changes were proposed in this pull request?
In structured streaming, Spark does not report an error when the specified source directory does not exist, whereas batch mode fails immediately in that case. This patch changes streaming to also fail if the directory does not exist (when the path is not a glob pattern).
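
To illustrate the user-facing effect, here is a minimal sketch (not part of this patch; the path and application name are made up): starting a file-based stream on a missing, non-glob directory now fails up front with an `AnalysisException` rather than being silently accepted.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object MissingStreamPathExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("missing-path").getOrCreate()
    try {
      // "/no/such/dir" is a literal (non-glob) path, so its existence is now checked eagerly.
      spark.readStream.text("/no/such/dir")
    } catch {
      case e: AnalysisException =>
        // With this patch the message is of the form "Path does not exist: /no/such/dir"
        println(s"Stream creation failed: ${e.getMessage}")
    } finally {
      spark.stop()
    }
  }
}
```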

## How was this patch tested?
Updated unit tests to reflect the new behavior.

Author: Reynold Xin <rxin@databricks.com>

Closes #14002 from rxin/SPARK-16335.
Committed by Reynold Xin on 2016-07-01 15:16:04 -07:00
parent e4fa58c43c
commit d601894c04
4 changed files with 36 additions and 35 deletions


@@ -232,6 +232,10 @@ class SparkHadoopUtil extends Logging {
     recurse(baseStatus)
   }
 
+  def isGlobPath(pattern: Path): Boolean = {
+    pattern.toString.exists("{}[]*?\\".toSet.contains)
+  }
+
   def globPath(pattern: Path): Seq[Path] = {
     val fs = pattern.getFileSystem(conf)
     Option(fs.globStatus(pattern)).map { statuses =>
@@ -240,11 +244,7 @@ class SparkHadoopUtil extends Logging {
   }
 
   def globPathIfNecessary(pattern: Path): Seq[Path] = {
-    if (pattern.toString.exists("{}[]*?\\".toSet.contains)) {
-      globPath(pattern)
-    } else {
-      Seq(pattern)
-    }
+    if (isGlobPath(pattern)) globPath(pattern) else Seq(pattern)
   }
 
   /**
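
The new `isGlobPath` helper simply checks the path string for any glob metacharacter. A small standalone sketch of how it classifies paths (the wrapper object and sample paths below are illustrative, not part of the patch):

```scala
import org.apache.hadoop.fs.Path

object GlobPathSketch {
  // Same character-set test as the isGlobPath helper added above.
  def isGlobPath(pattern: Path): Boolean =
    pattern.toString.exists("{}[]*?\\".toSet.contains)

  def main(args: Array[String]): Unit = {
    println(isGlobPath(new Path("/data/events")))         // false: literal path, existence is checked
    println(isGlobPath(new Path("/data/*/events")))       // true:  '*' is a glob metacharacter
    println(isGlobPath(new Path("/data/2016-{01,02}")))   // true:  '{' and '}' are glob metacharacters
  }
}
```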


@@ -315,7 +315,7 @@ class DataStreamReader(OptionUtils):
 
         >>> json_sdf = spark.readStream.format("json")\
             .schema(sdf_schema)\
-            .load(os.path.join(tempfile.mkdtemp(),'data'))
+            .load(tempfile.mkdtemp())
         >>> json_sdf.isStreaming
         True
         >>> json_sdf.schema == sdf_schema
@@ -382,8 +382,7 @@ class DataStreamReader(OptionUtils):
             it uses the value specified in
             ``spark.sql.columnNameOfCorruptRecord``.
 
-        >>> json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'), \
-                schema = sdf_schema)
+        >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
         True
         >>> json_sdf.schema == sdf_schema
@@ -411,8 +410,7 @@ class DataStreamReader(OptionUtils):
 
         .. note:: Experimental.
 
-        >>> parquet_sdf = spark.readStream.schema(sdf_schema)\
-                .parquet(os.path.join(tempfile.mkdtemp()))
+        >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
         >>> parquet_sdf.isStreaming
         True
         >>> parquet_sdf.schema == sdf_schema
@@ -512,8 +510,7 @@ class DataStreamReader(OptionUtils):
             * ``DROPMALFORMED`` : ignores the whole corrupted records.
             * ``FAILFAST`` : throws an exception when it meets corrupted records.
 
-        >>> csv_sdf = spark.readStream.csv(os.path.join(tempfile.mkdtemp(), 'data'), \
-                schema = sdf_schema)
+        >>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
         >>> csv_sdf.isStreaming
         True
         >>> csv_sdf.schema == sdf_schema


@@ -203,6 +203,18 @@ case class DataSource(
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
+
+        // Check whether the path exists if it is not a glob pattern.
+        // For glob pattern, we do not check it because the glob pattern might only make sense
+        // once the streaming job starts and some upstream source starts dropping data.
+        val hdfsPath = new Path(path)
+        if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
+          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+          if (!fs.exists(hdfsPath)) {
+            throw new AnalysisException(s"Path does not exist: $path")
+          }
+        }
+
         val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
         val isTextSource = providingClass == classOf[text.TextFileFormat]
         // If the schema inference is disabled, only text sources require schema to be specified
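
As the comment above notes, glob patterns are deliberately exempt from the check because matching directories may only appear once an upstream job starts producing data. A hedged sketch of that case from the user side (paths and schema below are hypothetical, not from the patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

object GlobExemptionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("glob-exemption").getOrCreate()
    val schema = new StructType().add("value", StringType)

    // A glob pattern is not validated at stream creation time, even if nothing matches yet;
    // matching directories may only show up after an upstream producer starts writing.
    val df = spark.readStream.schema(schema).json("/data/*/events")
    println(s"Glob-based stream created, isStreaming = ${df.isStreaming}")

    spark.stop()
  }
}
```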


@@ -179,18 +179,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { testError() }
   }
 
-  test("FileStreamSource schema: path doesn't exist, no schema") {
-    val e = intercept[IllegalArgumentException] {
-      createFileStreamSourceAndGetSchema(format = None, path = Some("/a/b/c"), schema = None)
+  test("FileStreamSource schema: path doesn't exist (without schema) should throw exception") {
+    withTempDir { dir =>
+      intercept[AnalysisException] {
+        val userSchema = new StructType().add(new StructField("value", IntegerType))
+        val schema = createFileStreamSourceAndGetSchema(
+          format = None, path = Some(new File(dir, "1").getAbsolutePath), schema = None)
+      }
+    }
-    assert(e.getMessage.toLowerCase.contains("schema")) // reason is schema absence, not the path
   }
 
-  test("FileStreamSource schema: path doesn't exist, with schema") {
-    val userSchema = new StructType().add(new StructField("value", IntegerType))
-    val schema = createFileStreamSourceAndGetSchema(
-      format = None, path = Some("/a/b/c"), schema = Some(userSchema))
-    assert(schema === userSchema)
+  test("FileStreamSource schema: path doesn't exist (with schema) should throw exception") {
+    withTempDir { dir =>
+      intercept[AnalysisException] {
+        val userSchema = new StructType().add(new StructField("value", IntegerType))
+        val schema = createFileStreamSourceAndGetSchema(
+          format = None, path = Some(new File(dir, "1").getAbsolutePath), schema = Some(userSchema))
+      }
+    }
   }
@@ -225,20 +231,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   // =============== Parquet file stream schema tests ================
 
-  ignore("FileStreamSource schema: parquet, no existing files, no schema") {
-    withTempDir { src =>
-      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
-        val e = intercept[AnalysisException] {
-          createFileStreamSourceAndGetSchema(
-            format = Some("parquet"),
-            path = Some(new File(src, "1").getCanonicalPath),
-            schema = None)
-        }
-        assert("Unable to infer schema. It must be specified manually.;" === e.getMessage)
-      }
-    }
-  }
-
   test("FileStreamSource schema: parquet, existing files, no schema") {
     withTempDir { src =>
       Seq("a", "b", "c").toDS().as("userColumn").toDF().write