[SPARK-32516][SQL] 'path' option cannot coexist with load()'s path parameters

### What changes were proposed in this pull request?

This PR proposes to make the behavior consistent for the `path` option when loading dataframes with a single path (e.g, `option("path", path).format("parquet").load(path)` vs. `option("path", path).parquet(path)`) by disallowing `path` option to coexist with `load`'s path parameters.

### Why are the changes needed?

The current behavior is inconsistent:
```scala
scala> Seq(1).toDF.write.mode("overwrite").parquet("/tmp/test")

scala> spark.read.option("path", "/tmp/test").format("parquet").load("/tmp/test").show
+-----+
|value|
+-----+
|    1|
+-----+

scala> spark.read.option("path", "/tmp/test").parquet("/tmp/test").show
+-----+
|value|
+-----+
|    1|
|    1|
+-----+
```

### Does this PR introduce _any_ user-facing change?

Yes, now if the `path` option is specified along with `load`'s path parameters, it would fail:
```scala
scala> Seq(1).toDF.write.mode("overwrite").parquet("/tmp/test")

scala> spark.read.option("path", "/tmp/test").format("parquet").load("/tmp/test").show
org.apache.spark.sql.AnalysisException: There is a path option set and load() is called with path parameters. Either remove the path option or move it into the load() parameters.;
  at org.apache.spark.sql.DataFrameReader.verifyPathOptionDoesNotExist(DataFrameReader.scala:310)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
  ... 47 elided

scala> spark.read.option("path", "/tmp/test").parquet("/tmp/test").show
org.apache.spark.sql.AnalysisException: There is a path option set and load() is called with path parameters. Either remove the path option or move it into the load() parameters.;
  at org.apache.spark.sql.DataFrameReader.verifyPathOptionDoesNotExist(DataFrameReader.scala:310)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:250)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:778)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:756)
  ... 47 elided
```

The user can restore the previous behavior by setting `spark.sql.legacy.pathOptionBehavior.enabled` to `true`.

### How was this patch tested?

Added a test

Closes #29328 from imback82/dfw_option.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Terry Kim 2020-08-24 16:30:30 +00:00 committed by Wenchen Fan
parent bc23bb7882
commit e3a88a9767
5 changed files with 93 additions and 28 deletions

View file

@ -38,6 +38,8 @@ license: |
- In Spark 3.1, when `spark.sql.ansi.enabled` is false, Spark always returns null if the sum of decimal type column overflows. In Spark 3.0 or earlier, in the case, the sum of decimal type column may return null or incorrect result, or even fails at runtime (depending on the actual query plan execution).
- In Spark 3.1, when loading a dataframe, `path` or `paths` option cannot coexist with `load()`'s path parameters. For example, `spark.read.format("csv").option("path", "/tmp").load("/tmp2")` or `spark.read.option("path", "/tmp").csv("/tmp2")` will throw `org.apache.spark.sql.AnalysisException`. In Spark version 3.0 and below, `path` option is overwritten if one path parameter is passed to `load()`, or `path` option is added to the overall paths if multiple path parameters are passed to `load()`. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.pathOptionBehavior.enabled` to `true`.
## Upgrading from Spark SQL 3.0 to 3.0.1
- In Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Since version 3.0.1, the timestamp type inference is disabled by default. Set the JSON option `inferTimestamp` to `true` to enable such type inference.

View file

@ -2712,6 +2712,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
val LEGACY_PATH_OPTION_BEHAVIOR =
buildConf("spark.sql.legacy.pathOptionBehavior.enabled")
.internal()
.doc("When true, \"path\" option is overwritten if one path parameter is passed to " +
"DataFramerReader.load(), or \"path\" option is added to the overall paths if multiple " +
"path parameters are passed to DataFramerReader.load()")
.version("3.1.0")
.booleanConf
.createWithDefault(false)
/**
* Holds information about keys that have been deprecated.
*
@ -3322,6 +3332,8 @@ class SQLConf extends Serializable with Logging {
def optimizeNullAwareAntiJoin: Boolean =
getConf(SQLConf.OPTIMIZE_NULL_AWARE_ANTI_JOIN)
def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */

View file

@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.csv._
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
@ -229,7 +230,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
*/
def load(path: String): DataFrame = {
// force invocation of `load(...varargs...)`
option("path", path).load(Seq.empty: _*)
if (sparkSession.sessionState.conf.legacyPathOptionBehavior) {
option("path", path).load(Seq.empty: _*)
} else {
load(Seq(path): _*)
}
}
/**
@ -245,15 +250,30 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
"read files of Hive data source directly.")
}
val legacyPathOptionBehavior = sparkSession.sessionState.conf.legacyPathOptionBehavior
if (!legacyPathOptionBehavior &&
(extraOptions.contains("path") || extraOptions.contains("paths")) && paths.nonEmpty) {
throw new AnalysisException("There is a 'path' or 'paths' option set and load() is called " +
"with path parameters. Either remove the path option if it's the same as the path " +
"parameter, or add it to the load() parameter if you do want to read multiple paths.")
}
val updatedPaths = if (!legacyPathOptionBehavior && paths.length == 1) {
option("path", paths.head)
Seq.empty
} else {
paths
}
DataSource.lookupDataSourceV2(source, sparkSession.sessionState.conf).map { provider =>
val catalogManager = sparkSession.sessionState.catalogManager
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source = provider, conf = sparkSession.sessionState.conf)
val pathsOption = if (paths.isEmpty) {
val pathsOption = if (updatedPaths.isEmpty) {
None
} else {
val objectMapper = new ObjectMapper()
Some("paths" -> objectMapper.writeValueAsString(paths.toArray))
Some("paths" -> objectMapper.writeValueAsString(updatedPaths.toArray))
}
val finalOptions = sessionOptions ++ extraOptions.originalMap ++ pathsOption
@ -281,9 +301,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
sparkSession,
DataSourceV2Relation.create(table, catalog, ident, dsOptions))
case _ => loadV1Source(paths: _*)
case _ => loadV1Source(updatedPaths: _*)
}
}.getOrElse(loadV1Source(paths: _*))
}.getOrElse(loadV1Source(updatedPaths: _*))
}
private def loadV1Source(paths: String*) = {

View file

@ -826,27 +826,6 @@ class FileBasedDataSourceSuite extends QueryTest
}
}
test("File table location should include both values of option `path` and `paths`") {
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
withTempPaths(3) { paths =>
paths.zipWithIndex.foreach { case (path, index) =>
Seq(index).toDF("a").write.mode("overwrite").parquet(path.getCanonicalPath)
}
val df = spark
.read
.option("path", paths.head.getCanonicalPath)
.parquet(paths(1).getCanonicalPath, paths(2).getCanonicalPath)
df.queryExecution.optimizedPlan match {
case PhysicalOperation(_, _, DataSourceV2ScanRelation(table: ParquetTable, _, _)) =>
assert(table.paths.toSet == paths.map(_.getCanonicalPath).toSet)
case _ =>
throw new AnalysisException("Can not match ParquetTable in the query.")
}
checkAnswer(df, Seq(0, 1, 2).map(Row(_)))
}
}
}
test("SPARK-31935: Hadoop file system config should be effective in data source options") {
Seq("parquet", "").foreach { format =>
withSQLConf(

View file

@ -224,15 +224,29 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
assert(LastOptions.parameters("opt3") == "3")
}
test("SPARK-32364: path argument of load function should override all existing options") {
test("SPARK-32364: later option should override earlier options") {
spark.read
.format("org.apache.spark.sql.test")
.option("paTh", "1")
.option("PATH", "2")
.option("Path", "3")
.option("patH", "4")
.load("5")
.option("path", "5")
.load()
assert(LastOptions.parameters("path") == "5")
withClue("SPARK-32516: legacy path option behavior") {
withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true") {
spark.read
.format("org.apache.spark.sql.test")
.option("paTh", "1")
.option("PATH", "2")
.option("Path", "3")
.option("patH", "4")
.load("5")
assert(LastOptions.parameters("path") == "5")
}
}
}
test("SPARK-32364: path argument of save function should override all existing options") {
@ -1105,4 +1119,42 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
}
}
}
test("SPARK-32516: 'path' or 'paths' option cannot coexist with load()'s path parameters") {
def verifyLoadFails(f: () => DataFrame): Unit = {
val e = intercept[AnalysisException](f())
assert(e.getMessage.contains(
"Either remove the path option if it's the same as the path parameter"))
}
val path = "/tmp"
verifyLoadFails(() => spark.read.option("path", path).parquet(path))
verifyLoadFails(() => spark.read.option("path", path).format("parquet").load(path))
verifyLoadFails(() => spark.read.option("paths", path).parquet(path))
verifyLoadFails(() => spark.read.option("paths", path).format("parquet").load(path))
}
test("SPARK-32516: legacy path option behavior in load()") {
withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true") {
withTempDir { dir =>
val path = dir.getCanonicalPath
Seq(1).toDF.write.mode("overwrite").parquet(path)
// When there is one path parameter to load(), "path" option is overwritten.
checkAnswer(spark.read.format("parquet").option("path", path).load(path), Row(1))
// When there are multiple path parameters to load(), "path" option is added.
checkAnswer(
spark.read.format("parquet").option("path", path).load(path, path),
Seq(Row(1), Row(1), Row(1)))
// When built-in datasource functions are invoked (e.g, `csv`, `parquet`, etc.),
// the path option is always added regardless of the number of path parameters.
checkAnswer(spark.read.option("path", path).parquet(path), Seq(Row(1), Row(1)))
checkAnswer(
spark.read.option("path", path).parquet(path, path),
Seq(Row(1), Row(1), Row(1)))
}
}
}
}