[SPARK-11989][SQL] Only use commit in JDBC data source if the underlying database supports transactions

Fixes [SPARK-11989](https://issues.apache.org/jira/browse/SPARK-11989)

Author: CK50 <christian.kurz@oracle.com>
Author: Christian Kurz <christian.kurz@oracle.com>

Closes #9973 from CK50/branch-1.6_non-transactional.

(cherry picked from commit a589736a1b)
Signed-off-by: Reynold Xin <rxin@databricks.com>
This commit is contained in:
CK50 2015-11-30 20:08:49 +08:00 committed by Reynold Xin
parent bf0e85a70a
commit 2db4662fe2

View file

@ -21,6 +21,7 @@ import java.sql.{Connection, PreparedStatement}
import java.util.Properties
import scala.util.Try
import scala.util.control.NonFatal
import org.apache.spark.Logging
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType, JdbcDialects}
@ -125,8 +126,19 @@ object JdbcUtils extends Logging {
dialect: JdbcDialect): Iterator[Byte] = {
val conn = getConnection()
var committed = false
val supportsTransactions = try {
conn.getMetaData().supportsDataManipulationTransactionsOnly() ||
conn.getMetaData().supportsDataDefinitionAndDataManipulationTransactions()
} catch {
case NonFatal(e) =>
logWarning("Exception while detecting transaction support", e)
true
}
try {
conn.setAutoCommit(false) // Everything in the same db transaction.
if (supportsTransactions) {
conn.setAutoCommit(false) // Everything in the same db transaction.
}
val stmt = insertStatement(conn, table, rddSchema)
try {
var rowCount = 0
@ -175,14 +187,18 @@ object JdbcUtils extends Logging {
} finally {
stmt.close()
}
conn.commit()
if (supportsTransactions) {
conn.commit()
}
committed = true
} finally {
if (!committed) {
// The stage must fail. We got here through an exception path, so
// let the exception through unless rollback() or close() want to
// tell the user about another problem.
conn.rollback()
if (supportsTransactions) {
conn.rollback()
}
conn.close()
} else {
// The stage must succeed. We cannot propagate any exception close() might throw.