From f48c5a57d6488d5598534ca5834e008504f464fe Mon Sep 17 00:00:00 2001
From: sureshthalamati
Date: Tue, 14 Feb 2017 15:34:12 -0800
Subject: [PATCH] [SPARK-19318][SQL] Fix to treat JDBC connection properties specified by the user in case-sensitive manner.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What changes were proposed in this pull request?

The reason for the test failure is that the property “oracle.jdbc.mapDateToTimestamp” set by the test was being converted to all lower case. The Oracle database expects this property name in a case-sensitive manner. The test passed in previous releases because connection properties were sent exactly as the user specified them; the change that made all options uniformly case-insensitive also converted the JDBC connection properties to lower case.

This PR enhances CaseInsensitiveMap to keep track of the original case-sensitive input keys, and uses those keys when creating the connection properties that are passed to the JDBC connection.

An alternative approach, PR https://github.com/apache/spark/pull/16847, is to pass the original input keys to the JDBC data source by adding a check in the data source class and handling case-insensitivity in the JDBC source code.

## How was this patch tested?

Added new test cases to JDBCSuite and OracleIntegrationSuite. Ran the Docker integration tests on my laptop; all tests passed successfully.

Author: sureshthalamati

Closes #16891 from sureshthalamati/jdbc_case_senstivity_props_fix-SPARK-19318.
---
 .../sql/jdbc/OracleIntegrationSuite.scala          | 36 +++++++++++++
 .../ml/source/libsvm/LibSVMOptions.scala           |  4 +-
 .../spark/sql/catalyst/json/JSONOptions.scala      |  4 +-
 .../catalyst/util/CaseInsensitiveMap.scala         | 32 ++++++++----
 .../execution/datasources/DataSource.scala         |  4 +-
 .../datasources/csv/CSVOptions.scala               |  6 +--
 .../datasources/jdbc/JDBCOptions.scala             | 10 ++--
 .../datasources/parquet/ParquetOptions.scala       |  4 +-
 .../datasources/text/TextOptions.scala             |  4 +-
 .../streaming/FileStreamOptions.scala              |  4 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 50 ++++++++++++++++++-
 .../spark/sql/jdbc/JDBCWriteSuite.scala            | 13 +++++
 .../spark/sql/hive/HiveExternalCatalog.scala       |  4 +-
 .../sql/hive/execution/HiveOptions.scala           |  5 +-
 .../spark/sql/hive/orc/OrcOptions.scala            |  4 +-
 15 files changed, 148 insertions(+), 36 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 8c880f3ee5..1bb89a361c 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -62,6 +62,31 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
   }

   override def dataPreparation(conn: Connection): Unit = {
+    conn.prepareStatement("CREATE TABLE datetime (id NUMBER(10), d DATE, t TIMESTAMP)")
+      .executeUpdate()
+    conn.prepareStatement(
+      """INSERT INTO datetime VALUES
+        |(1, {d '1991-11-09'}, {ts '1996-01-01 01:23:45'})
+      """.stripMargin.replaceAll("\n", " ")).executeUpdate()
+    conn.commit()
+
+    sql(
+      s"""
+        |CREATE TEMPORARY VIEW datetime
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$jdbcUrl', dbTable 'datetime', oracle.jdbc.mapDateToTimestamp 'false')
+      """.stripMargin.replaceAll("\n", " "))
+
+    conn.prepareStatement("CREATE TABLE datetime1 (id NUMBER(10), d DATE, t TIMESTAMP)")
NUMBER(10), d DATE, t TIMESTAMP)") + .executeUpdate() + conn.commit() + + sql( + s""" + |CREATE TEMPORARY VIEW datetime1 + |USING org.apache.spark.sql.jdbc + |OPTIONS (url '$jdbcUrl', dbTable 'datetime1', oracle.jdbc.mapDateToTimestamp 'false') + """.stripMargin.replaceAll("\n", " ")) } test("SPARK-12941: String datatypes to be mapped to Varchar in Oracle") { @@ -149,4 +174,15 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo assert(values.getDate(9).equals(dateVal)) assert(values.getTimestamp(10).equals(timestampVal)) } + + test("SPARK-19318: connection property keys should be case-sensitive") { + def checkRow(row: Row): Unit = { + assert(row.getInt(0) == 1) + assert(row.getDate(1).equals(Date.valueOf("1991-11-09"))) + assert(row.getTimestamp(2).equals(Timestamp.valueOf("1996-01-01 01:23:45"))) + } + checkRow(sql("SELECT * FROM datetime where id = 1").head()) + sql("INSERT INTO TABLE datetime1 SELECT * FROM datetime where id = 1") + checkRow(sql("SELECT * FROM datetime1 where id = 1").head()) + } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMOptions.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMOptions.scala index e3c5b4d7ab..6900b4153a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMOptions.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMOptions.scala @@ -22,12 +22,12 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap /** * Options for the LibSVM data source. */ -private[libsvm] class LibSVMOptions(@transient private val parameters: CaseInsensitiveMap) +private[libsvm] class LibSVMOptions(@transient private val parameters: CaseInsensitiveMap[String]) extends Serializable { import LibSVMOptions._ - def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters)) + def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) /** * Number of features. If unspecified or nonpositive, the number of features will be determined diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 38e191bbba..02bd8dede4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -31,10 +31,10 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]]. 
  */
 private[sql] class JSONOptions(
-    @transient private val parameters: CaseInsensitiveMap)
+    @transient private val parameters: CaseInsensitiveMap[String])
   extends Logging with Serializable {

-  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

   val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
index 29e49a5837..66dd093bbb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
@@ -18,21 +18,35 @@ package org.apache.spark.sql.catalyst.util

 /**
- * Builds a map in which keys are case insensitive
+ * Builds a map in which keys are case insensitive. Input map can be accessed for cases where
+ * case-sensitive information is required. The primary constructor is marked private to avoid
+ * nested case-insensitive map creation, otherwise the keys in the original map will become
+ * case-insensitive in this scenario.
  */
-class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
+class CaseInsensitiveMap[T] private (val originalMap: Map[String, T]) extends Map[String, T]
   with Serializable {

-  val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
+  val keyLowerCasedMap = originalMap.map(kv => kv.copy(_1 = kv._1.toLowerCase))

-  override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)
+  override def get(k: String): Option[T] = keyLowerCasedMap.get(k.toLowerCase)

-  override def contains(k: String): Boolean = baseMap.contains(k.toLowerCase)
+  override def contains(k: String): Boolean = keyLowerCasedMap.contains(k.toLowerCase)

-  override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
-    baseMap + kv.copy(_1 = kv._1.toLowerCase)
+  override def +[B1 >: T](kv: (String, B1)): Map[String, B1] = {
+    new CaseInsensitiveMap(originalMap + kv)
+  }

-  override def iterator: Iterator[(String, String)] = baseMap.iterator
+  override def iterator: Iterator[(String, T)] = keyLowerCasedMap.iterator

-  override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
+  override def -(key: String): Map[String, T] = {
+    new CaseInsensitiveMap(originalMap.filterKeys(!_.equalsIgnoreCase(key)))
+  }
 }
+
+object CaseInsensitiveMap {
+  def apply[T](params: Map[String, T]): CaseInsensitiveMap[T] = params match {
+    case caseSensitiveMap: CaseInsensitiveMap[T] => caseSensitiveMap
+    case _ => new CaseInsensitiveMap(params)
+  }
+}
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ecfcafe69c..d510581f90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -85,7 +85,7 @@ case class DataSource(
   lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
   lazy val sourceInfo: SourceInfo = sourceSchema()

-  private val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+  private val caseInsensitiveOptions = CaseInsensitiveMap(options)

   /**
    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
@@ -610,7 +610,7 @@ object DataSource {
    * [[CatalogStorageFormat]]. Note that, the `path` option is removed from options after this.
    */
   def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
-    val path = new CaseInsensitiveMap(options).get("path")
+    val path = CaseInsensitiveMap(options).get("path")
     val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path")
     CatalogStorageFormat.empty.copy(locationUri = path, properties = optionsWithoutPath)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index af456c8d71..9d79ea6ed1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -26,10 +26,10 @@ import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}

-private[csv] class CSVOptions(@transient private val parameters: CaseInsensitiveMap)
+private[csv] class CSVOptions(@transient private val parameters: CaseInsensitiveMap[String])
   extends Logging with Serializable {

-  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

   private def getChar(paramName: String, default: Char): Char = {
     val paramValue = parameters.get(paramName)
@@ -164,7 +164,7 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
 object CSVOptions {

-  def apply(): CSVOptions = new CSVOptions(new CaseInsensitiveMap(Map.empty))
+  def apply(): CSVOptions = new CSVOptions(CaseInsensitiveMap(Map.empty))

   def apply(paramName: String, paramValue: String): CSVOptions = {
     new CSVOptions(Map(paramName -> paramValue))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 6fd2e0d241..d4d3464654 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -26,15 +26,15 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
  * Options for the JDBC data source.
  */
 class JDBCOptions(
-    @transient private val parameters: CaseInsensitiveMap)
+    @transient private val parameters: CaseInsensitiveMap[String])
   extends Serializable {

   import JDBCOptions._

-  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

   def this(url: String, table: String, parameters: Map[String, String]) = {
-    this(new CaseInsensitiveMap(parameters ++ Map(
+    this(CaseInsensitiveMap(parameters ++ Map(
       JDBCOptions.JDBC_URL -> url,
       JDBCOptions.JDBC_TABLE_NAME -> table)))
   }
@@ -44,7 +44,7 @@ class JDBCOptions(
    */
   val asProperties: Properties = {
     val properties = new Properties()
-    parameters.foreach { case (k, v) => properties.setProperty(k, v) }
+    parameters.originalMap.foreach { case (k, v) => properties.setProperty(k, v) }
     properties
   }
@@ -55,7 +55,7 @@ class JDBCOptions(
    */
   val asConnectionProperties: Properties = {
     val properties = new Properties()
-    parameters.filterKeys(key => !jdbcOptionNames(key.toLowerCase))
+    parameters.originalMap.filterKeys(key => !jdbcOptionNames(key.toLowerCase))
       .foreach { case (k, v) => properties.setProperty(k, v) }
     properties
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index a81a95d510..bdda299a62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -26,14 +26,14 @@ import org.apache.spark.sql.internal.SQLConf
  * Options for the Parquet data source.
  */
 private[parquet] class ParquetOptions(
-    @transient private val parameters: CaseInsensitiveMap,
+    @transient private val parameters: CaseInsensitiveMap[String],
     @transient private val sqlConf: SQLConf)
   extends Serializable {

   import ParquetOptions._

   def this(parameters: Map[String, String], sqlConf: SQLConf) =
-    this(new CaseInsensitiveMap(parameters), sqlConf)
+    this(CaseInsensitiveMap(parameters), sqlConf)

   /**
    * Compression codec to use. By default use the value specified in SQLConf.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index 8cad984e33..49bd7382f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -22,12 +22,12 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
 /**
  * Options for the Text data source.
  */
-private[text] class TextOptions(@transient private val parameters: CaseInsensitiveMap)
+private[text] class TextOptions(@transient private val parameters: CaseInsensitiveMap[String])
   extends Serializable {

   import TextOptions._

-  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

   /**
    * Compression codec to use.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 25ebe1797b..2f802d782f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -26,9 +26,9 @@ import org.apache.spark.util.Utils
 /**
  * User specified options for file streams.
  */
-class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging {
+class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging {

-  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

   val maxFilesPerTrigger: Option[Int] = parameters.get("maxFilesPerTrigger").map { str =>
     Try(str.toInt).toOption.filter(_ > 0).getOrElse {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 1cca15542d..92d3e9519f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -899,7 +899,7 @@ class JDBCSuite extends SparkFunSuite
       "dbtable" -> "t1",
       "numPartitions" -> "10")
     assert(new JDBCOptions(parameters).asConnectionProperties.isEmpty)
-    assert(new JDBCOptions(new CaseInsensitiveMap(parameters)).asConnectionProperties.isEmpty)
+    assert(new JDBCOptions(CaseInsensitiveMap(parameters)).asConnectionProperties.isEmpty)
   }

   test("SPARK-16848: jdbc API throws an exception for user specified schema") {
@@ -925,4 +925,52 @@ class JDBCSuite extends SparkFunSuite
     assert(res.generatedRows.isEmpty)
     assert(res.outputRows === foobarCnt :: Nil)
   }
+
+  test("SPARK-19318: Connection properties keys should be case-sensitive.") {
+    def testJdbcOptions(options: JDBCOptions): Unit = {
+      // Spark JDBC data source options are case-insensitive
+      assert(options.table == "t1")
+      // When we convert it to properties, it should be case-sensitive.
+      assert(options.asProperties.size == 3)
+      assert(options.asProperties.get("customkey") == null)
+      assert(options.asProperties.get("customKey") == "a-value")
+      assert(options.asConnectionProperties.size == 1)
+      assert(options.asConnectionProperties.get("customkey") == null)
+      assert(options.asConnectionProperties.get("customKey") == "a-value")
+    }
+
+    val parameters = Map("url" -> url, "dbTAblE" -> "t1", "customKey" -> "a-value")
+    testJdbcOptions(new JDBCOptions(parameters))
+    testJdbcOptions(new JDBCOptions(CaseInsensitiveMap(parameters)))
+    // test add/remove key-value from the case-insensitive map
+    var modifiedParameters = CaseInsensitiveMap(Map.empty) ++ parameters
+    testJdbcOptions(new JDBCOptions(modifiedParameters))
+    modifiedParameters -= "dbtable"
+    assert(modifiedParameters.get("dbTAblE").isEmpty)
+    modifiedParameters -= "customkey"
+    assert(modifiedParameters.get("customKey").isEmpty)
+    modifiedParameters += ("customKey" -> "a-value")
+    modifiedParameters += ("dbTable" -> "t1")
+    testJdbcOptions(new JDBCOptions(modifiedParameters))
+    assert ((modifiedParameters -- parameters.keys).size == 0)
+  }
+
+  test("SPARK-19318: jdbc data source options should be treated case-insensitive.") {
+    val df = spark.read.format("jdbc")
+      .option("Url", urlWithUserAndPass)
+      .option("DbTaBle", "TEST.PEOPLE")
+      .load()
+    assert(df.count() == 3)
+
+    withTempView("people_view") {
+      sql(
+        s"""
+          |CREATE TEMPORARY VIEW people_view
+          |USING org.apache.spark.sql.jdbc
+          |OPTIONS (uRl '$url', DbTaBlE 'TEST.PEOPLE', User 'testUser', PassWord 'testPass')
+        """.stripMargin.replaceAll("\n", " "))
+
+      assert(sql("select * from people_view").count() == 3)
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 354af29d42..ec7b19e666 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -349,4 +349,17 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     assert(e.contains("Invalid value `0` for parameter `numPartitions` in table writing " +
       "via JDBC. The minimum value is 1."))
   }
+
+  test("SPARK-19318 temporary view data source option keys should be case-insensitive") {
+    withTempView("people_view") {
+      sql(
+        s"""
+          |CREATE TEMPORARY VIEW people_view
+          |USING org.apache.spark.sql.jdbc
+          |OPTIONS (uRl '$url1', DbTaBlE 'TEST.PEOPLE1', User 'testUser', PassWord 'testPass')
+        """.stripMargin.replaceAll("\n", " "))
+      sql("INSERT OVERWRITE TABLE PEOPLE_VIEW SELECT * FROM PEOPLE")
+      assert(sql("select * from people_view").count() == 2)
+    }
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1fc8e8ea9f..ea48256147 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -465,7 +465,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // will be updated automatically in Hive metastore by the `alterTable` call at the end of this
     // method. Here we only update the path option if the path option already exists in storage
     // properties, to avoid adding a unnecessary path option for Hive serde tables.
-    val hasPathOption = new CaseInsensitiveMap(rawTable.storage.properties).contains("path")
+    val hasPathOption = CaseInsensitiveMap(rawTable.storage.properties).contains("path")
     val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) {
       // If it's a managed table with path option and we are renaming it, then the path option
       // becomes inaccurate and we need to update it according to the new table name.
@@ -483,7 +483,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }

   private def getLocationFromStorageProps(table: CatalogTable): Option[String] = {
-    new CaseInsensitiveMap(table.storage.properties).get("path")
+    CaseInsensitiveMap(table.storage.properties).get("path")
   }

   private def updateLocationInStorageProps(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
index 35b7a681f1..1928510280 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
@@ -23,10 +23,11 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
  * Options for the Hive data source. Note that rule `DetermineHiveSerde` will extract Hive
  * serde/format information from these options.
  */
-class HiveOptions(@transient private val parameters: CaseInsensitiveMap) extends Serializable {
+class HiveOptions(@transient private val parameters: CaseInsensitiveMap[String])
+  extends Serializable {

   import HiveOptions._

-  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

   val fileFormat = parameters.get(FILE_FORMAT).map(_.toLowerCase)
   val inputFormat = parameters.get(INPUT_FORMAT)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index ac587ab99a..ccaa568dcc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -22,12 +22,12 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 /**
  * Options for the ORC data source.
  */
-private[orc] class OrcOptions(@transient private val parameters: CaseInsensitiveMap)
+private[orc] class OrcOptions(@transient private val parameters: CaseInsensitiveMap[String])
   extends Serializable {

   import OrcOptions._

-  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

   /**
    * Compression codec to use. By default snappy compression.
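
For readers who want to see the idea behind the patch in isolation, the following is a minimal, standalone sketch rather than the Spark classes above: it assumes a simplified stand-in class (`CaseKeepingOptions`, an illustrative name) that, like the patched `CaseInsensitiveMap`, keeps the original map alongside a lower-cased copy, so option lookups stay case-insensitive while the `java.util.Properties` handed to the JDBC driver are built from the user's original key spellings.

```scala
import java.util.Properties

// Simplified stand-in for the patched CaseInsensitiveMap: lookups ignore case,
// but the original key spellings are kept for later use.
class CaseKeepingOptions(val originalMap: Map[String, String]) {
  private val lowerCased = originalMap.map { case (k, v) => k.toLowerCase -> v }

  // Case-insensitive lookup, as data source option parsing expects.
  def get(key: String): Option[String] = lowerCased.get(key.toLowerCase)

  // Build driver properties from the ORIGINAL keys, so drivers that treat
  // property names case-sensitively (e.g. Oracle's
  // "oracle.jdbc.mapDateToTimestamp") receive them unchanged.
  def asConnectionProperties(reservedOptionNames: Set[String]): Properties = {
    val props = new Properties()
    originalMap
      .filterKeys(k => !reservedOptionNames.contains(k.toLowerCase))
      .foreach { case (k, v) => props.setProperty(k, v) }
    props
  }
}

object CaseKeepingOptionsDemo extends App {
  val opts = new CaseKeepingOptions(Map(
    "URL" -> "jdbc:oracle:thin:@//host:1521/svc",
    "dbTable" -> "datetime",
    "oracle.jdbc.mapDateToTimestamp" -> "false"))

  // Option lookup is case-insensitive.
  assert(opts.get("dbtable").contains("datetime"))

  // Connection properties keep the user's original casing.
  val props = opts.asConnectionProperties(reservedOptionNames = Set("url", "dbtable"))
  assert(props.getProperty("oracle.jdbc.mapDateToTimestamp") == "false")
  assert(props.getProperty("ORACLE.JDBC.MAPDATETOTIMESTAMP") == null)
  println(props)
}
```

This mirrors the design choice in the patch: rather than forcing every caller to remember the original casing, the case-insensitive map itself remembers it (`originalMap`), and only `asProperties` / `asConnectionProperties` read from it.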