[SPARK-8004][SQL] Enclose column names by JDBC Dialect
JIRA: https://issues.apache.org/jira/browse/SPARK-8004 Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #6577 from viirya/enclose_jdbc_columns and squashes the following commits: 614606a [Liang-Chi Hsieh] For comment. bc50182 [Liang-Chi Hsieh] Enclose column names by JDBC Dialect.
This commit is contained in:
parent
3285a51121
commit
901a552c5e
|
@ -211,12 +211,14 @@ private[sql] object JDBCRDD extends Logging {
|
|||
requiredColumns: Array[String],
|
||||
filters: Array[Filter],
|
||||
parts: Array[Partition]): RDD[Row] = {
|
||||
val dialect = JdbcDialects.get(url)
|
||||
val enclosedColumns = requiredColumns.map(dialect.columnEnclosing(_))
|
||||
new JDBCRDD(
|
||||
sc,
|
||||
getConnector(driver, url, properties),
|
||||
pruneSchema(schema, requiredColumns),
|
||||
fqTable,
|
||||
requiredColumns,
|
||||
enclosedColumns,
|
||||
filters,
|
||||
parts,
|
||||
properties)
|
||||
|
|
|
@ -80,6 +80,15 @@ abstract class JdbcDialect {
|
|||
* @return The new JdbcType if there is an override for this DataType
|
||||
*/
|
||||
def getJDBCType(dt: DataType): Option[JdbcType] = None
|
||||
|
||||
/**
|
||||
* Enclose column name
|
||||
* @param colName The coulmn name
|
||||
* @return Enclosed column name
|
||||
*/
|
||||
def columnEnclosing(colName: String): String = {
|
||||
s""""$colName""""
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -208,4 +217,8 @@ case object MySQLDialect extends JdbcDialect {
|
|||
Some(BooleanType)
|
||||
} else None
|
||||
}
|
||||
|
||||
override def columnEnclosing(colName: String): String = {
|
||||
s"`$colName`"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -410,6 +410,17 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter {
|
|||
assert(JdbcDialects.get("test.invalid") == NoopDialect)
|
||||
}
|
||||
|
||||
test("Enclosing column names by jdbc dialect") {
|
||||
val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
|
||||
val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
|
||||
|
||||
val columns = Seq("abc", "key")
|
||||
val MySQLColumns = columns.map(MySQL.columnEnclosing(_))
|
||||
val PostgresColumns = columns.map(Postgres.columnEnclosing(_))
|
||||
assert(MySQLColumns === Seq("`abc`", "`key`"))
|
||||
assert(PostgresColumns === Seq(""""abc"""", """"key""""))
|
||||
}
|
||||
|
||||
test("Dialect unregister") {
|
||||
JdbcDialects.registerDialect(testH2Dialect)
|
||||
JdbcDialects.unregisterDialect(testH2Dialect)
|
||||
|
|
Loading…
Reference in a new issue