[SPARK-24423][SQL] Add a new option for JDBC sources

## What changes were proposed in this pull request?
Here is the description in the JIRA -

Currently, our JDBC connector provides the option `dbtable` for users to specify the JDBC source table to be loaded:

 ```scala
 val jdbcDf = spark.read
   .format("jdbc")
   .option("dbtable", "dbName.tableName")
   .options(jdbcCredentials) // a Map[String, String] of connection properties
   .load()
 ```

Normally, users do not fetch an entire JDBC table, given the poor performance/throughput of JDBC. Instead, they typically fetch only a small subset of a table's rows. Advanced users can pass a subquery as the option:

 ```scala
 val query = """ (select * from tableName limit 10) as tmp """
 val jdbcDf = spark.read
   .format("jdbc")
   .option("dbtable", query)
   .options(jdbcCredentials) // a Map[String, String] of connection properties
   .load()
 ```
However, this is not intuitive for end users. We should simply allow users to specify the query via a new option `query`; we will handle the complexity for them.

 ```scala
 val query = """select * from tableName limit 10"""
 val jdbcDf = spark.read
   .format("jdbc")
   .option("query", query)
   .options(jdbcCredentials) // a Map[String, String] of connection properties
   .load()
```
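Under the hood (per the documentation change below), Spark parenthesizes the user-specified query and assigns it a generated alias before issuing SQL to the JDBC source. A minimal sketch of that rewriting; the alias name shown here is illustrative only:

```scala
// Illustration only: how a user-supplied query is expected to be wrapped.
// The alias below is a stand-in; Spark generates its own unique alias.
val userQuery = "select * from tableName limit 10"
val wrapped   = s"($userQuery) spark_gen_alias"
val issued    = s"SELECT * FROM $wrapped"
// issued == "SELECT * FROM (select * from tableName limit 10) spark_gen_alias"
```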

## How was this patch tested?
Added tests in JDBCSuite and JDBCWriteSuite.
Also tested against MySQL, PostgreSQL, Oracle, and DB2 (using the Docker test infrastructure) to make sure there are no syntax issues.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #21590 from dilipbiswal/SPARK-24423.


@ -1302,9 +1302,33 @@ the following case-insensitive options:
<tr>
<td><code>dbtable</code></td>
<td>
The JDBC table that should be read. Note that anything that is valid in a <code>FROM</code> clause of
a SQL query can be used. For example, instead of a full table you could also use a
subquery in parentheses.
The JDBC table that should be read from or written into. Note that when using it in the read
path, anything that is valid in a <code>FROM</code> clause of a SQL query can be used.
For example, instead of a full table you could also use a subquery in parentheses. It is not
allowed to specify `dbtable` and `query` options at the same time.
</td>
</tr>
<tr>
<td><code>query</code></td>
<td>
A query that will be used to read data into Spark. The specified query will be parenthesized and used
as a subquery in the <code>FROM</code> clause. Spark will also assign an alias to the subquery clause.
As an example, Spark will issue a query of the following form to the JDBC source.<br><br>
<code> SELECT &lt;columns&gt; FROM (&lt;user_specified_query&gt;) spark_gen_alias</code><br><br>
Below are a couple of restrictions when using this option.<br>
<ol>
<li> It is not allowed to specify `dbtable` and `query` options at the same time. </li>
<li> It is not allowed to specify `query` and `partitionColumn` options at the same time. When the
`partitionColumn` option is required, the subquery can instead be specified via the `dbtable` option and
the partition columns can be qualified using the subquery alias provided as part of `dbtable`. <br>
Example:<br>
<code>
spark.read.format("jdbc")<br>
&nbsp&nbsp .option("dbtable", "(select c1, c2 from t1) as subq")<br>
&nbsp&nbsp .option("partitionColumn", "subq.c1"<br>
&nbsp&nbsp .load()
</code></li>
</ol>
</td>
</tr>
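Since `query` cannot be combined with `partitionColumn`, here is a fuller sketch of the workaround described above; the URL, credentials, and partition bounds are placeholders, and only the `dbtable`/`partitionColumn` pattern comes from the documentation:

```scala
// Placeholder connection details and bounds; the dbtable subquery with a
// qualified partition column is the pattern documented above.
val df = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/dbname")
  .option("user", "testUser")
  .option("password", "testPass")
  .option("dbtable", "(select c1, c2 from t1) as subq")
  .option("partitionColumn", "subq.c1")
  .option("lowerBound", "1")
  .option("upperBound", "1000")
  .option("numPartitions", "4")
  .load()
```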


@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType
* Options for the JDBC data source.
*/
class JDBCOptions(
@transient private val parameters: CaseInsensitiveMap[String])
@transient val parameters: CaseInsensitiveMap[String])
extends Serializable {
import JDBCOptions._
@ -65,11 +65,31 @@ class JDBCOptions(
// Required parameters
// ------------------------------------------------------------
require(parameters.isDefinedAt(JDBC_URL), s"Option '$JDBC_URL' is required.")
require(parameters.isDefinedAt(JDBC_TABLE_NAME), s"Option '$JDBC_TABLE_NAME' is required.")
// a JDBC URL
val url = parameters(JDBC_URL)
// name of table
val table = parameters(JDBC_TABLE_NAME)
// table name or a table subquery.
val tableOrQuery = (parameters.get(JDBC_TABLE_NAME), parameters.get(JDBC_QUERY_STRING)) match {
case (Some(name), Some(subquery)) =>
throw new IllegalArgumentException(
s"Both '$JDBC_TABLE_NAME' and '$JDBC_QUERY_STRING' can not be specified at the same time."
)
case (None, None) =>
throw new IllegalArgumentException(
s"Option '$JDBC_TABLE_NAME' or '$JDBC_QUERY_STRING' is required."
)
case (Some(name), None) =>
if (name.isEmpty) {
throw new IllegalArgumentException(s"Option '$JDBC_TABLE_NAME' can not be empty.")
} else {
name.trim
}
case (None, Some(subquery)) =>
if (subquery.isEmpty) {
throw new IllegalArgumentException(s"Option `$JDBC_QUERY_STRING` can not be empty.")
} else {
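// Wrap the user query in parentheses and attach a unique, Spark-generated alias
// so the result can be used wherever a table name is expected in a FROM clause.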
s"(${subquery}) __SPARK_GEN_JDBC_SUBQUERY_NAME_${curId.getAndIncrement()}"
}
}
// ------------------------------------------------------------
// Optional parameters
@ -109,6 +129,20 @@ class JDBCOptions(
s"When reading JDBC data sources, users need to specify all or none for the following " +
s"options: '$JDBC_PARTITION_COLUMN', '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', " +
s"and '$JDBC_NUM_PARTITIONS'")
require(!(parameters.get(JDBC_QUERY_STRING).isDefined && partitionColumn.isDefined),
s"""
|Options '$JDBC_QUERY_STRING' and '$JDBC_PARTITION_COLUMN' can not be specified together.
|Please define the query using `$JDBC_TABLE_NAME` option instead and make sure to qualify
|the partition columns using the supplied subquery alias to resolve any ambiguity.
|Example :
|spark.read.format("jdbc")
| .option("dbtable", "(select c1, c2 from t1) as subq")
| .option("partitionColumn", "subq.c1"
| .load()
""".stripMargin
)
val fetchSize = {
val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
require(size >= 0,
@ -149,7 +183,30 @@ class JDBCOptions(
val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
}
class JdbcOptionsInWrite(
@transient override val parameters: CaseInsensitiveMap[String])
extends JDBCOptions(parameters) {
import JDBCOptions._
def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
def this(url: String, table: String, parameters: Map[String, String]) = {
this(CaseInsensitiveMap(parameters ++ Map(
JDBCOptions.JDBC_URL -> url,
JDBCOptions.JDBC_TABLE_NAME -> table)))
}
require(
parameters.get(JDBC_TABLE_NAME).isDefined,
s"Option '$JDBC_TABLE_NAME' is required. " +
s"Option '$JDBC_QUERY_STRING' is not applicable while writing.")
val table = parameters(JDBC_TABLE_NAME)
}
object JDBCOptions {
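// Generates a unique suffix for the alias assigned to a wrapped user query.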
private val curId = new java.util.concurrent.atomic.AtomicLong(0L)
private val jdbcOptionNames = collection.mutable.Set[String]()
private def newOption(name: String): String = {
@ -159,6 +216,7 @@ object JDBCOptions {
val JDBC_URL = newOption("url")
val JDBC_TABLE_NAME = newOption("dbtable")
val JDBC_QUERY_STRING = newOption("query")
val JDBC_DRIVER_CLASS = newOption("driver")
val JDBC_PARTITION_COLUMN = newOption("partitionColumn")
val JDBC_LOWER_BOUND = newOption("lowerBound")
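For the write path, a minimal usage sketch consistent with the new `JdbcOptionsInWrite` check: `dbtable` must be supplied, while `query` is rejected. Here `df` is assumed to be an existing DataFrame, and the URL, credentials, and table name are placeholders:

```scala
// Sketch: the only behavior taken from the change above is that writes require
// 'dbtable' and reject 'query'.
df.write.format("jdbc")
  .option("url", "jdbc:h2:mem:testdb")
  .option("user", "testUser")
  .option("password", "testPass")
  .option("dbtable", "TEST.SAVETEST")   // required on the write path
  .save()

// Supplying "query" instead of "dbtable" would fail with:
// "Option 'dbtable' is required. Option 'query' is not applicable while writing."
```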


@ -51,7 +51,7 @@ object JDBCRDD extends Logging {
*/
def resolveTable(options: JDBCOptions): StructType = {
val url = options.url
val table = options.table
val table = options.tableOrQuery
val dialect = JdbcDialects.get(url)
val conn: Connection = JdbcUtils.createConnectionFactory(options)()
try {
@ -296,7 +296,7 @@ private[jdbc] class JDBCRDD(
val myWhereClause = getWhereClause(part)
val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
val sqlText = s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause"
stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setFetchSize(options.fetchSize)


@ -189,12 +189,12 @@ private[sql] case class JDBCRelation(
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
data.write
.mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
.jdbc(jdbcOptions.url, jdbcOptions.table, jdbcOptions.asProperties)
.jdbc(jdbcOptions.url, jdbcOptions.tableOrQuery, jdbcOptions.asProperties)
}
override def toString: String = {
val partitioningInfo = if (parts.nonEmpty) s" [numPartitions=${parts.length}]" else ""
// credentials should not be included in the plan output, table information is sufficient.
s"JDBCRelation(${jdbcOptions.table})" + partitioningInfo
s"JDBCRelation(${jdbcOptions.tableOrQuery})" + partitioningInfo
}
}


@ -59,7 +59,7 @@ class JdbcRelationProvider extends CreatableRelationProvider
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame): BaseRelation = {
val options = new JDBCOptions(parameters)
val options = new JdbcOptionsInWrite(parameters)
val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis
val conn = JdbcUtils.createConnectionFactory(options)()
@ -86,7 +86,8 @@ class JdbcRelationProvider extends CreatableRelationProvider
case SaveMode.ErrorIfExists =>
throw new AnalysisException(
s"Table or view '${options.table}' already exists. SaveMode: ErrorIfExists.")
s"Table or view '${options.table}' already exists. " +
s"SaveMode: ErrorIfExists.")
case SaveMode.Ignore =>
// With `SaveMode.Ignore` mode, if table already exists, the save operation is expected


@ -67,7 +67,7 @@ object JdbcUtils extends Logging {
/**
* Returns true if the table already exists in the JDBC database.
*/
def tableExists(conn: Connection, options: JDBCOptions): Boolean = {
def tableExists(conn: Connection, options: JdbcOptionsInWrite): Boolean = {
val dialect = JdbcDialects.get(options.url)
// Somewhat hacky, but there isn't a good way to identify whether a table exists for all
@ -100,7 +100,7 @@ object JdbcUtils extends Logging {
/**
* Truncates a table from the JDBC database without side effects.
*/
def truncateTable(conn: Connection, options: JDBCOptions): Unit = {
def truncateTable(conn: Connection, options: JdbcOptionsInWrite): Unit = {
val dialect = JdbcDialects.get(options.url)
val statement = conn.createStatement
try {
@ -255,7 +255,7 @@ object JdbcUtils extends Logging {
val dialect = JdbcDialects.get(options.url)
try {
val statement = conn.prepareStatement(dialect.getSchemaQuery(options.table))
val statement = conn.prepareStatement(dialect.getSchemaQuery(options.tableOrQuery))
try {
statement.setQueryTimeout(options.queryTimeout)
Some(getSchema(statement.executeQuery(), dialect))
@ -809,7 +809,7 @@ object JdbcUtils extends Logging {
df: DataFrame,
tableSchema: Option[StructType],
isCaseSensitive: Boolean,
options: JDBCOptions): Unit = {
options: JdbcOptionsInWrite): Unit = {
val url = options.url
val table = options.table
val dialect = JdbcDialects.get(url)
@ -838,7 +838,7 @@ object JdbcUtils extends Logging {
def createTable(
conn: Connection,
df: DataFrame,
options: JDBCOptions): Unit = {
options: JdbcOptionsInWrite): Unit = {
val strSchema = schemaString(
df, options.url, options.createTableColumnTypes)
val table = options.table


@ -25,7 +25,7 @@ import org.h2.jdbc.JdbcSQLException
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.DataSourceScanExec
@ -39,7 +39,7 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
class JDBCSuite extends SparkFunSuite
class JDBCSuite extends QueryTest
with BeforeAndAfter with PrivateMethodTester with SharedSQLContext {
import testImplicits._
@ -1099,7 +1099,7 @@ class JDBCSuite extends SparkFunSuite
test("SPARK-19318: Connection properties keys should be case-sensitive.") {
def testJdbcOptions(options: JDBCOptions): Unit = {
// Spark JDBC data source options are case-insensitive
assert(options.table == "t1")
assert(options.tableOrQuery == "t1")
// When we convert it to properties, it should be case-sensitive.
assert(options.asProperties.size == 3)
assert(options.asProperties.get("customkey") == null)
@ -1255,4 +1255,92 @@ class JDBCSuite extends SparkFunSuite
testIncorrectJdbcPartitionColumn(testH2Dialect.quoteIdentifier("ThEiD"))
}
}
test("query JDBC option - negative tests") {
val query = "SELECT * FROM test.people WHERE theid = 1"
// load path
val e1 = intercept[RuntimeException] {
val df = spark.read.format("jdbc")
.option("Url", urlWithUserAndPass)
.option("query", query)
.option("dbtable", "test.people")
.load()
}.getMessage
assert(e1.contains("Both 'dbtable' and 'query' can not be specified at the same time."))
// jdbc api path
val properties = new Properties()
properties.setProperty(JDBCOptions.JDBC_QUERY_STRING, query)
val e2 = intercept[RuntimeException] {
spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", properties).collect()
}.getMessage
assert(e2.contains("Both 'dbtable' and 'query' can not be specified at the same time."))
val e3 = intercept[RuntimeException] {
sql(
s"""
|CREATE OR REPLACE TEMPORARY VIEW queryOption
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url', query '$query', dbtable 'TEST.PEOPLE',
| user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))
}.getMessage
assert(e3.contains("Both 'dbtable' and 'query' can not be specified at the same time."))
val e4 = intercept[RuntimeException] {
val df = spark.read.format("jdbc")
.option("Url", urlWithUserAndPass)
.option("query", "")
.load()
}.getMessage
assert(e4.contains("Option `query` can not be empty."))
// Options query and partitionColumn are not allowed together.
val expectedErrorMsg =
s"""
|Options 'query' and 'partitionColumn' can not be specified together.
|Please define the query using `dbtable` option instead and make sure to qualify
|the partition columns using the supplied subquery alias to resolve any ambiguity.
|Example :
|spark.read.format("jdbc")
| .option("dbtable", "(select c1, c2 from t1) as subq")
| .option("partitionColumn", "subq.c1"
| .load()
""".stripMargin
val e5 = intercept[RuntimeException] {
sql(
s"""
|CREATE OR REPLACE TEMPORARY VIEW queryOption
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url', query '$query', user 'testUser', password 'testPass',
| partitionColumn 'THEID', lowerBound '1', upperBound '4', numPartitions '3')
""".stripMargin.replaceAll("\n", " "))
}.getMessage
assert(e5.contains(expectedErrorMsg))
}
test("query JDBC option") {
val query = "SELECT name, theid FROM test.people WHERE theid = 1"
// query option to pass on the query string.
val df = spark.read.format("jdbc")
.option("Url", urlWithUserAndPass)
.option("query", query)
.load()
checkAnswer(
df,
Row("fred", 1) :: Nil)
// query option in the create table path.
sql(
s"""
|CREATE OR REPLACE TEMPORARY VIEW queryOption
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url', query '$query', user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))
checkAnswer(
sql("select name, theid from queryOption"),
Row("fred", 1) :: Nil)
}
}


@ -293,13 +293,23 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
test("save errors if dbtable is not specified") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val e = intercept[RuntimeException] {
val e1 = intercept[RuntimeException] {
df.write.format("jdbc")
.option("url", url1)
.options(properties.asScala)
.save()
}.getMessage
assert(e.contains("Option 'dbtable' is required"))
assert(e1.contains("Option 'dbtable' or 'query' is required"))
val e2 = intercept[RuntimeException] {
df.write.format("jdbc")
.option("url", url1)
.options(properties.asScala)
.option("query", "select * from TEST.SAVETEST")
.save()
}.getMessage
val msg = "Option 'dbtable' is required. Option 'query' is not applicable while writing."
assert(e2.contains(msg))
}
test("save errors if wrong user/password combination") {