[SPARK-8004][SQL] Quote identifier in JDBC data source.
This is a follow-up patch to #6577 to replace columnEnclosing to quoteIdentifier.
I also did some minor cleanup to the JdbcDialect file.
Author: Reynold Xin <rxin@databricks.com>
Closes #6689 from rxin/jdbc-quote and squashes the following commits:
bad365f [Reynold Xin] Fixed test compilation...
e39e14e [Reynold Xin] Fixed compilation.
db9a8e0 [Reynold Xin] [SPARK-8004][SQL] Quote identifier in JDBC data source.
(cherry picked from commit d6d601a07b)
Signed-off-by: Reynold Xin <rxin@databricks.com>
This commit is contained in:
parent
ff26767c03
commit
b9c046f6d7
|
@ -212,13 +212,13 @@ private[sql] object JDBCRDD extends Logging {
|
|||
filters: Array[Filter],
|
||||
parts: Array[Partition]): RDD[Row] = {
|
||||
val dialect = JdbcDialects.get(url)
|
||||
val enclosedColumns = requiredColumns.map(dialect.columnEnclosing(_))
|
||||
val quotedColumns = requiredColumns.map(colName => dialect.quoteIdentifier(colName))
|
||||
new JDBCRDD(
|
||||
sc,
|
||||
getConnector(driver, url, properties),
|
||||
pruneSchema(schema, requiredColumns),
|
||||
fqTable,
|
||||
enclosedColumns,
|
||||
quotedColumns,
|
||||
filters,
|
||||
parts,
|
||||
properties)
|
||||
|
|
|
@ -17,11 +17,11 @@
|
|||
|
||||
package org.apache.spark.sql.jdbc
|
||||
|
||||
import java.sql.Types
|
||||
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.annotation.DeveloperApi
|
||||
|
||||
import java.sql.Types
|
||||
|
||||
/**
|
||||
* :: DeveloperApi ::
|
||||
* A database type definition coupled with the jdbc type needed to send null
|
||||
|
@ -82,11 +82,10 @@ abstract class JdbcDialect {
|
|||
def getJDBCType(dt: DataType): Option[JdbcType] = None
|
||||
|
||||
/**
|
||||
* Enclose column name
|
||||
* @param colName The column name
|
||||
* @return Enclosed column name
|
||||
* Quotes the identifier. This is used to put quotes around the identifier in case the column
|
||||
* name is a reserved keyword, or in case it contains characters that require quotes (e.g. space).
|
||||
*/
|
||||
def columnEnclosing(colName: String): String = {
|
||||
def quoteIdentifier(colName: String): String = {
|
||||
s""""$colName""""
|
||||
}
|
||||
}
|
||||
|
@ -150,18 +149,19 @@ object JdbcDialects {
|
|||
@DeveloperApi
|
||||
class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
|
||||
|
||||
require(!dialects.isEmpty)
|
||||
require(dialects.nonEmpty)
|
||||
|
||||
def canHandle(url : String): Boolean =
|
||||
override def canHandle(url : String): Boolean =
|
||||
dialects.map(_.canHandle(url)).reduce(_ && _)
|
||||
|
||||
override def getCatalystType(
|
||||
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] =
|
||||
dialects.map(_.getCatalystType(sqlType, typeName, size, md)).flatten.headOption
|
||||
|
||||
override def getJDBCType(dt: DataType): Option[JdbcType] =
|
||||
dialects.map(_.getJDBCType(dt)).flatten.headOption
|
||||
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
|
||||
dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption
|
||||
}
|
||||
|
||||
override def getJDBCType(dt: DataType): Option[JdbcType] = {
|
||||
dialects.flatMap(_.getJDBCType(dt)).headOption
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -170,7 +170,7 @@ class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
|
|||
*/
|
||||
@DeveloperApi
|
||||
case object NoopDialect extends JdbcDialect {
|
||||
def canHandle(url : String): Boolean = true
|
||||
override def canHandle(url : String): Boolean = true
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -179,7 +179,7 @@ case object NoopDialect extends JdbcDialect {
|
|||
*/
|
||||
@DeveloperApi
|
||||
case object PostgresDialect extends JdbcDialect {
|
||||
def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")
|
||||
override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")
|
||||
override def getCatalystType(
|
||||
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
|
||||
if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
|
||||
|
@ -205,7 +205,7 @@ case object PostgresDialect extends JdbcDialect {
|
|||
*/
|
||||
@DeveloperApi
|
||||
case object MySQLDialect extends JdbcDialect {
|
||||
def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
|
||||
override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
|
||||
override def getCatalystType(
|
||||
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
|
||||
if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
|
||||
|
@ -218,7 +218,7 @@ case object MySQLDialect extends JdbcDialect {
|
|||
} else None
|
||||
}
|
||||
|
||||
override def columnEnclosing(colName: String): String = {
|
||||
override def quoteIdentifier(colName: String): String = {
|
||||
s"`$colName`"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -411,13 +411,13 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter {
|
|||
assert(JdbcDialects.get("test.invalid") == NoopDialect)
|
||||
}
|
||||
|
||||
test("Enclosing column names by jdbc dialect") {
|
||||
test("quote column names by jdbc dialect") {
|
||||
val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
|
||||
val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
|
||||
|
||||
val columns = Seq("abc", "key")
|
||||
val MySQLColumns = columns.map(MySQL.columnEnclosing(_))
|
||||
val PostgresColumns = columns.map(Postgres.columnEnclosing(_))
|
||||
val MySQLColumns = columns.map(MySQL.quoteIdentifier(_))
|
||||
val PostgresColumns = columns.map(Postgres.quoteIdentifier(_))
|
||||
assert(MySQLColumns === Seq("`abc`", "`key`"))
|
||||
assert(PostgresColumns === Seq(""""abc"""", """"key""""))
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue