[SPARK-15648][SQL] Add teradataDialect for JDBC connection to Teradata

The contribution is my original work and I license the work to the project under the project’s open source license.

Note: the Teradata JDBC connector limits the row size to 64K bytes. The default mapping I used for the string data type is a VARCHAR with a length of 255 characters/bytes. This effectively limits the maximum number of string columns to 250 when using the Teradata JDBC connector.
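The 250-column figure follows from simple arithmetic, sketched below under the assumption of a 64,000-byte ("64K") row cap and up to 255 bytes per VARCHAR(255) column (per-column overhead ignored; the exact cap depends on the Teradata configuration):

```scala
// Sanity check of the string-column cap implied by the row-size limit.
// Assumes a 64,000-byte row cap and up to 255 bytes per VARCHAR(255)
// column; per-column storage overhead is ignored.
object RowLimitSketch {
  def main(args: Array[String]): Unit = {
    val rowLimitBytes = 64000
    val bytesPerStringColumn = 255
    val maxStringColumns = rowLimitBytes / bytesPerStringColumn
    println(maxStringColumns) // prints 250
  }
}
```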

## What changes were proposed in this pull request?

Added a TeradataDialect for JDBC connections to Teradata. The Teradata dialect uses VARCHAR(255) in place of TEXT for string data types, and CHAR(1) in place of BIT(1) for boolean data types.

## How was this patch tested?

I added two unit tests to double check that the types get set correctly for a Teradata JDBC URL. I also ran a couple of manual tests to make sure the JDBC connector worked with Teradata and to make sure that an error was thrown if a row could potentially exceed 64K (this error comes from the Teradata JDBC connector, not from the Spark code). I did not check how string columns longer than 255 characters are handled.
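For reference, once the dialect is registered it is selected automatically by URL prefix, so no user-facing API changes are needed. A minimal usage sketch (host, credentials, and table name below are placeholders, not part of this patch; this assumes the Teradata JDBC driver is on the classpath):

```scala
import java.util.Properties

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("teradata-sketch").getOrCreate()

// Placeholder connection details, for illustration only.
val url = "jdbc:teradata://td-host/DATABASE=test"
val props = new Properties()
props.put("user", "placeholder_user")
props.put("password", "placeholder_password")

// A frame with one string and one boolean column; on write, the dialect
// maps StringType to VARCHAR(255) and BooleanType to CHAR(1) in the
// generated DDL instead of the TEXT/BIT(1) defaults.
val df = spark.range(10).selectExpr("CAST(id AS STRING) AS name", "id % 2 = 0 AS flag")
df.write.jdbc(url, "dialect_sketch", props)
```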

Author: Kirby Linvill <kirby.linvill@teradata.com>
Author: klinvill <kjlinvill@gmail.com>

Closes #16746 from klinvill/master.
Authored by Kirby Linvill on 2017-05-23 12:00:58 -07:00; committed by Xiao Li
parent 0d589ba00b
commit 4816c2ef5e
3 changed files with 47 additions and 0 deletions


@@ -174,6 +174,7 @@ object JdbcDialects {
  registerDialect(MsSqlServerDialect)
  registerDialect(DerbyDialect)
  registerDialect(OracleDialect)
  registerDialect(TeradataDialect)

  /**
   * Fetch the JdbcDialect class corresponding to a given database url.


@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc

import java.sql.Types

import org.apache.spark.sql.types._


private case object TeradataDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = { url.startsWith("jdbc:teradata") }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(255)", java.sql.Types.VARCHAR))
    case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
    case _ => None
  }
}


@@ -922,6 +922,18 @@ class JDBCSuite extends SparkFunSuite
    assert(e2.contains("User specified schema not supported with `jdbc`"))
  }

  test("SPARK-15648: teradataDialect StringType data mapping") {
    val teradataDialect = JdbcDialects.get("jdbc:teradata://127.0.0.1/db")
    assert(teradataDialect.getJDBCType(StringType).
      map(_.databaseTypeDefinition).get == "VARCHAR(255)")
  }

  test("SPARK-15648: teradataDialect BooleanType data mapping") {
    val teradataDialect = JdbcDialects.get("jdbc:teradata://127.0.0.1/db")
    assert(teradataDialect.getJDBCType(BooleanType).
      map(_.databaseTypeDefinition).get == "CHAR(1)")
  }

  test("Checking metrics correctness with JDBC") {
    val foobarCnt = spark.table("foobar").count()
    val res = InputOutputMetricsHelper.run(sql("SELECT * FROM foobar").toDF())