From 24100f162dadb80400cb3e0bc94e4282f10f0c84 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 11 Jan 2017 21:03:48 -0800
Subject: [PATCH] [SPARK-16848][SQL] Check schema validation for user-specified schema in jdbc and table APIs

## What changes were proposed in this pull request?

This PR proposes to throw an exception from both the `jdbc` and `table` APIs when a user-specified schema is given, since such a schema is either not allowed or has no effect there.

**DataFrameReader.jdbc(...)**

```scala
spark.read.schema(StructType(Nil)).jdbc(...)
```

**DataFrameReader.table(...)**

```scala
spark.read.schema(StructType(Nil)).table("usrdb.test")
```
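For illustration, a minimal sketch of the new behavior (assuming an active `SparkSession` named `spark`, a JDBC connection string `url`, and the table name `TEST.PEOPLE`; all three are placeholders, not part of this patch):

```scala
import java.util.Properties

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// The JDBC source derives its schema from the database, so a user-specified
// schema is never applied; after this patch the call fails fast instead of
// silently ignoring the schema.
val schema = StructType(StructField("name", StringType) :: Nil)
try {
  spark.read.schema(schema).jdbc(url, "TEST.PEOPLE", new Properties())
} catch {
  case e: AnalysisException =>
    assert(e.getMessage.contains("User specified schema not supported with `jdbc`"))
}
```

`table(...)` throws the same `AnalysisException`, with `table` in place of `jdbc` in the message.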
## How was this patch tested?

Unit tests in `JDBCSuite` and `DataFrameReaderWriterSuite`.

Author: hyukjinkwon

Closes #14451 from HyukjinKwon/SPARK-16848.
---
 .../org/apache/spark/sql/DataFrameReader.scala  | 16 +++++++++++++---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 17 ++++++++++++++++-
 .../sql/test/DataFrameReaderWriterSuite.scala   | 10 ++++++++++
 3 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index cd83836178..fe34d597db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -165,6 +165,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
+    assertNoSpecifiedSchema("jdbc")
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
     // explicit url and dbtable should override all
@@ -235,6 +236,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       table: String,
       predicates: Array[String],
       connectionProperties: Properties): DataFrame = {
+    assertNoSpecifiedSchema("jdbc")
     // connectionProperties should override settings in extraOptions.
     val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
     val options = new JDBCOptions(url, table, params)
@@ -475,6 +477,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def table(tableName: String): DataFrame = {
+    assertNoSpecifiedSchema("table")
     sparkSession.table(tableName)
   }
 
@@ -540,12 +543,19 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   @scala.annotation.varargs
   def textFile(paths: String*): Dataset[String] = {
-    if (userSpecifiedSchema.nonEmpty) {
-      throw new AnalysisException("User specified schema not supported with `textFile`")
-    }
+    assertNoSpecifiedSchema("textFile")
     text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
   }
 
+  /**
+   * A convenient function for schema validation in APIs.
+   */
+  private def assertNoSpecifiedSchema(operation: String): Unit = {
+    if (userSpecifiedSchema.nonEmpty) {
+      throw new AnalysisException(s"User specified schema not supported with `$operation`")
+    }
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
   ///////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 74ca66b103..039625421e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -25,7 +25,7 @@ import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExplainCommand
@@ -900,4 +900,19 @@ class JDBCSuite extends SparkFunSuite
     assert(new JDBCOptions(parameters).asConnectionProperties.isEmpty)
     assert(new JDBCOptions(new CaseInsensitiveMap(parameters)).asConnectionProperties.isEmpty)
   }
+
+  test("SPARK-16848: jdbc API throws an exception for user specified schema") {
+    val schema = StructType(Seq(
+      StructField("name", StringType, false), StructField("theid", IntegerType, false)))
+    val parts = Array[String]("THEID < 2", "THEID >= 2")
+    val e1 = intercept[AnalysisException] {
+      spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
+    }.getMessage
+    assert(e1.contains("User specified schema not supported with `jdbc`"))
+
+    val e2 = intercept[AnalysisException] {
+      spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
+    }.getMessage
+    assert(e2.contains("User specified schema not supported with `jdbc`"))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 4bec2e3fdb..8a8ba05534 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -635,4 +635,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
     }
   }
+
+  test("SPARK-16848: table API throws an exception for user specified schema") {
+    withTable("t") {
+      val schema = StructType(StructField("a", StringType) :: Nil)
+      val e = intercept[AnalysisException] {
+        spark.read.schema(schema).table("t")
+      }.getMessage
+      assert(e.contains("User specified schema not supported with `table`"))
+    }
+  }
 }