[SPARK-14525][SQL][FOLLOWUP] Clean up JdbcRelationProvider

## What changes were proposed in this pull request?

This PR proposes cleaning up the confusing part in `createRelation` as discussed in https://github.com/apache/spark/pull/12601/files#r80627940

Also, this PR proposes the changes below:

 - Add documentation for `batchsize` and `isolationLevel`.
 - Move property names into `JDBCOptions` so that they can be managed in a single place. which were, `fetchsize`, `batchsize`, `isolationLevel` and `driver`.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15263 from HyukjinKwon/SPARK-14525.
This commit is contained in:
hyukjinkwon 2016-10-07 10:52:32 -07:00 committed by gatorsmile
parent cff5607552
commit aa3a6841eb
4 changed files with 74 additions and 52 deletions

View file

@ -22,6 +22,7 @@ import java.util.Properties
import scala.collection.JavaConverters.mapAsJavaMapConverter
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}
class JdbcRelationProvider extends CreatableRelationProvider
@ -50,67 +51,52 @@ class JdbcRelationProvider extends CreatableRelationProvider
JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession)
}
/*
* The following structure applies to this code:
* | tableExists | !tableExists
*------------------------------------------------------------------------------------
* Ignore | BaseRelation | CreateTable, saveTable, BaseRelation
* ErrorIfExists | ERROR | CreateTable, saveTable, BaseRelation
* Overwrite* | (DropTable, CreateTable,) | CreateTable, saveTable, BaseRelation
* | saveTable, BaseRelation |
* Append | saveTable, BaseRelation | CreateTable, saveTable, BaseRelation
*
* *Overwrite & tableExists with truncate, will not drop & create, but instead truncate
*/
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
val jdbcOptions = new JDBCOptions(parameters)
val url = jdbcOptions.url
val table = jdbcOptions.table
df: DataFrame): BaseRelation = {
val options = new JDBCOptions(parameters)
val url = options.url
val table = options.table
val createTableOptions = options.createTableOptions
val isTruncate = options.isTruncate
val props = new Properties()
props.putAll(parameters.asJava)
val conn = JdbcUtils.createConnectionFactory(url, props)()
val conn = JdbcUtils.createConnectionFactory(url, props)()
try {
val tableExists = JdbcUtils.tableExists(conn, url, table)
val (doCreate, doSave) = (mode, tableExists) match {
case (SaveMode.Ignore, true) => (false, false)
case (SaveMode.ErrorIfExists, true) => throw new AnalysisException(
s"Table or view '$table' already exists, and SaveMode is set to ErrorIfExists.")
case (SaveMode.Overwrite, true) =>
if (jdbcOptions.isTruncate && JdbcUtils.isCascadingTruncateTable(url) == Some(false)) {
JdbcUtils.truncateTable(conn, table)
(false, true)
if (tableExists) {
mode match {
case SaveMode.Overwrite =>
if (isTruncate && isCascadingTruncateTable(url).contains(false)) {
// In this case, we should truncate table and then load.
truncateTable(conn, table)
saveTable(df, url, table, props)
} else {
JdbcUtils.dropTable(conn, table)
(true, true)
}
case (SaveMode.Append, true) => (false, true)
case (_, true) => throw new IllegalArgumentException(s"Unexpected SaveMode, '$mode'," +
" for handling existing tables.")
case (_, false) => (true, true)
// Otherwise, do not truncate the table, instead drop and recreate it
dropTable(conn, table)
createTable(df.schema, url, table, createTableOptions, conn)
saveTable(df, url, table, props)
}
if (doCreate) {
val schema = JdbcUtils.schemaString(data, url)
// To allow certain options to append when create a new table, which can be
// table_options or partition_options.
// E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
val createtblOptions = jdbcOptions.createTableOptions
val sql = s"CREATE TABLE $table ($schema) $createtblOptions"
val statement = conn.createStatement
try {
statement.executeUpdate(sql)
} finally {
statement.close()
case SaveMode.Append =>
saveTable(df, url, table, props)
case SaveMode.ErrorIfExists =>
throw new AnalysisException(
s"Table or view '$table' already exists. SaveMode: ErrorIfExists.")
case SaveMode.Ignore =>
// With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
// to not save the contents of the DataFrame and to not change the existing data.
// Therefore, it is okay to do nothing here and then just return the relation below.
}
} else {
createTable(df.schema, url, table, createTableOptions, conn)
saveTable(df, url, table, props)
}
if (doSave) JdbcUtils.saveTable(data, url, table, props)
} finally {
conn.close()
}

View file

@ -552,7 +552,7 @@ object JdbcUtils extends Logging {
isolationLevel: Int): Iterator[Byte] = {
require(batchSize >= 1,
s"Invalid value `${batchSize.toString}` for parameter " +
s"`${JdbcUtils.JDBC_BATCH_INSERT_SIZE}`. The minimum value is 1.")
s"`$JDBC_BATCH_INSERT_SIZE`. The minimum value is 1.")
val conn = getConnection()
var committed = false
@ -657,10 +657,10 @@ object JdbcUtils extends Logging {
/**
* Compute the schema string for this RDD.
*/
def schemaString(df: DataFrame, url: String): String = {
def schemaString(schema: StructType, url: String): String = {
val sb = new StringBuilder()
val dialect = JdbcDialects.get(url)
df.schema.fields foreach { field =>
schema.fields foreach { field =>
val name = dialect.quoteIdentifier(field.name)
val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
val nullable = if (field.nullable) "" else "NOT NULL"
@ -697,4 +697,27 @@ object JdbcUtils extends Logging {
getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect, isolationLevel)
)
}
/**
* Creates a table with a given schema.
*/
def createTable(
schema: StructType,
url: String,
table: String,
createTableOptions: String,
conn: Connection): Unit = {
val strSchema = schemaString(schema, url)
// Create the table if the table does not exist.
// To allow certain options to append when create a new table, which can be
// table_options or partition_options.
// E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
val sql = s"CREATE TABLE $table ($strSchema) $createTableOptions"
val statement = conn.createStatement
try {
statement.executeUpdate(sql)
} finally {
statement.close()
}
}
}

View file

@ -788,7 +788,7 @@ class JDBCSuite extends SparkFunSuite
test("SPARK-16387: Reserved SQL words are not escaped by JDBC writer") {
val df = spark.createDataset(Seq("a", "b", "c")).toDF("order")
val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
val schema = JdbcUtils.schemaString(df.schema, "jdbc:mysql://localhost:3306/temp")
assert(schema.contains("`order` TEXT"))
}
}

View file

@ -132,6 +132,19 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
}
}
test("CREATE with ignore") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
df.write.mode(SaveMode.Ignore).jdbc(url1, "TEST.DROPTEST", properties)
assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
assert(3 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
df2.write.mode(SaveMode.Ignore).jdbc(url1, "TEST.DROPTEST", properties)
assert(2 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).count())
assert(3 === spark.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
}
test("CREATE with overwrite") {
val df = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)