[SPARK-19919][SQL] Defer throwing the exception for empty paths in CSV datasource into DataSource

## What changes were proposed in this pull request?

This PR proposes to defer throwing the exception for empty paths so that it is raised within `DataSource`, in the same place as the other datasources.

Currently, when other datasources fail to infer the schema, they return `None`, which is then validated in `DataSource` as below:

```
scala> spark.read.json("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for JSON. It must be specified manually.;
```

```
scala> spark.read.orc("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. It must be specified manually.;
```

```
scala> spark.read.parquet("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
```

However, CSV checks this within the datasource implementation itself and throws a different exception message, as below:

```
scala> spark.read.csv("emptydir")
java.lang.IllegalArgumentException: requirement failed: Cannot infer schema from an empty set of files
```

We can remove this duplicated check and validate it in one place, in the same way and with the same message as the other datasources.
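
With the check removed, reading an empty directory as CSV is expected to surface the same `AnalysisException` as the other sources (the exact message is asserted in the test change below):

```
scala> spark.read.csv("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for CSV. It must be specified manually.;
```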

## How was this patch tested?

Unit test in `CSVSuite` and manual test.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17256 from HyukjinKwon/SPARK-19919.
Authored by hyukjinkwon on 2017-03-22 08:41:46 +08:00; committed by Wenchen Fan
parent a04dcde8cb
commit 9281a3d504
3 changed files with 23 additions and 12 deletions

@@ -54,10 +54,21 @@ abstract class CSVDataSource extends Serializable {
   /**
    * Infers the schema from `inputPaths` files.
    */
-  def infer(
+  final def inferSchema(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
-      parsedOptions: CSVOptions): Option[StructType]
+      parsedOptions: CSVOptions): Option[StructType] = {
+    if (inputPaths.nonEmpty) {
+      Some(infer(sparkSession, inputPaths, parsedOptions))
+    } else {
+      None
+    }
+  }
+
+  protected def infer(
+      sparkSession: SparkSession,
+      inputPaths: Seq[FileStatus],
+      parsedOptions: CSVOptions): StructType
 
   /**
    * Generates a header from the given row which is null-safe and duplicate-safe.
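
The change above is a small template-method refactor: `inferSchema` is `final` and owns the empty-input check in one place, while concrete sources only implement the non-empty case in `protected def infer`. A minimal, self-contained sketch of the pattern (simplified, made-up names; not the actual Spark classes):

```
// Simplified illustration of the template-method shape used above;
// SchemaSource and ExampleSource are hypothetical names, not Spark classes.
abstract class SchemaSource {
  // Final entry point: the empty-input check lives in exactly one place.
  final def inferSchema(paths: Seq[String]): Option[String] =
    if (paths.nonEmpty) Some(infer(paths)) else None

  // Concrete sources only handle the non-empty case.
  protected def infer(paths: Seq[String]): String
}

object ExampleSource extends SchemaSource {
  override protected def infer(paths: Seq[String]): String =
    s"schema from ${paths.size} path(s)"
}

// ExampleSource.inferSchema(Nil)          == None
// ExampleSource.inferSchema(Seq("a.csv")) == Some("schema from 1 path(s)")
```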
@@ -131,10 +142,10 @@ object TextInputCSVDataSource extends CSVDataSource {
   override def infer(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
-      parsedOptions: CSVOptions): Option[StructType] = {
+      parsedOptions: CSVOptions): StructType = {
     val csv = createBaseDataset(sparkSession, inputPaths, parsedOptions)
     val maybeFirstLine = CSVUtils.filterCommentAndEmpty(csv, parsedOptions).take(1).headOption
-    Some(inferFromDataset(sparkSession, csv, maybeFirstLine, parsedOptions))
+    inferFromDataset(sparkSession, csv, maybeFirstLine, parsedOptions)
   }
 
   /**
@@ -203,7 +214,7 @@ object WholeFileCSVDataSource extends CSVDataSource {
   override def infer(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
-      parsedOptions: CSVOptions): Option[StructType] = {
+      parsedOptions: CSVOptions): StructType = {
     val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
     csv.flatMap { lines =>
       UnivocityParser.tokenizeStream(
@@ -222,10 +233,10 @@ object WholeFileCSVDataSource extends CSVDataSource {
             parsedOptions.headerFlag,
             new CsvParser(parsedOptions.asParserSettings))
         }
-        Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
+        CSVInferSchema.infer(tokenRDD, header, parsedOptions)
       case None =>
         // If the first row could not be read, just return the empty schema.
-        Some(StructType(Nil))
+        StructType(Nil)
     }
   }

@@ -51,12 +51,10 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    require(files.nonEmpty, "Cannot infer schema from an empty set of files")
-
     val parsedOptions =
       new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
-    CSVDataSource(parsedOptions).infer(sparkSession, files, parsedOptions)
+    CSVDataSource(parsedOptions).inferSchema(sparkSession, files, parsedOptions)
   }
 
   override def prepareWrite(
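
For context, the single place this now funnels into is `DataSource`'s schema-resolution path, which already produces the unified message shown at the top of this description. Paraphrased sketch only, not part of this diff; names such as `fileCatalog` and `caseInsensitiveOptions` are stand-ins for whatever the surrounding code actually uses:

```
// Sketch of the existing validation in DataSource (paraphrased, not this diff):
// if the format cannot infer a schema and the user did not supply one,
// the unified AnalysisException is thrown here for every file-based source.
val dataSchema = userSpecifiedSchema.orElse {
  format.inferSchema(sparkSession, caseInsensitiveOptions, fileCatalog.allFiles())
}.getOrElse {
  throw new AnalysisException(
    s"Unable to infer schema for $format. It must be specified manually.")
}
```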

@@ -370,9 +370,11 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     val schema = df.schema
 
     // Reader, without user specified schema
-    intercept[IllegalArgumentException] {
+    val message = intercept[AnalysisException] {
       testRead(spark.read.csv(), Seq.empty, schema)
-    }
+    }.getMessage
+    assert(message.contains("Unable to infer schema for CSV. It must be specified manually."))
+
     testRead(spark.read.csv(dir), data, schema)
     testRead(spark.read.csv(dir, dir), data ++ data, schema)
     testRead(spark.read.csv(Seq(dir, dir): _*), data ++ data, schema)
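
One way to exercise the updated suite locally (assuming a standard Spark sbt setup; the module name and sbt syntax may differ by version):

```
build/sbt "sql/testOnly *DataFrameReaderWriterSuite"
```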