From 842902154a26d00e1dd01325089a2ef2b6c89b65 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Fri, 22 Jan 2021 13:03:02 +0900
Subject: [PATCH] [SPARK-34180][SQL] Fix the regression brought by SPARK-33888 for PostgresDialect

### What changes were proposed in this pull request?

This PR fixes the regression introduced by SPARK-33888 (#30902). After that PR was merged, `PostgresDialect#getCatalystType` throws an exception for array types:

```
[info] - Type mapping for various types *** FAILED *** (551 milliseconds)
[info]   java.util.NoSuchElementException: key not found: scale
[info]   at scala.collection.immutable.Map$EmptyMap$.apply(Map.scala:106)
[info]   at scala.collection.immutable.Map$EmptyMap$.apply(Map.scala:104)
[info]   at org.apache.spark.sql.types.Metadata.get(Metadata.scala:111)
[info]   at org.apache.spark.sql.types.Metadata.getLong(Metadata.scala:51)
[info]   at org.apache.spark.sql.jdbc.PostgresDialect$.getCatalystType(PostgresDialect.scala:43)
[info]   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:321)
```

To fix it, `JdbcUtils#getSchema` now always puts the `scale` entry into the column metadata (the TIME-specific `logical_time_type` flag is still added only for TIME columns), instead of adding `scale` only for DECIMAL and NUMERIC columns, and the affected test suites are updated to expect the extra metadata.
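For illustration, the core of the fix as a minimal, self-contained sketch (the object and method names below are made up for this example; the real logic lives in `JdbcUtils#getSchema` and `PostgresDialect#getCatalystType`):

```scala
import java.sql.Types

import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

object ScaleMetadataSketch {
  // Stand-in for the lookup PostgresDialect performs for array columns after
  // SPARK-33888: it reads the "scale" entry from the column metadata.
  def arrayElementScale(md: Metadata): Int = md.getLong("scale").toInt

  // Stand-in for the metadata built in JdbcUtils#getSchema. Before this fix,
  // "scale" was stored only for DECIMAL and NUMERIC columns, so the lookup
  // above failed with "key not found: scale" for every other type.
  def buildColumnMetadata(sqlType: Int, fieldScale: Int): Metadata = {
    val metadata = new MetadataBuilder()
    metadata.putLong("scale", fieldScale)
    if (sqlType == Types.TIME) {
      metadata.putBoolean("logical_time_type", true)
    }
    metadata.build()
  }

  def main(args: Array[String]): Unit = {
    val md = buildColumnMetadata(Types.ARRAY, fieldScale = 0)
    // With "scale" always present, the array branch no longer throws.
    assert(arrayElementScale(md) == 0)
  }
}
```

Storing `scale` unconditionally keeps `Metadata.getLong("scale")` safe for every JDBC type, which is exactly what the Postgres dialect relies on when it maps array columns.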
TABLE $tbl (ID INT)" + s" TBLPROPERTIES('CCSID'='UNICODE')") - var t = spark.table(tbl) - var expectedSchema = new StructType().add("ID", IntegerType) + val t = spark.table(tbl) + val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) } } diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala index a7e257dbdc..a756516457 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala @@ -67,11 +67,11 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBC override def testUpdateColumnType(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INTEGER)") var t = spark.table(tbl) - var expectedSchema = new StructType().add("ID", IntegerType) + var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING") t = spark.table(tbl) - expectedSchema = new StructType().add("ID", StringType) + expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata) assert(t.schema === expectedSchema) // Update column type from STRING to INTEGER val msg1 = intercept[AnalysisException] { diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala index 5f63fde7a0..a567ab3b82 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala @@ -69,11 +69,11 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { override def testUpdateColumnType(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INTEGER)") var t = spark.table(tbl) - var expectedSchema = new StructType().add("ID", IntegerType) + var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING") t = spark.table(tbl) - expectedSchema = new StructType().add("ID", StringType) + expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata) assert(t.schema === expectedSchema) // Update column type from STRING to INTEGER val msg1 = intercept[AnalysisException] { @@ -110,8 +110,8 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { override def testCreateTableWithProperty(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INT)" + s" TBLPROPERTIES('ENGINE'='InnoDB', 'DEFAULT CHARACTER SET'='utf8')") - var t = spark.table(tbl) - var expectedSchema = new StructType().add("ID", IntegerType) + val t = spark.table(tbl) + val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) } } diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala index 
241c9c1409..84b952937d 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala @@ -75,11 +75,11 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest override def testUpdateColumnType(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INTEGER)") var t = spark.table(tbl) - var expectedSchema = new StructType().add("ID", DecimalType(10, 0)) + var expectedSchema = new StructType().add("ID", DecimalType(10, 0), true, defaultMetadata) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING") t = spark.table(tbl) - expectedSchema = new StructType().add("ID", StringType) + expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata) assert(t.schema === expectedSchema) // Update column type from STRING to INTEGER val msg1 = intercept[AnalysisException] { diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index a7fd9aa9a9..eded03afda 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -54,11 +54,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTes override def testUpdateColumnType(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INTEGER)") var t = spark.table(tbl) - var expectedSchema = new StructType().add("ID", IntegerType) + var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING") t = spark.table(tbl) - expectedSchema = new StructType().add("ID", StringType) + expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata) assert(t.schema === expectedSchema) // Update column type from STRING to INTEGER val msg = intercept[AnalysisException] { @@ -70,8 +70,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTes override def testCreateTableWithProperty(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INT)" + s" TBLPROPERTIES('TABLESPACE'='pg_default')") - var t = spark.table(tbl) - var expectedSchema = new StructType().add("ID", IntegerType) + val t = spark.table(tbl) + val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) } } diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index a2dd837583..3807eb732a 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -32,16 +32,18 @@ private[v2] trait V2JDBCTest extends SharedSparkSession { def notSupportsTableComment: Boolean = false + val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build() + def testUpdateColumnNullability(tbl: String): Unit = { sql(s"CREATE TABLE $catalogName.alt_table (ID STRING NOT NULL)") var t = spark.table(s"$catalogName.alt_table") // 
nullable is true in the expectedSchema because Spark always sets nullable to true // regardless of the JDBC metadata https://github.com/apache/spark/pull/18445 - var expectedSchema = new StructType().add("ID", StringType, nullable = true) + var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN ID DROP NOT NULL") t = spark.table(s"$catalogName.alt_table") - expectedSchema = new StructType().add("ID", StringType, nullable = true) + expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata) assert(t.schema === expectedSchema) // Update nullability of not existing column val msg = intercept[AnalysisException] { @@ -53,8 +55,8 @@ private[v2] trait V2JDBCTest extends SharedSparkSession { def testRenameColumn(tbl: String): Unit = { sql(s"ALTER TABLE $tbl RENAME COLUMN ID TO RENAMED") val t = spark.table(s"$tbl") - val expectedSchema = new StructType().add("RENAMED", StringType, nullable = true) - .add("ID1", StringType, nullable = true).add("ID2", StringType, nullable = true) + val expectedSchema = new StructType().add("RENAMED", StringType, true, defaultMetadata) + .add("ID1", StringType, true, defaultMetadata).add("ID2", StringType, true, defaultMetadata) assert(t.schema === expectedSchema) } @@ -64,15 +66,16 @@ private[v2] trait V2JDBCTest extends SharedSparkSession { withTable(s"$catalogName.alt_table") { sql(s"CREATE TABLE $catalogName.alt_table (ID STRING)") var t = spark.table(s"$catalogName.alt_table") - var expectedSchema = new StructType().add("ID", StringType) + var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C1 STRING, C2 STRING)") t = spark.table(s"$catalogName.alt_table") - expectedSchema = expectedSchema.add("C1", StringType).add("C2", StringType) + expectedSchema = expectedSchema.add("C1", StringType, true, defaultMetadata) + .add("C2", StringType, true, defaultMetadata) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C3 STRING)") t = spark.table(s"$catalogName.alt_table") - expectedSchema = expectedSchema.add("C3", StringType) + expectedSchema = expectedSchema.add("C3", StringType, true, defaultMetadata) assert(t.schema === expectedSchema) // Add already existing column val msg = intercept[AnalysisException] { @@ -93,7 +96,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession { sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN C1") sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN c3") val t = spark.table(s"$catalogName.alt_table") - val expectedSchema = new StructType().add("C2", StringType) + val expectedSchema = new StructType().add("C2", StringType, true, defaultMetadata) assert(t.schema === expectedSchema) // Drop not existing column val msg = intercept[AnalysisException] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 85a05f42c7..b709bbbb9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -305,18 +305,15 @@ object JdbcUtils extends Logging { rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls } val metadata = new MetadataBuilder() + metadata.putLong("scale", 
fieldScale) + // SPARK-33888 - // - include scale in metadata for only DECIMAL & NUMERIC // - include TIME type metadata // - always build the metadata - dataType match { - // scalastyle:off - case java.sql.Types.NUMERIC => metadata.putLong("scale", fieldScale) - case java.sql.Types.DECIMAL => metadata.putLong("scale", fieldScale) - case java.sql.Types.TIME => metadata.putBoolean("logical_time_type", true) - case _ => - // scalastyle:on + if (dataType == java.sql.Types.TIME) { + metadata.putBoolean("logical_time_type", true) } + val columnType = dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse( getCatalystType(dataType, fieldSize, fieldScale, isSigned)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index 2fd976e0b9..6dbfbd013e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -34,6 +34,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { val tempDir = Utils.createTempDir() val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass" + val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build() var conn: java.sql.Connection = null override def sparkConf: SparkConf = super.sparkConf @@ -138,8 +139,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { test("load a table") { val t = spark.table("h2.test.people") val expectedSchema = new StructType() - .add("NAME", StringType) - .add("ID", IntegerType) + .add("NAME", StringType, true, defaultMetadata) + .add("ID", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table => val msg = intercept[AnalysisException] { @@ -177,13 +178,13 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql(s"ALTER TABLE $tableName ADD COLUMNS (C1 INTEGER, C2 STRING)") var t = spark.table(tableName) var expectedSchema = new StructType() - .add("ID", IntegerType) - .add("C1", IntegerType) - .add("C2", StringType) + .add("ID", IntegerType, true, defaultMetadata) + .add("C1", IntegerType, true, defaultMetadata) + .add("C2", StringType, true, defaultMetadata) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $tableName ADD COLUMNS (c3 DOUBLE)") t = spark.table(tableName) - expectedSchema = expectedSchema.add("c3", DoubleType) + expectedSchema = expectedSchema.add("c3", DoubleType, true, defaultMetadata) assert(t.schema === expectedSchema) // Add already existing column val msg = intercept[AnalysisException] { @@ -207,8 +208,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql(s"ALTER TABLE $tableName RENAME COLUMN id TO C") val t = spark.table(tableName) val expectedSchema = new StructType() - .add("C", IntegerType) - .add("C0", IntegerType) + .add("C", IntegerType, true, defaultMetadata) + .add("C0", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) // Rename to already existing column val msg = intercept[AnalysisException] { @@ -232,7 +233,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql(s"ALTER TABLE $tableName DROP COLUMN C1") sql(s"ALTER TABLE $tableName DROP COLUMN c3") val t = spark.table(tableName) - val 
expectedSchema = new StructType().add("C2", IntegerType) + val expectedSchema = new StructType().add("C2", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) // Drop not existing column val msg = intercept[AnalysisException] { @@ -256,7 +257,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql(s"ALTER TABLE $tableName ALTER COLUMN id TYPE DOUBLE") sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE DOUBLE") val t = spark.table(tableName) - val expectedSchema = new StructType().add("ID", DoubleType).add("deptno", DoubleType) + val expectedSchema = new StructType() + .add("ID", DoubleType, true, defaultMetadata) + .add("deptno", DoubleType, true, defaultMetadata) assert(t.schema === expectedSchema) // Update not existing column val msg1 = intercept[AnalysisException] { @@ -286,7 +289,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql(s"ALTER TABLE $tableName ALTER COLUMN deptno DROP NOT NULL") val t = spark.table(tableName) val expectedSchema = new StructType() - .add("ID", IntegerType, nullable = true).add("deptno", IntegerType, nullable = true) + .add("ID", IntegerType, true, defaultMetadata) + .add("deptno", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) // Update nullability of not existing column val msg = intercept[AnalysisException] { @@ -332,7 +336,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { withTable(tableName) { sql(s"CREATE TABLE $tableName (c1 INTEGER NOT NULL, c2 INTEGER)") var t = spark.table(tableName) - var expectedSchema = new StructType().add("c1", IntegerType).add("c2", IntegerType) + var expectedSchema = new StructType() + .add("c1", IntegerType, true, defaultMetadata) + .add("c2", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { @@ -344,7 +350,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3") - expectedSchema = new StructType().add("c1", IntegerType).add("c3", IntegerType) + expectedSchema = new StructType() + .add("c1", IntegerType, true, defaultMetadata) + .add("c3", IntegerType, true, defaultMetadata) t = spark.table(tableName) assert(t.schema === expectedSchema) } @@ -358,7 +366,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { sql(s"ALTER TABLE $tableName DROP COLUMN C3") - expectedSchema = new StructType().add("c1", IntegerType) + expectedSchema = new StructType().add("c1", IntegerType, true, defaultMetadata) t = spark.table(tableName) assert(t.schema === expectedSchema) } @@ -372,7 +380,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE") - expectedSchema = new StructType().add("c1", DoubleType) + expectedSchema = new StructType().add("c1", DoubleType, true, defaultMetadata) t = spark.table(tableName) assert(t.schema === expectedSchema) } @@ -386,7 +394,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL") - expectedSchema = new StructType().add("c1", DoubleType, nullable = true) + expectedSchema = new StructType().add("c1", DoubleType, true, defaultMetadata) t = 
spark.table(tableName) assert(t.schema === expectedSchema) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 730eb787cb..f2a49cb94d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -74,6 +74,8 @@ class JDBCSuite extends QueryTest } } + val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build() + override def beforeAll(): Unit = { super.beforeAll() Utils.classForName("org.h2.Driver") @@ -1252,8 +1254,8 @@ class JDBCSuite extends QueryTest } test("SPARK-16848: jdbc API throws an exception for user specified schema") { - val schema = StructType(Seq( - StructField("name", StringType, false), StructField("theid", IntegerType, false))) + val schema = StructType(Seq(StructField("name", StringType, false, defaultMetadata), + StructField("theid", IntegerType, false, defaultMetadata))) val parts = Array[String]("THEID < 2", "THEID >= 2") val e1 = intercept[AnalysisException] { spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties()) @@ -1273,7 +1275,9 @@ class JDBCSuite extends QueryTest props.put("customSchema", customSchema) val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props) assert(df.schema.size === 2) - assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema)) + val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema).map( + f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray) + assert(df.schema === expectedSchema) assert(df.count() === 3) } @@ -1289,7 +1293,9 @@ class JDBCSuite extends QueryTest """.stripMargin.replaceAll("\n", " ")) val df = sql("select * from people_view") assert(df.schema.length === 2) - assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema)) + val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema) + .map(f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray) + assert(df.schema === expectedSchema) assert(df.count() === 3) } } @@ -1404,8 +1410,8 @@ class JDBCSuite extends QueryTest } test("jdbc data source shouldn't have unnecessary metadata in its schema") { - val schema = StructType(Seq( - StructField("NAME", StringType, true), StructField("THEID", IntegerType, true))) + val schema = StructType(Seq(StructField("NAME", StringType, true, defaultMetadata), + StructField("THEID", IntegerType, true, defaultMetadata))) val df = spark.read.format("jdbc") .option("Url", urlWithUserAndPass)