[SPARK-34180][SQL] Fix the regression brought by SPARK-33888 for PostgresDialect

### What changes were proposed in this pull request?

This PR fixes a regression introduced by SPARK-33888 (#30902).
After that PR was merged, `PostgresDialect#getCatalystType` throws an exception for array types:
```
[info] - Type mapping for various types *** FAILED *** (551 milliseconds)
[info]   java.util.NoSuchElementException: key not found: scale
[info]   at scala.collection.immutable.Map$EmptyMap$.apply(Map.scala:106)
[info]   at scala.collection.immutable.Map$EmptyMap$.apply(Map.scala:104)
[info]   at org.apache.spark.sql.types.Metadata.get(Metadata.scala:111)
[info]   at org.apache.spark.sql.types.Metadata.getLong(Metadata.scala:51)
[info]   at org.apache.spark.sql.jdbc.PostgresDialect$.getCatalystType(PostgresDialect.scala:43)
[info]   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:321)
```
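
The failure happens because `PostgresDialect#getCatalystType` reads the `scale` entry from the column metadata (see the stack trace above), while SPARK-33888 changed `JdbcUtils#getSchema` to put that entry only for `NUMERIC` and `DECIMAL` columns. The fix in the diff below always calls `metadata.putLong("scale", fieldScale)` before the dialect is consulted. The following is a minimal sketch of the two metadata shapes, using only Spark's `MetadataBuilder`/`Metadata` API (the object name is just for illustration):

```scala
import org.apache.spark.sql.types.MetadataBuilder

object ScaleMetadataSketch {
  def main(args: Array[String]): Unit = {
    // Metadata as built after SPARK-33888 for non-NUMERIC/DECIMAL columns:
    // the "scale" key is absent, so getLong("scale") throws
    // java.util.NoSuchElementException: key not found: scale.
    val withoutScale = new MetadataBuilder().build()
    // withoutScale.getLong("scale")  // would reproduce the error above

    // Metadata as built after this fix: "scale" is always present,
    // matching the `defaultMetadata` used in the updated test suites.
    val withScale = new MetadataBuilder().putLong("scale", 0).build()
    assert(withScale.getLong("scale") == 0L)
  }
}
```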

### Why are the changes needed?

To fix the regression bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I confirmed that the test case `SPARK-22291: Conversion error when transforming array types of uuid, inet and cidr to StingType in PostgreSQL` in `PostgresIntegrationSuite` passes.

I also ran all the `v2.*IntegrationSuite` suites, since this PR modifies them, and confirmed that they pass.

Closes #31262 from sarutak/fix-postgres-dialect-regression.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>

@@ -65,11 +65,11 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
override def testUpdateColumnType(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INTEGER)")
var t = spark.table(tbl)
var expectedSchema = new StructType().add("ID", IntegerType)
var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE DOUBLE")
t = spark.table(tbl)
expectedSchema = new StructType().add("ID", DoubleType)
expectedSchema = new StructType().add("ID", DoubleType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Update column type from DOUBLE to STRING
val msg1 = intercept[AnalysisException] {
@@ -81,8 +81,8 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
override def testCreateTableWithProperty(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INT)" +
s" TBLPROPERTIES('CCSID'='UNICODE')")
var t = spark.table(tbl)
var expectedSchema = new StructType().add("ID", IntegerType)
val t = spark.table(tbl)
val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
}
}

@@ -67,11 +67,11 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBC
override def testUpdateColumnType(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INTEGER)")
var t = spark.table(tbl)
var expectedSchema = new StructType().add("ID", IntegerType)
var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
t = spark.table(tbl)
expectedSchema = new StructType().add("ID", StringType)
expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Update column type from STRING to INTEGER
val msg1 = intercept[AnalysisException] {

@@ -69,11 +69,11 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
override def testUpdateColumnType(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INTEGER)")
var t = spark.table(tbl)
var expectedSchema = new StructType().add("ID", IntegerType)
var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
t = spark.table(tbl)
expectedSchema = new StructType().add("ID", StringType)
expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Update column type from STRING to INTEGER
val msg1 = intercept[AnalysisException] {
@@ -110,8 +110,8 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
override def testCreateTableWithProperty(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INT)" +
s" TBLPROPERTIES('ENGINE'='InnoDB', 'DEFAULT CHARACTER SET'='utf8')")
var t = spark.table(tbl)
var expectedSchema = new StructType().add("ID", IntegerType)
val t = spark.table(tbl)
val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
}
}

@@ -75,11 +75,11 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest
override def testUpdateColumnType(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INTEGER)")
var t = spark.table(tbl)
var expectedSchema = new StructType().add("ID", DecimalType(10, 0))
var expectedSchema = new StructType().add("ID", DecimalType(10, 0), true, defaultMetadata)
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
t = spark.table(tbl)
expectedSchema = new StructType().add("ID", StringType)
expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Update column type from STRING to INTEGER
val msg1 = intercept[AnalysisException] {

@@ -54,11 +54,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTes
override def testUpdateColumnType(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INTEGER)")
var t = spark.table(tbl)
var expectedSchema = new StructType().add("ID", IntegerType)
var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
t = spark.table(tbl)
expectedSchema = new StructType().add("ID", StringType)
expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Update column type from STRING to INTEGER
val msg = intercept[AnalysisException] {
@@ -70,8 +70,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTes
override def testCreateTableWithProperty(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INT)" +
s" TBLPROPERTIES('TABLESPACE'='pg_default')")
var t = spark.table(tbl)
var expectedSchema = new StructType().add("ID", IntegerType)
val t = spark.table(tbl)
val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
}
}

@@ -32,16 +32,18 @@ private[v2] trait V2JDBCTest extends SharedSparkSession {
def notSupportsTableComment: Boolean = false
val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()
def testUpdateColumnNullability(tbl: String): Unit = {
sql(s"CREATE TABLE $catalogName.alt_table (ID STRING NOT NULL)")
var t = spark.table(s"$catalogName.alt_table")
// nullable is true in the expectedSchema because Spark always sets nullable to true
// regardless of the JDBC metadata https://github.com/apache/spark/pull/18445
var expectedSchema = new StructType().add("ID", StringType, nullable = true)
var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN ID DROP NOT NULL")
t = spark.table(s"$catalogName.alt_table")
expectedSchema = new StructType().add("ID", StringType, nullable = true)
expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Update nullability of not existing column
val msg = intercept[AnalysisException] {
@@ -53,8 +55,8 @@ private[v2] trait V2JDBCTest extends SharedSparkSession {
def testRenameColumn(tbl: String): Unit = {
sql(s"ALTER TABLE $tbl RENAME COLUMN ID TO RENAMED")
val t = spark.table(s"$tbl")
val expectedSchema = new StructType().add("RENAMED", StringType, nullable = true)
.add("ID1", StringType, nullable = true).add("ID2", StringType, nullable = true)
val expectedSchema = new StructType().add("RENAMED", StringType, true, defaultMetadata)
.add("ID1", StringType, true, defaultMetadata).add("ID2", StringType, true, defaultMetadata)
assert(t.schema === expectedSchema)
}
@@ -64,15 +66,16 @@ private[v2] trait V2JDBCTest extends SharedSparkSession {
withTable(s"$catalogName.alt_table") {
sql(s"CREATE TABLE $catalogName.alt_table (ID STRING)")
var t = spark.table(s"$catalogName.alt_table")
var expectedSchema = new StructType().add("ID", StringType)
var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C1 STRING, C2 STRING)")
t = spark.table(s"$catalogName.alt_table")
expectedSchema = expectedSchema.add("C1", StringType).add("C2", StringType)
expectedSchema = expectedSchema.add("C1", StringType, true, defaultMetadata)
.add("C2", StringType, true, defaultMetadata)
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C3 STRING)")
t = spark.table(s"$catalogName.alt_table")
expectedSchema = expectedSchema.add("C3", StringType)
expectedSchema = expectedSchema.add("C3", StringType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Add already existing column
val msg = intercept[AnalysisException] {
@@ -93,7 +96,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession {
sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN C1")
sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN c3")
val t = spark.table(s"$catalogName.alt_table")
val expectedSchema = new StructType().add("C2", StringType)
val expectedSchema = new StructType().add("C2", StringType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Drop not existing column
val msg = intercept[AnalysisException] {

@@ -305,18 +305,15 @@ object JdbcUtils extends Logging {
rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
}
val metadata = new MetadataBuilder()
metadata.putLong("scale", fieldScale)
// SPARK-33888
// - include scale in metadata for only DECIMAL & NUMERIC
// - include TIME type metadata
// - always build the metadata
dataType match {
// scalastyle:off
case java.sql.Types.NUMERIC => metadata.putLong("scale", fieldScale)
case java.sql.Types.DECIMAL => metadata.putLong("scale", fieldScale)
case java.sql.Types.TIME => metadata.putBoolean("logical_time_type", true)
case _ =>
// scalastyle:on
if (dataType == java.sql.Types.TIME) {
metadata.putBoolean("logical_time_type", true)
}
val columnType =
dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
getCatalystType(dataType, fieldSize, fieldScale, isSigned))

@@ -34,6 +34,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val tempDir = Utils.createTempDir()
val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()
var conn: java.sql.Connection = null
override def sparkConf: SparkConf = super.sparkConf
@@ -138,8 +139,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
test("load a table") {
val t = spark.table("h2.test.people")
val expectedSchema = new StructType()
.add("NAME", StringType)
.add("ID", IntegerType)
.add("NAME", StringType, true, defaultMetadata)
.add("ID", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
val msg = intercept[AnalysisException] {
@@ -177,13 +178,13 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
sql(s"ALTER TABLE $tableName ADD COLUMNS (C1 INTEGER, C2 STRING)")
var t = spark.table(tableName)
var expectedSchema = new StructType()
.add("ID", IntegerType)
.add("C1", IntegerType)
.add("C2", StringType)
.add("ID", IntegerType, true, defaultMetadata)
.add("C1", IntegerType, true, defaultMetadata)
.add("C2", StringType, true, defaultMetadata)
assert(t.schema === expectedSchema)
sql(s"ALTER TABLE $tableName ADD COLUMNS (c3 DOUBLE)")
t = spark.table(tableName)
expectedSchema = expectedSchema.add("c3", DoubleType)
expectedSchema = expectedSchema.add("c3", DoubleType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Add already existing column
val msg = intercept[AnalysisException] {
@@ -207,8 +208,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
sql(s"ALTER TABLE $tableName RENAME COLUMN id TO C")
val t = spark.table(tableName)
val expectedSchema = new StructType()
.add("C", IntegerType)
.add("C0", IntegerType)
.add("C", IntegerType, true, defaultMetadata)
.add("C0", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Rename to already existing column
val msg = intercept[AnalysisException] {
@@ -232,7 +233,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
sql(s"ALTER TABLE $tableName DROP COLUMN C1")
sql(s"ALTER TABLE $tableName DROP COLUMN c3")
val t = spark.table(tableName)
val expectedSchema = new StructType().add("C2", IntegerType)
val expectedSchema = new StructType().add("C2", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Drop not existing column
val msg = intercept[AnalysisException] {
@@ -256,7 +257,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
sql(s"ALTER TABLE $tableName ALTER COLUMN id TYPE DOUBLE")
sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE DOUBLE")
val t = spark.table(tableName)
val expectedSchema = new StructType().add("ID", DoubleType).add("deptno", DoubleType)
val expectedSchema = new StructType()
.add("ID", DoubleType, true, defaultMetadata)
.add("deptno", DoubleType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Update not existing column
val msg1 = intercept[AnalysisException] {
@@ -286,7 +289,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
sql(s"ALTER TABLE $tableName ALTER COLUMN deptno DROP NOT NULL")
val t = spark.table(tableName)
val expectedSchema = new StructType()
.add("ID", IntegerType, nullable = true).add("deptno", IntegerType, nullable = true)
.add("ID", IntegerType, true, defaultMetadata)
.add("deptno", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Update nullability of not existing column
val msg = intercept[AnalysisException] {
@@ -332,7 +336,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
withTable(tableName) {
sql(s"CREATE TABLE $tableName (c1 INTEGER NOT NULL, c2 INTEGER)")
var t = spark.table(tableName)
var expectedSchema = new StructType().add("c1", IntegerType).add("c2", IntegerType)
var expectedSchema = new StructType()
.add("c1", IntegerType, true, defaultMetadata)
.add("c2", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
@@ -344,7 +350,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3")
expectedSchema = new StructType().add("c1", IntegerType).add("c3", IntegerType)
expectedSchema = new StructType()
.add("c1", IntegerType, true, defaultMetadata)
.add("c3", IntegerType, true, defaultMetadata)
t = spark.table(tableName)
assert(t.schema === expectedSchema)
}
@@ -358,7 +366,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(s"ALTER TABLE $tableName DROP COLUMN C3")
expectedSchema = new StructType().add("c1", IntegerType)
expectedSchema = new StructType().add("c1", IntegerType, true, defaultMetadata)
t = spark.table(tableName)
assert(t.schema === expectedSchema)
}
@@ -372,7 +380,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE")
expectedSchema = new StructType().add("c1", DoubleType)
expectedSchema = new StructType().add("c1", DoubleType, true, defaultMetadata)
t = spark.table(tableName)
assert(t.schema === expectedSchema)
}
@@ -386,7 +394,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL")
expectedSchema = new StructType().add("c1", DoubleType, nullable = true)
expectedSchema = new StructType().add("c1", DoubleType, true, defaultMetadata)
t = spark.table(tableName)
assert(t.schema === expectedSchema)
}

@@ -74,6 +74,8 @@ class JDBCSuite extends QueryTest
}
}
val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()
override def beforeAll(): Unit = {
super.beforeAll()
Utils.classForName("org.h2.Driver")
@@ -1252,8 +1254,8 @@ class JDBCSuite extends QueryTest
}
test("SPARK-16848: jdbc API throws an exception for user specified schema") {
val schema = StructType(Seq(
StructField("name", StringType, false), StructField("theid", IntegerType, false)))
val schema = StructType(Seq(StructField("name", StringType, false, defaultMetadata),
StructField("theid", IntegerType, false, defaultMetadata)))
val parts = Array[String]("THEID < 2", "THEID >= 2")
val e1 = intercept[AnalysisException] {
spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
@@ -1273,7 +1275,9 @@ class JDBCSuite extends QueryTest
props.put("customSchema", customSchema)
val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props)
assert(df.schema.size === 2)
assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema))
val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema).map(
f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray)
assert(df.schema === expectedSchema)
assert(df.count() === 3)
}
@@ -1289,7 +1293,9 @@ class JDBCSuite extends QueryTest
""".stripMargin.replaceAll("\n", " "))
val df = sql("select * from people_view")
assert(df.schema.length === 2)
assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema))
val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema)
.map(f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray)
assert(df.schema === expectedSchema)
assert(df.count() === 3)
}
}
@@ -1404,8 +1410,8 @@ class JDBCSuite extends QueryTest
}
test("jdbc data source shouldn't have unnecessary metadata in its schema") {
val schema = StructType(Seq(
StructField("NAME", StringType, true), StructField("THEID", IntegerType, true)))
val schema = StructType(Seq(StructField("NAME", StringType, true, defaultMetadata),
StructField("THEID", IntegerType, true, defaultMetadata)))
val df = spark.read.format("jdbc")
.option("Url", urlWithUserAndPass)