add test for JdbcRDD using embedded derby, per rxin suggestion
This commit is contained in:
parent
3da2305ed0
commit
b16c4896f6
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -36,3 +36,4 @@ streaming-tests.log
|
|||
dependency-reduced-pom.xml
|
||||
.ensime
|
||||
.ensime_lucene
|
||||
derby.log
|
||||
|
|
56
core/src/test/scala/spark/rdd/JdbcRDDSuite.scala
Normal file
56
core/src/test/scala/spark/rdd/JdbcRDDSuite.scala
Normal file
|
@ -0,0 +1,56 @@
|
|||
package spark
|
||||
|
||||
import org.scalatest.{ BeforeAndAfter, FunSuite }
|
||||
import spark.SparkContext._
|
||||
import spark.rdd.JdbcRDD
|
||||
import java.sql._
|
||||
|
||||
/**
 * Tests JdbcRDD against an embedded Derby database.
 *
 * `before` creates (or reuses) a table FOO with 100 rows whose DATA column holds
 * the even numbers 2..200; the test then reads them back through a JdbcRDD and
 * checks the row count and the sum (2 + 4 + ... + 200 = 10100).
 */
class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {

  before {
    // Register the embedded Derby driver so DriverManager can open connections.
    Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
    val conn = DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true")
    try {
      val create = conn.createStatement
      try {
        create.execute("""
          CREATE TABLE FOO(
            ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
            DATA INTEGER
          )""")
      } finally {
        // Close the statement even when CREATE TABLE throws (e.g. table exists),
        // which the original code leaked.
        create.close()
      }
      val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
      try {
        (1 to 100).foreach { i =>
          insert.setInt(1, i * 2)
          insert.executeUpdate()
        }
      } finally {
        insert.close()
      }
    } catch {
      // SQLState X0Y32 = "object already exists": the table survived a previous
      // run and already holds the fixture rows, so this is not an error.
      case e: SQLException if e.getSQLState == "X0Y32" =>
    } finally {
      conn.close()
    }
  }

  test("basic functionality") {
    sc = new SparkContext("local", "test")
    // Partition the 1..100 key range into 3 partitions; each partition binds its
    // own lower/upper bound into the two '?' placeholders.
    val rdd = new JdbcRDD(
      sc,
      () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
      "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
      1, 100, 3,
      (r: ResultSet) => { r.getInt(1) } ).cache()

    assert(rdd.count === 100)
    assert(rdd.reduce(_ + _) === 10100)
  }

  after {
    try {
      // Shut down the whole embedded Derby engine between suites.
      DriverManager.getConnection("jdbc:derby:;shutdown=true")
    } catch {
      // Derby reports a *successful* engine shutdown by throwing SQLException
      // with SQLState XJ015, so this is the expected path.
      case se: SQLException if se.getSQLState == "XJ015" =>
    }
  }
}
|
|
@ -147,6 +147,7 @@ object SparkBuild extends Build {
|
|||
"cc.spray" % "spray-can" % "1.0-M2.1",
|
||||
"cc.spray" % "spray-server" % "1.0-M2.1",
|
||||
"cc.spray" % "spray-json_2.9.2" % "1.1.1",
|
||||
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
|
||||
"org.apache.mesos" % "mesos" % "0.9.0-incubating"
|
||||
) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq,
|
||||
unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") }
|
||||
|
|
Loading…
Reference in a new issue