[SPARK-32832][SS] Use CaseInsensitiveMap for DataStreamReader/Writer options

### What changes were proposed in this pull request?

This PR aims to fix non-deterministic behavior of `DataStreamReader`/`DataStreamWriter` options. Because `extraOptions` was a mutable `HashMap`, keys differing only in case were kept as separate entries, and which value ultimately won depended on hash-map iteration order rather than on the order in which the options were set:
```scala
scala> spark.readStream.format("parquet").option("paTh", "1").option("PATH", "2").option("Path", "3").option("patH", "4").option("path", "5").load()
org.apache.spark.sql.AnalysisException: Path does not exist: 1;
```

### Why are the changes needed?

This makes the behavior deterministic: `CaseInsensitiveMap` treats keys that differ only in case as the same key, so each `option(...)` call overwrites any earlier value and the last one set always wins.
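
As a minimal sketch of the semantics this buys, using Spark's internal `CaseInsensitiveMap` directly (a catalyst utility, not a public API; values are illustrative):
```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Keys compare case-insensitively, so each + overwrites any earlier
// entry whose key differs only in case: the last value set wins.
val opts = CaseInsensitiveMap[String](Map.empty) +
  ("paTh" -> "1") + ("PATH" -> "2") + ("path" -> "5")

assert(opts("path") == "5")  // last write wins
assert(opts("PATH") == "5")  // lookup is case-insensitive too
assert(opts.size == 1)       // only one entry survives
```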

### Does this PR introduce _any_ user-facing change?

Yes, but the previous behavior was non-deterministic.

### How was this patch tested?

Pass the newly added test cases.

Closes #29702 from dongjoon-hyun/SPARK-32832.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
3 changed files with 37 additions and 4 deletions

#### DataStreamReader.scala

```diff
@@ -25,6 +25,7 @@ import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -212,7 +213,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
       case provider: TableProvider if !provider.isInstanceOf[FileDataSourceV2] =>
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           source = provider, conf = sparkSession.sessionState.conf)
-        val options = sessionOptions ++ extraOptions
+        val options = sessionOptions ++ extraOptions.toMap
         val dsOptions = new CaseInsensitiveStringMap(options.asJava)
         val table = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema)
         import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
@@ -535,5 +536,5 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
   private var userSpecifiedSchema: Option[StructType] = None
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 }
```
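
Note the combination order in the hunk above: `sessionOptions ++ extraOptions.toMap` puts the user-supplied options on the right-hand side of `++`, so they override session-derived configs for the same key. A small illustration of that plain `Map` semantics (option names and values are made up):
```scala
// The right-hand operand of ++ wins on key collisions, so user options
// take precedence over session-derived ones (illustrative values).
val sessionOptions = Map("maxFilesPerTrigger" -> "10")
val extraOptions   = Map("maxFilesPerTrigger" -> "1", "path" -> "/data/in")
val options = sessionOptions ++ extraOptions

assert(options("maxFilesPerTrigger") == "1")
assert(options("path") == "/data/in")
```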

#### DataStreamWriter.scala

```diff
@@ -26,6 +26,7 @@ import org.apache.spark.annotation.Evolving
 import org.apache.spark.api.java.function.VoidFunction2
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableProvider}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -356,7 +357,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         source = provider, conf = df.sparkSession.sessionState.conf)
-      val options = sessionOptions ++ extraOptions
+      val options = sessionOptions ++ extraOptions.toMap
       val dsOptions = new CaseInsensitiveStringMap(options.asJava)
       val table = DataSourceV2Utils.getTableFromProvider(
         provider, dsOptions, userSpecifiedSchema = None)
@@ -479,7 +480,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   private var trigger: Trigger = Trigger.ProcessingTime(0L)
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 
   private var foreachWriter: ForeachWriter[T] = null
```
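
With the writer backed by the same `CaseInsensitiveMap`, the last-one-wins rule applies on the write path as well. An illustrative usage, assuming a streaming `df` (paths are made up):
```scala
// Illustrative streaming write: the later lower-case "path" overrides
// the earlier "PATH", regardless of casing.
val query = df.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/ckpt")
  .option("PATH", "/tmp/out-stale")
  .option("path", "/tmp/out")   // effective output directory
  .start()
```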

#### DataStreamReaderWriterSuite.scala

```diff
@@ -186,6 +186,37 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
+  test("SPARK-32832: later option should override earlier options for load()") {
+    spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("paTh", "1")
+      .option("PATH", "2")
+      .option("Path", "3")
+      .option("patH", "4")
+      .option("path", "5")
+      .load()
+    assert(LastOptions.parameters("path") == "5")
+  }
+
+  test("SPARK-32832: later option should override earlier options for start()") {
+    val ds = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load()
+    assert(LastOptions.parameters.isEmpty)
+
+    val query = ds.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .option("paTh", "1")
+      .option("PATH", "2")
+      .option("Path", "3")
+      .option("patH", "4")
+      .option("path", "5")
+      .start()
+    assert(LastOptions.parameters("path") == "5")
+    query.stop()
+  }
+
   test("partitioning") {
     val df = spark.readStream
       .format("org.apache.spark.sql.streaming.test")
```
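
The assertions rely on the suite's `LastOptions` helper, populated by the `org.apache.spark.sql.streaming.test` source, which records the options it was handed. A hypothetical minimal version of that record-the-options pattern (the real helper in the suite has more fields):
```scala
// Hypothetical minimal sketch: the test source stores whatever options
// it was constructed with, so a test can assert which casing of "path"
// actually won.
object LastOptions {
  var parameters: Map[String, String] = Map.empty
}
```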