[SPARK-7601] [SQL] Support Insert into JDBC Datasource

Added InsertableRelation support to the JDBC data source relation (JDBCRelation).
Example usage:
sqlContext.sql(
      s"""
        |CREATE TEMPORARY TABLE testram1
        |USING org.apache.spark.sql.jdbc
        |OPTIONS (url '$url', dbtable 'testram1', user 'xx', password 'xx', driver 'com.h2.Driver')
      """.stripMargin.replaceAll("\n", " "))

sqlContext.sql("insert into table testram1 select * from testsrc")
sqlContext.sql("insert overwrite table testram1 select * from testsrc")

Author: Venkata Ramana Gollamudi <ramana.gollamudi@huawei.com>

Closes #6121 from gvramana/JDBCDatasource_insert and squashes the following commits:

f3fb5f1 [Venkata Ramana Gollamudi] Support for JDBC Datasource InsertableRelation

(cherry picked from commit 59aaa1dad6)
Signed-off-by: Michael Armbrust <michael@databricks.com>

JDBCRelation.scala

@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.sources._
@@ -129,7 +130,8 @@ private[sql] case class JDBCRelation(
     parts: Array[Partition],
     properties: Properties = new Properties())(@transient val sqlContext: SQLContext)
   extends BaseRelation
-  with PrunedFilteredScan {
+  with PrunedFilteredScan
+  with InsertableRelation {

   override val needConversion: Boolean = false
@@ -148,4 +150,8 @@ private[sql] case class JDBCRelation(
       filters,
       parts)
   }
+
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    data.insertIntoJDBC(url, table, overwrite, properties)
+  }
 }
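
For context, InsertableRelation is the single-method contract in org.apache.spark.sql.sources that the SQL INSERT INTO / INSERT OVERWRITE plans dispatch to; a sketch of its shape:

    trait InsertableRelation {
      // overwrite = true for INSERT OVERWRITE, false for INSERT INTO.
      def insert(data: DataFrame, overwrite: Boolean): Unit
    }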

JDBCWriteSuite.scala

@@ -43,6 +43,29 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
     conn1 = DriverManager.getConnection(url1, properties)
     conn1.prepareStatement("create schema test").executeUpdate()
+    conn1.prepareStatement("drop table if exists test.people").executeUpdate()
+    conn1.prepareStatement(
+      "create table test.people (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
+    conn1.prepareStatement("insert into test.people values ('fred', 1)").executeUpdate()
+    conn1.prepareStatement("insert into test.people values ('mary', 2)").executeUpdate()
+    conn1.prepareStatement("drop table if exists test.people1").executeUpdate()
+    conn1.prepareStatement(
+      "create table test.people1 (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
     conn1.commit()
+
+    TestSQLContext.sql(
+      s"""
+        |CREATE TEMPORARY TABLE PEOPLE
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass')
+      """.stripMargin.replaceAll("\n", " "))
+
+    TestSQLContext.sql(
+      s"""
+        |CREATE TEMPORARY TABLE PEOPLE1
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1', user 'testUser', password 'testPass')
+      """.stripMargin.replaceAll("\n", " "))
   }

   after {
@@ -114,5 +137,17 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
       df2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true)
     }
   }
+
+  test("INSERT to JDBC Datasource") {
+    TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
+    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
+    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
+  }
+
+  test("INSERT to JDBC Datasource with overwrite") {
+    TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
+    TestSQLContext.sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE")
+    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
+    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
+  }
 }
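
Taken together, the two tests pin down the semantics of the overwrite flag; a sketch of the behavior under the suite's fixtures (the count of 4 after repeated appends is inferred from append semantics, not asserted by the suite):

    // Appends accumulate: two INSERT INTOs of 2 rows each leave 4 rows.
    TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
    TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
    assert(4 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)

    // INSERT OVERWRITE replaces the table contents, resetting the count to 2.
    TestSQLContext.sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE")
    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)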