[SPARK-16848][SQL] Check schema validation for user-specified schema in jdbc and table APIs

## What changes were proposed in this pull request?

This PR proposes to throw an exception from the `jdbc` and `table` APIs when a user-specified schema is given, since such a schema is either not allowed or silently ignored by these APIs.

**DataFrameReader.jdbc(...)**

```scala
spark.read.schema(StructType(Nil)).jdbc(...)
```

**DataFrameReader.table(...)**

```scala
spark.read.schema(StructType(Nil)).table("usrdb.test")
```
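
With this change, both calls above fail fast with an `AnalysisException` instead of silently ignoring the schema. A minimal sketch of the expected behavior, assuming a running `SparkSession` named `spark`; the single-column schema mirrors the new test in `DataFrameReaderWriterSuite`:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(StructField("a", StringType) :: Nil)

try {
  // Any user-specified schema now triggers the check before the table is even resolved.
  spark.read.schema(schema).table("usrdb.test")
} catch {
  case e: AnalysisException =>
    // Message after this patch: "User specified schema not supported with `table`"
    println(e.getMessage)
}
```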

## How was this patch tested?

Unit tests in `JDBCSuite` and `DataFrameReaderWriterSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14451 from HyukjinKwon/SPARK-16848.
hyukjinkwon authored 2017-01-11 21:03:48 -08:00, committed by gatorsmile
commit 24100f162d (parent 43fa21b3e6)
3 changed files with 39 additions and 4 deletions


**DataFrameReader.scala**

```diff
@@ -165,6 +165,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
+    assertNoSpecifiedSchema("jdbc")
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
     // explicit url and dbtable should override all
@@ -235,6 +236,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       table: String,
       predicates: Array[String],
       connectionProperties: Properties): DataFrame = {
+    assertNoSpecifiedSchema("jdbc")
     // connectionProperties should override settings in extraOptions.
     val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
     val options = new JDBCOptions(url, table, params)
@@ -475,6 +477,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def table(tableName: String): DataFrame = {
+    assertNoSpecifiedSchema("table")
     sparkSession.table(tableName)
   }
 
@@ -540,12 +543,19 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   @scala.annotation.varargs
   def textFile(paths: String*): Dataset[String] = {
-    if (userSpecifiedSchema.nonEmpty) {
-      throw new AnalysisException("User specified schema not supported with `textFile`")
-    }
+    assertNoSpecifiedSchema("textFile")
     text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
   }
 
+  /**
+   * A convenient function for schema validation in APIs.
+   */
+  private def assertNoSpecifiedSchema(operation: String): Unit = {
+    if (userSpecifiedSchema.nonEmpty) {
+      throw new AnalysisException(s"User specified schema not supported with `$operation`")
+    }
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
   ///////////////////////////////////////////////////////////////////////////////////////
```
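
The new private `assertNoSpecifiedSchema` helper centralizes the check that `textFile` previously inlined, so every reader API that cannot honor a user-specified schema fails with the same message. A stand-alone sketch of the same guard pattern, purely illustrative and not part of Spark; `IllegalArgumentException` stands in for Spark's `AnalysisException`, whose constructor is not public:

```scala
// Illustrative only: mirrors the guard DataFrameReader gains in this patch.
class MiniReader {
  // Stand-in for DataFrameReader's userSpecifiedSchema field.
  private var userSpecifiedSchema: Option[Seq[String]] = None

  def schema(columns: Seq[String]): this.type = {
    userSpecifiedSchema = Some(columns)
    this
  }

  // Single helper reused by every API that cannot honor a user-specified schema.
  private def assertNoSpecifiedSchema(operation: String): Unit = {
    if (userSpecifiedSchema.nonEmpty) {
      throw new IllegalArgumentException(
        s"User specified schema not supported with `$operation`")
    }
  }

  def table(tableName: String): String = {
    assertNoSpecifiedSchema("table") // fail fast before doing any work
    s"reading $tableName"
  }
}
```

Centralizing the check keeps the error message consistent across `jdbc`, `table`, and `textFile`.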


**JDBCSuite.scala**

```diff
@@ -25,7 +25,7 @@ import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExplainCommand
@@ -900,4 +900,19 @@ class JDBCSuite extends SparkFunSuite
     assert(new JDBCOptions(parameters).asConnectionProperties.isEmpty)
     assert(new JDBCOptions(new CaseInsensitiveMap(parameters)).asConnectionProperties.isEmpty)
   }
+
+  test("SPARK-16848: jdbc API throws an exception for user specified schema") {
+    val schema = StructType(Seq(
+      StructField("name", StringType, false), StructField("theid", IntegerType, false)))
+    val parts = Array[String]("THEID < 2", "THEID >= 2")
+    val e1 = intercept[AnalysisException] {
+      spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
+    }.getMessage
+    assert(e1.contains("User specified schema not supported with `jdbc`"))
+
+    val e2 = intercept[AnalysisException] {
+      spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
+    }.getMessage
+    assert(e2.contains("User specified schema not supported with `jdbc`"))
+  }
 }
```


**DataFrameReaderWriterSuite.scala**

```diff
@@ -635,4 +635,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
     }
   }
+
+  test("SPARK-16848: table API throws an exception for user specified schema") {
+    withTable("t") {
+      val schema = StructType(StructField("a", StringType) :: Nil)
+      val e = intercept[AnalysisException] {
+        spark.read.schema(schema).table("t")
+      }.getMessage
+      assert(e.contains("User specified schema not supported with `table`"))
+    }
+  }
 }
```