[SPARK-10521][SQL] Utilize Docker for test DB2 JDBC Dialect support
Add integration tests based on docker to test DB2 JDBC dialect support Author: Luciano Resende <lresende@apache.org> Closes #9893 from lresende/SPARK-10521.
This commit is contained in:
parent
3f0f40800b
commit
94de63053e
30
external/docker-integration-tests/pom.xml
vendored
30
external/docker-integration-tests/pom.xml
vendored
|
@ -34,6 +34,13 @@
|
|||
<sbt.project.name>docker-integration-tests</sbt.project.name>
|
||||
</properties>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>db2</id>
|
||||
<url>https://app.camunda.com/nexus/content/repositories/public/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.spotify</groupId>
|
||||
|
@ -180,5 +187,28 @@
|
|||
</exclusions>
|
||||
</dependency>
|
||||
<!-- End Jersey dependencies -->
|
||||
|
||||
<!-- DB2 JCC driver manual installation instructions
|
||||
|
||||
You can build this datasource if you:
|
||||
1) have the DB2 artifacts installed in a local repo and supply the URL:
|
||||
-Dmaven.repo.drivers=http://my.local.repo
|
||||
|
||||
2) have a copy of the DB2 JCC driver and run the following commands :
|
||||
mvn install:install-file -Dfile=${path to db2jcc4.jar} \
|
||||
-DgroupId=com.ibm.db2 \
|
||||
-DartifactId=db2jcc4 \
|
||||
-Dversion=10.5 \
|
||||
-Dpackaging=jar
|
||||
|
||||
Note: IBM DB2 JCC driver is available for download at
|
||||
http://www-01.ibm.com/support/docview.wss?uid=swg21363866
|
||||
-->
|
||||
<dependency>
|
||||
<groupId>com.ibm.db2.jcc</groupId>
|
||||
<artifactId>db2jcc4</artifactId>
|
||||
<version>10.5.0.5</version>
|
||||
<type>jar</type>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
157
external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
vendored
Normal file
157
external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
vendored
Normal file
|
@ -0,0 +1,157 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.jdbc
|
||||
|
||||
import java.math.BigDecimal
|
||||
import java.sql.{Connection, Date, Timestamp}
|
||||
import java.util.Properties
|
||||
|
||||
import org.scalatest._
|
||||
|
||||
import org.apache.spark.tags.DockerTest
|
||||
|
||||
@DockerTest
|
||||
@Ignore // AMPLab Jenkins needs to be updated before shared memory works on docker
|
||||
class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
|
||||
override val db = new DatabaseOnDocker {
|
||||
override val imageName = "lresende/db2express-c:10.5.0.5-3.10.0"
|
||||
override val env = Map(
|
||||
"DB2INST1_PASSWORD" -> "rootpass",
|
||||
"LICENSE" -> "accept"
|
||||
)
|
||||
override val usesIpc = true
|
||||
override val jdbcPort: Int = 50000
|
||||
override def getJdbcUrl(ip: String, port: Int): String =
|
||||
s"jdbc:db2://$ip:$port/foo:user=db2inst1;password=rootpass;"
|
||||
override def getStartupProcessName: Option[String] = Some("db2start")
|
||||
}
|
||||
|
||||
override def dataPreparation(conn: Connection): Unit = {
|
||||
conn.prepareStatement("CREATE TABLE tbl (x INTEGER, y VARCHAR(8))").executeUpdate()
|
||||
conn.prepareStatement("INSERT INTO tbl VALUES (42,'fred')").executeUpdate()
|
||||
conn.prepareStatement("INSERT INTO tbl VALUES (17,'dave')").executeUpdate()
|
||||
|
||||
conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits BIT(10), "
|
||||
+ "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci DECIMAL(40,20), flt FLOAT, "
|
||||
+ "dbl DOUBLE)").executeUpdate()
|
||||
conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', "
|
||||
+ "17, 77777, 123456789, 123456789012345, 123456789012345.123456789012345, "
|
||||
+ "42.75, 1.0000000000000002)").executeUpdate()
|
||||
|
||||
conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts TIMESTAMP, "
|
||||
+ "yr YEAR)").executeUpdate()
|
||||
conn.prepareStatement("INSERT INTO dates VALUES ('1991-11-09', '13:31:24', "
|
||||
+ "'1996-01-01 01:23:45', '2009-02-13 23:31:30', '2001')").executeUpdate()
|
||||
|
||||
// TODO: Test locale conversion for strings.
|
||||
conn.prepareStatement("CREATE TABLE strings (a CHAR(10), b VARCHAR(10), c CLOB, d BLOB, "
|
||||
+ "e CHAR FOR BIT DATA)").executeUpdate()
|
||||
conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 'brown', 'fox', 'jumps'")
|
||||
.executeUpdate()
|
||||
}
|
||||
|
||||
test("Basic test") {
|
||||
val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties)
|
||||
val rows = df.collect()
|
||||
assert(rows.length == 2)
|
||||
val types = rows(0).toSeq.map(x => x.getClass.toString)
|
||||
assert(types.length == 2)
|
||||
assert(types(0).equals("class java.lang.Integer"))
|
||||
assert(types(1).equals("class java.lang.String"))
|
||||
}
|
||||
|
||||
test("Numeric types") {
|
||||
val df = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
|
||||
val rows = df.collect()
|
||||
assert(rows.length == 1)
|
||||
val types = rows(0).toSeq.map(x => x.getClass.toString)
|
||||
assert(types.length == 9)
|
||||
assert(types(0).equals("class java.lang.Boolean"))
|
||||
assert(types(1).equals("class java.lang.Long"))
|
||||
assert(types(2).equals("class java.lang.Integer"))
|
||||
assert(types(3).equals("class java.lang.Integer"))
|
||||
assert(types(4).equals("class java.lang.Integer"))
|
||||
assert(types(5).equals("class java.lang.Long"))
|
||||
assert(types(6).equals("class java.math.BigDecimal"))
|
||||
assert(types(7).equals("class java.lang.Double"))
|
||||
assert(types(8).equals("class java.lang.Double"))
|
||||
assert(rows(0).getBoolean(0) == false)
|
||||
assert(rows(0).getLong(1) == 0x225)
|
||||
assert(rows(0).getInt(2) == 17)
|
||||
assert(rows(0).getInt(3) == 77777)
|
||||
assert(rows(0).getInt(4) == 123456789)
|
||||
assert(rows(0).getLong(5) == 123456789012345L)
|
||||
val bd = new BigDecimal("123456789012345.12345678901234500000")
|
||||
assert(rows(0).getAs[BigDecimal](6).equals(bd))
|
||||
assert(rows(0).getDouble(7) == 42.75)
|
||||
assert(rows(0).getDouble(8) == 1.0000000000000002)
|
||||
}
|
||||
|
||||
test("Date types") {
|
||||
val df = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
|
||||
val rows = df.collect()
|
||||
assert(rows.length == 1)
|
||||
val types = rows(0).toSeq.map(x => x.getClass.toString)
|
||||
assert(types.length == 5)
|
||||
assert(types(0).equals("class java.sql.Date"))
|
||||
assert(types(1).equals("class java.sql.Timestamp"))
|
||||
assert(types(2).equals("class java.sql.Timestamp"))
|
||||
assert(types(3).equals("class java.sql.Timestamp"))
|
||||
assert(types(4).equals("class java.sql.Date"))
|
||||
assert(rows(0).getAs[Date](0).equals(Date.valueOf("1991-11-09")))
|
||||
assert(rows(0).getAs[Timestamp](1).equals(Timestamp.valueOf("1970-01-01 13:31:24")))
|
||||
assert(rows(0).getAs[Timestamp](2).equals(Timestamp.valueOf("1996-01-01 01:23:45")))
|
||||
assert(rows(0).getAs[Timestamp](3).equals(Timestamp.valueOf("2009-02-13 23:31:30")))
|
||||
assert(rows(0).getAs[Date](4).equals(Date.valueOf("2001-01-01")))
|
||||
}
|
||||
|
||||
test("String types") {
|
||||
val df = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
|
||||
val rows = df.collect()
|
||||
assert(rows.length == 1)
|
||||
val types = rows(0).toSeq.map(x => x.getClass.toString)
|
||||
assert(types.length == 9)
|
||||
assert(types(0).equals("class java.lang.String"))
|
||||
assert(types(1).equals("class java.lang.String"))
|
||||
assert(types(2).equals("class java.lang.String"))
|
||||
assert(types(3).equals("class java.lang.String"))
|
||||
assert(types(4).equals("class java.lang.String"))
|
||||
assert(types(5).equals("class java.lang.String"))
|
||||
assert(types(6).equals("class [B"))
|
||||
assert(types(7).equals("class [B"))
|
||||
assert(types(8).equals("class [B"))
|
||||
assert(rows(0).getString(0).equals("the"))
|
||||
assert(rows(0).getString(1).equals("quick"))
|
||||
assert(rows(0).getString(2).equals("brown"))
|
||||
assert(rows(0).getString(3).equals("fox"))
|
||||
assert(rows(0).getString(4).equals("jumps"))
|
||||
assert(rows(0).getString(5).equals("over"))
|
||||
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6), Array[Byte](116, 104, 101, 0)))
|
||||
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](7), Array[Byte](108, 97, 122, 121)))
|
||||
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](8), Array[Byte](100, 111, 103)))
|
||||
}
|
||||
|
||||
test("Basic write test") {
|
||||
val df1 = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
|
||||
val df2 = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
|
||||
val df3 = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
|
||||
df1.write.jdbc(jdbcUrl, "numberscopy", new Properties)
|
||||
df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
|
||||
df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
|
||||
}
|
||||
}
|
|
@ -44,6 +44,11 @@ abstract class DatabaseOnDocker {
|
|||
*/
|
||||
val env: Map[String, String]
|
||||
|
||||
/**
|
||||
* Wheather or not to use ipc mode for shared memory when starting docker image
|
||||
*/
|
||||
val usesIpc: Boolean
|
||||
|
||||
/**
|
||||
* The container-internal JDBC port that the database listens on.
|
||||
*/
|
||||
|
@ -53,6 +58,11 @@ abstract class DatabaseOnDocker {
|
|||
* Return a JDBC URL that connects to the database running at the given IP address and port.
|
||||
*/
|
||||
def getJdbcUrl(ip: String, port: Int): String
|
||||
|
||||
/**
|
||||
* Optional process to run when container starts
|
||||
*/
|
||||
def getStartupProcessName: Option[String]
|
||||
}
|
||||
|
||||
abstract class DockerJDBCIntegrationSuite
|
||||
|
@ -97,17 +107,23 @@ abstract class DockerJDBCIntegrationSuite
|
|||
val dockerIp = DockerUtils.getDockerIp()
|
||||
val hostConfig: HostConfig = HostConfig.builder()
|
||||
.networkMode("bridge")
|
||||
.ipcMode(if (db.usesIpc) "host" else "")
|
||||
.portBindings(
|
||||
Map(s"${db.jdbcPort}/tcp" -> List(PortBinding.of(dockerIp, externalPort)).asJava).asJava)
|
||||
.build()
|
||||
// Create the database container:
|
||||
val config = ContainerConfig.builder()
|
||||
val containerConfigBuilder = ContainerConfig.builder()
|
||||
.image(db.imageName)
|
||||
.networkDisabled(false)
|
||||
.env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava)
|
||||
.hostConfig(hostConfig)
|
||||
.exposedPorts(s"${db.jdbcPort}/tcp")
|
||||
.build()
|
||||
if(db.getStartupProcessName.isDefined) {
|
||||
containerConfigBuilder
|
||||
.cmd(db.getStartupProcessName.get)
|
||||
}
|
||||
val config = containerConfigBuilder.build()
|
||||
// Create the database container:
|
||||
containerId = docker.createContainer(config).id
|
||||
// Start the container and wait until the database can accept JDBC connections:
|
||||
docker.startContainer(containerId)
|
||||
|
|
|
@ -30,9 +30,11 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
|
|||
override val env = Map(
|
||||
"MYSQL_ROOT_PASSWORD" -> "rootpass"
|
||||
)
|
||||
override val usesIpc = false
|
||||
override val jdbcPort: Int = 3306
|
||||
override def getJdbcUrl(ip: String, port: Int): String =
|
||||
s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
|
||||
override def getStartupProcessName: Option[String] = None
|
||||
}
|
||||
|
||||
override def dataPreparation(conn: Connection): Unit = {
|
||||
|
|
|
@ -52,9 +52,11 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
|
|||
override val env = Map(
|
||||
"ORACLE_ROOT_PASSWORD" -> "oracle"
|
||||
)
|
||||
override val usesIpc = false
|
||||
override val jdbcPort: Int = 1521
|
||||
override def getJdbcUrl(ip: String, port: Int): String =
|
||||
s"jdbc:oracle:thin:system/oracle@//$ip:$port/xe"
|
||||
override def getStartupProcessName: Option[String] = None
|
||||
}
|
||||
|
||||
override def dataPreparation(conn: Connection): Unit = {
|
||||
|
|
|
@ -32,9 +32,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
|
|||
override val env = Map(
|
||||
"POSTGRES_PASSWORD" -> "rootpass"
|
||||
)
|
||||
override val usesIpc = false
|
||||
override val jdbcPort = 5432
|
||||
override def getJdbcUrl(ip: String, port: Int): String =
|
||||
s"jdbc:postgresql://$ip:$port/postgres?user=postgres&password=rootpass"
|
||||
override def getStartupProcessName: Option[String] = None
|
||||
}
|
||||
|
||||
override def dataPreparation(conn: Connection): Unit = {
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -666,7 +666,7 @@
|
|||
<groupId>com.spotify</groupId>
|
||||
<artifactId>docker-client</artifactId>
|
||||
<classifier>shaded</classifier>
|
||||
<version>3.4.0</version>
|
||||
<version>3.6.6</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
|
|
|
@ -366,8 +366,10 @@ object Flume {
|
|||
object DockerIntegrationTests {
|
||||
// This serves to override the override specified in DependencyOverrides:
|
||||
lazy val settings = Seq(
|
||||
dependencyOverrides += "com.google.guava" % "guava" % "18.0"
|
||||
dependencyOverrides += "com.google.guava" % "guava" % "18.0",
|
||||
resolvers ++= Seq("DB2" at "https://app.camunda.com/nexus/content/repositories/public/")
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in a new issue