[SPARK-21519][SQL] Add an option to the JDBC data source to initialize the target DB environment
Add an option to the JDBC data source to initialize the environment of the remote database session.

## What changes were proposed in this pull request?

This proposes an option for the JDBC data source, tentatively called "sessionInitStatement", to implement the session-initialization functionality present, for example, in the Sqoop connector for Oracle (see https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_oraoop_oracle_session_initialization_statements). After each database session is opened to the remote DB, and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block in the case of Oracle). See also https://issues.apache.org/jira/browse/SPARK-21519.

## How was this patch tested?

Manually tested using the Spark SQL data source and the Oracle JDBC driver.

Author: LucaCanali <luca.canali@cern.ch>

Closes #18724 from LucaCanali/JDBC_datasource_sessionInitStatement.
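For context, a minimal usage sketch of the new option through the DataFrameReader API (the Oracle URL, credentials, and table name are hypothetical placeholders; the PL/SQL block is the example from the documentation change below):

```scala
// Minimal sketch: read an Oracle table over JDBC, with a statement that
// initializes each new database session before any rows are fetched.
// The URL, credentials, and table name below are placeholders.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/service_name") // hypothetical
  .option("user", "scott")                                       // hypothetical
  .option("password", "tiger")                                   // hypothetical
  .option("dbtable", "MYSCHEMA.MYTABLE")                         // hypothetical
  .option("sessionInitStatement",
    """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
  .load()
```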
parent 2387f1e316 · commit 0377338bf7
docs/sql-programming-guide.md

@@ -1308,6 +1308,13 @@ the following case-insensitive options:
     </td>
   </tr>
 
+  <tr>
+    <td><code>sessionInitStatement</code></td>
+    <td>
+     After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block). Use this to implement session initialization code. Example: <code>option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")</code>
+    </td>
+  </tr>
+
   <tr>
     <td><code>truncate</code></td>
     <td>
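The option can also be supplied through the SQL data source syntax. A hedged sketch against an in-memory H2 database, mirroring the pattern exercised by the test added in this commit (the view name, variable name, and query are placeholders):

```scala
// Sketch: expose a JDBC source as a temporary view whose sessions are
// initialized by setting an H2 user variable before the query runs.
// The view name, variable, and query are hypothetical placeholders.
spark.sql(
  """
    |CREATE OR REPLACE TEMPORARY VIEW session_init_example
    |USING org.apache.spark.sql.jdbc
    |OPTIONS (
    |  url 'jdbc:h2:mem:testdb0;user=testUser;password=testPass',
    |  dbtable '(SELECT NVL(@MYVAR, -1))',
    |  sessionInitStatement 'SET @MYVAR 42'
    |)
  """.stripMargin)
spark.sql("SELECT * FROM session_init_example").show() // expect 42
```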
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala

@@ -138,6 +138,8 @@ class JDBCOptions(
     case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
     case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
   }
+  // An option to execute custom SQL before fetching data from the remote DB
+  val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
 }
 
 object JDBCOptions {

@@ -161,4 +163,5 @@ object JDBCOptions {
   val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
   val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
+  val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
 }
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

@@ -273,6 +273,21 @@ private[jdbc] class JDBCRDD(
     import scala.collection.JavaConverters._
     dialect.beforeFetch(conn, options.asProperties.asScala.toMap)
 
+    // This executes a generic SQL statement (or PL/SQL block) before reading
+    // the table/query via JDBC. Use this feature to initialize the database
+    // session environment, e.g. for optimizations and/or troubleshooting.
+    options.sessionInitStatement match {
+      case Some(sql) =>
+        val statement = conn.prepareStatement(sql)
+        logInfo(s"Executing sessionInitStatement: $sql")
+        try {
+          statement.execute()
+        } finally {
+          statement.close()
+        }
+      case None =>
+    }
+
     // H2's JDBC driver does not support the setSchema() method. We pass a
     // fully-qualified table name in the SELECT statement. I don't know how to
     // talk about a table in a completely portable way.
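Since JDBCRDD.compute opens a connection per partition, the init statement runs once for every session Spark opens, before that session's data query. A hedged standalone JDBC sketch of the same open-initialize-query ordering (the H2 URL and SQL text are placeholders):

```scala
import java.sql.DriverManager

// Sketch of the ordering the patch implements: run the session-init
// statement on a freshly opened connection, then the actual data query.
// The H2 URL and SQL strings are hypothetical placeholders.
val conn = DriverManager.getConnection(
  "jdbc:h2:mem:testdb0;user=testUser;password=testPass")
try {
  val init = conn.prepareStatement("SET @MYVAR 42") // sessionInitStatement
  try init.execute() finally init.close()

  val query = conn.prepareStatement("SELECT NVL(@MYVAR, -1)")
  try {
    val rs = query.executeQuery()
    while (rs.next()) println(rs.getInt(1)) // prints 42
  } finally query.close()
} finally conn.close()
```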
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

@@ -1044,4 +1044,35 @@ class JDBCSuite extends SparkFunSuite
       assert(sql("select * from people_view").count() == 3)
     }
   }
 
+  test("SPARK-21519: option sessionInitStatement, run SQL to initialize the database session.") {
+    val initSQL1 = "SET @MYTESTVAR 21519"
+    val df1 = spark.read.format("jdbc")
+      .option("url", urlWithUserAndPass)
+      .option("dbtable", "(SELECT NVL(@MYTESTVAR, -1))")
+      .option("sessionInitStatement", initSQL1)
+      .load()
+    assert(df1.collect() === Array(Row(21519)))
+
+    val initSQL2 = "SET SCHEMA DUMMY"
+    val df2 = spark.read.format("jdbc")
+      .option("url", urlWithUserAndPass)
+      .option("dbtable", "TEST.PEOPLE")
+      .option("sessionInitStatement", initSQL2)
+      .load()
+    val e = intercept[SparkException] { df2.collect() }.getMessage
+    assert(e.contains("""Schema "DUMMY" not found"""))
+
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW test_sessionInitStatement
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$urlWithUserAndPass',
+         |dbtable '(SELECT NVL(@MYTESTVAR1, -1), NVL(@MYTESTVAR2, -1))',
+         |sessionInitStatement 'SET @MYTESTVAR1 21519; SET @MYTESTVAR2 1234')
+       """.stripMargin)
+
+    val df3 = sql("SELECT * FROM test_sessionInitStatement")
+    assert(df3.collect() === Array(Row(21519, 1234)))
+  }
 }