diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 8c5721340a..f250fec7d0 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -23,6 +23,7 @@ license: |
 {:toc}
 
 ## Upgrading from Spark SQL 2.4 to 3.0
+  - Since Spark 3.0, when inserting a value into a table column with a different data type, the type coercion is performed as per the ANSI SQL standard. Certain unreasonable type conversions, such as converting `string` to `int` and `double` to `boolean`, are disallowed. A runtime exception will be thrown if the value is out-of-range for the data type of the column. In Spark version 2.4 and earlier, type conversions during table insertion are allowed as long as they are a valid `Cast`. When inserting an out-of-range value into an integral field, the low-order bits of the value are inserted (the same as Java/Scala numeric type casting). For example, if 257 is inserted into a field of byte type, the result is 1. The behavior is controlled by the option `spark.sql.storeAssignmentPolicy`, with a default value of "ANSI". Setting the option to "Legacy" restores the previous behavior.
 
   - In Spark 3.0, the deprecated methods `SQLContext.createExternalTable` and `SparkSession.createExternalTable` have been removed in favor of its replacement, `createTable`.
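To make the migration note above concrete, here is a minimal sketch of the new default behavior, assuming a Spark 3.0 `spark-shell` session named `spark` and a throwaway table `t` (both hypothetical; exact error messages vary between versions):

```scala
// Spark 3.0 defaults to spark.sql.storeAssignmentPolicy=ANSI.
spark.sql("CREATE TABLE t (b BYTE) USING parquet")   // hypothetical table

// Out-of-range value: numeric -> numeric passes analysis, but the write is
// expected to fail at runtime with an overflow error instead of silently
// storing the low-order bits (257 -> 1) as Spark 2.4 did.
spark.sql("INSERT INTO t VALUES (257)")

// Unreasonable conversion: string -> byte is rejected at analysis time
// (a "Cannot safely cast" error) rather than being cast best-effort.
spark.sql("INSERT INTO t VALUES ('257')")

// Restoring the 2.4 behavior for V1 tables:
spark.sql("SET spark.sql.storeAssignmentPolicy=LEGACY")
```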
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 10e833c8a0..46eca4a1d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2502,9 +2502,9 @@ class Analyzer(
     override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case append @ AppendData(table, query, _, isByName)
           if table.resolved && query.resolved && !append.outputResolved =>
+        validateStoreAssignmentPolicy()
         val projection =
-          TableOutputResolver.resolveOutputColumns(
-            table.name, table.output, query, isByName, conf, storeAssignmentPolicy)
+          TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf)
 
         if (projection != query) {
           append.copy(query = projection)
@@ -2514,9 +2514,9 @@ class Analyzer(
       case overwrite @ OverwriteByExpression(table, _, query, _, isByName)
           if table.resolved && query.resolved && !overwrite.outputResolved =>
+        validateStoreAssignmentPolicy()
         val projection =
-          TableOutputResolver.resolveOutputColumns(
-            table.name, table.output, query, isByName, conf, storeAssignmentPolicy)
+          TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf)
 
         if (projection != query) {
           overwrite.copy(query = projection)
@@ -2526,9 +2526,9 @@ class Analyzer(
       case overwrite @ OverwritePartitionsDynamic(table, query, _, isByName)
           if table.resolved && query.resolved && !overwrite.outputResolved =>
+        validateStoreAssignmentPolicy()
         val projection =
-          TableOutputResolver.resolveOutputColumns(
-            table.name, table.output, query, isByName, conf, storeAssignmentPolicy)
+          TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf)
 
         if (projection != query) {
           overwrite.copy(query = projection)
@@ -2538,16 +2538,14 @@ class Analyzer(
       }
     }
 
-    private def storeAssignmentPolicy: StoreAssignmentPolicy.Value = {
-      val policy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.STRICT)
+    private def validateStoreAssignmentPolicy(): Unit = {
       // SPARK-28730: LEGACY store assignment policy is disallowed in data source v2.
-      if (policy == StoreAssignmentPolicy.LEGACY) {
+      if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) {
         val configKey = SQLConf.STORE_ASSIGNMENT_POLICY.key
         throw new AnalysisException(s"""
           |"LEGACY" store assignment policy is disallowed in Spark data source V2.
           |Please set the configuration $configKey to other values.""".stripMargin)
       }
-      policy
     }
 
     private def commonNaturalJoinProcessing(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index e5d25547d4..4f33ca99c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -32,8 +32,7 @@ object TableOutputResolver {
       expected: Seq[Attribute],
       query: LogicalPlan,
       byName: Boolean,
-      conf: SQLConf,
-      storeAssignmentPolicy: StoreAssignmentPolicy.Value): LogicalPlan = {
+      conf: SQLConf): LogicalPlan = {
 
     if (expected.size < query.output.size) {
       throw new AnalysisException(
@@ -47,8 +46,7 @@ object TableOutputResolver {
       expected.flatMap { tableAttr =>
         query.resolve(Seq(tableAttr.name), conf.resolver) match {
           case Some(queryExpr) =>
-            checkField(
-              tableAttr, queryExpr, byName, conf, storeAssignmentPolicy, err => errors += err)
+            checkField(tableAttr, queryExpr, byName, conf, err => errors += err)
           case None =>
             errors += s"Cannot find data for output column '${tableAttr.name}'"
             None
@@ -66,8 +64,7 @@
       query.output.zip(expected).flatMap { case (queryExpr, tableAttr) =>
-        checkField(
-          tableAttr, queryExpr, byName, conf, storeAssignmentPolicy, err => errors += err)
+        checkField(tableAttr, queryExpr, byName, conf, err => errors += err)
       }
     }
 
@@ -88,9 +85,9 @@
       queryExpr: NamedExpression,
       byName: Boolean,
       conf: SQLConf,
-      storeAssignmentPolicy: StoreAssignmentPolicy.Value,
       addError: String => Unit): Option[NamedExpression] = {
 
+    val storeAssignmentPolicy = conf.storeAssignmentPolicy
     lazy val outputField = if (tableAttr.dataType.sameType(queryExpr.dataType) &&
       tableAttr.name == queryExpr.name &&
       tableAttr.metadata == queryExpr.metadata) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 42b471f20f..d71f300dd2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -165,6 +165,7 @@ object Cast {
    */
   def canANSIStoreAssign(from: DataType, to: DataType): Boolean = (from, to) match {
     case _ if from == to => true
+    case (NullType, _) => true
     case (_: NumericType, _: NumericType) => true
     case (_: AtomicType, StringType) => true
     case (_: CalendarIntervalType, StringType) => true
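The `canANSIStoreAssign` rule table above can be exercised directly; a rough sketch using catalyst's internal API (internal, so accessibility and exact behavior here are an assumption of this sketch, not part of the public API):

```scala
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types._

Cast.canANSIStoreAssign(NullType, IntegerType)    // true after this patch: NULL values can be stored in any column type
Cast.canANSIStoreAssign(IntegerType, ByteType)    // true: numeric -> numeric, range is checked at runtime
Cast.canANSIStoreAssign(StringType, IntegerType)  // false: string -> numeric is rejected at analysis time
Cast.canANSIStoreAssign(DoubleType, StringType)   // true: any atomic type can be stored as a string
```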
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f00a4b545e..42e3beca2a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1740,7 +1740,7 @@
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
     .checkValues(StoreAssignmentPolicy.values.map(_.toString))
-    .createOptional
+    .createWithDefault(StoreAssignmentPolicy.ANSI.toString)
 
   val ANSI_ENABLED = buildConf("spark.sql.ansi.enabled")
     .doc("When true, Spark tries to conform to the ANSI SQL specification: 1. Spark will " +
@@ -2473,8 +2473,8 @@ class SQLConf extends Serializable with Logging {
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
 
-  def storeAssignmentPolicy: Option[StoreAssignmentPolicy.Value] =
-    getConf(STORE_ASSIGNMENT_POLICY).map(StoreAssignmentPolicy.withName)
+  def storeAssignmentPolicy: StoreAssignmentPolicy.Value =
+    StoreAssignmentPolicy.withName(getConf(STORE_ASSIGNMENT_POLICY))
 
   def ansiEnabled: Boolean = getConf(ANSI_ENABLED)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 3a10a56f69..ad1d6b62ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -456,6 +456,8 @@
           true
         }
 
+      case (_: NullType, _) if storeAssignmentPolicy == ANSI => true
+
       case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == ANSI =>
         if (!Cast.canANSIStoreAssign(w, r)) {
           addError(s"Cannot safely cast '$context': $w to $r")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala
index 9d6827194f..c47332f5d9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala
@@ -76,6 +76,14 @@ class StrictDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBa
       assert(err.contains("Cannot safely cast"))
     }
   }
+
+  test("Check NullType is incompatible with all other types") {
+    allNonNullTypes.foreach { t =>
+      assertSingleError(NullType, t, "nulls", s"Should not allow writing None to type $t") { err =>
+        assert(err.contains(s"incompatible with $t"))
+      }
+    }
+  }
 }
 
 class ANSIDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBaseSuite {
@@ -145,6 +153,12 @@ class ANSIDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBase
       assert(err.contains("Cannot safely cast 'timestampToLong': TimestampType to LongType"))
     }
   }
+
+  test("Check NullType is compatible with all other types") {
+    allNonNullTypes.foreach { t =>
+      assertAllowed(NullType, t, "nulls", s"Should allow writing None to type $t")
+    }
+  }
 }
 
 abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite {
@@ -175,17 +189,9 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite {
   private val nestedContainerTypes = Seq(ArrayType(point2, containsNull = false),
     MapType(StringType, point3, valueContainsNull = false))
 
-  private val allNonNullTypes = Seq(
+  protected val allNonNullTypes = Seq(
     atomicTypes, simpleContainerTypes, nestedContainerTypes, Seq(CalendarIntervalType)).flatten
 
-  test("Check NullType is incompatible with all other types") {
-    allNonNullTypes.foreach { t =>
-      assertSingleError(NullType, t, "nulls", s"Should not allow writing None to type $t") { err =>
-        assert(err.contains(s"incompatible with $t"))
-      }
-    }
-  }
-
   test("Check each type with itself") {
     allNonNullTypes.foreach { t =>
       assertAllowed(t, t, "t", s"Should allow writing type to itself $t")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 58e9f89418..95343e2872 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -189,14 +189,11 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
         query
       }
 
-      // SPARK-28730: for V1 data source, we use the "LEGACY" as default store assignment policy.
-      // TODO: use ANSI store assignment policy by default in SPARK-28495.
-      val storeAssignmentPolicy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.LEGACY)
       c.copy(
         tableDesc = existingTable,
         query = Some(TableOutputResolver.resolveOutputColumns(
           tableDesc.qualifiedName, existingTable.schema.toAttributes, newQuery,
-          byName = true, conf, storeAssignmentPolicy)))
+          byName = true, conf)))
 
     // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
     // config, and do various checks:
@@ -402,11 +399,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
           s"including ${staticPartCols.size} partition column(s) having constant value(s).")
     }
 
-    // SPARK-28730: for V1 data source, we use the "LEGACY" as default store assignment policy.
-    // TODO: use ANSI store assignment policy by default in SPARK-28495.
-    val storeAssignmentPolicy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.LEGACY)
     val newQuery = TableOutputResolver.resolveOutputColumns(
-      tblName, expectedColumns, insert.query, byName = false, conf, storeAssignmentPolicy)
+      tblName, expectedColumns, insert.query, byName = false, conf)
 
     if (normalizedPartSpec.nonEmpty) {
       if (normalizedPartSpec.size != partColNames.length) {
         throw new AnalysisException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 127deea1ce..075f6920b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -142,7 +142,19 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
 
   /** List of test cases to ignore, in lower cases. */
   protected def blackList: Set[String] = Set(
-    "blacklist.sql" // Do NOT remove this one. It is here to test the blacklist functionality.
+    "blacklist.sql", // Do NOT remove this one. It is here to test the blacklist functionality.
+    // SPARK-28885 String value is not allowed to be stored as numeric type with
+    // ANSI store assignment policy.
+    "postgreSQL/numeric.sql",
+    "postgreSQL/int2.sql",
+    "postgreSQL/int4.sql",
+    "postgreSQL/int8.sql",
+    "postgreSQL/float4.sql",
+    "postgreSQL/float8.sql",
+    // SPARK-28885 String value is not allowed to be stored as date/timestamp type with
+    // ANSI store assignment policy.
+    "postgreSQL/date.sql",
+    "postgreSQL/timestamp.sql"
   )
 
   // Create all the test cases.
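With the changes above, the V1 rules (`PreprocessTableCreation`, `PreprocessTableInsertion`) and the V2 analyzer rules all read the policy from the single `spark.sql.storeAssignmentPolicy` conf, which now defaults to ANSI. A sketch of toggling it, assuming a running session named `spark`:

```scala
// Default after this change:
spark.conf.get("spark.sql.storeAssignmentPolicy")            // "ANSI"

// LEGACY restores the pre-3.0 V1 insertion behavior (any valid Cast, e.g.
// string -> int silently producing NULL), but data source V2 writes reject it
// with an AnalysisException, as enforced by validateStoreAssignmentPolicy().
spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")

// STRICT is the old V2 default: roughly, only lossless up-casts are accepted.
spark.conf.set("spark.sql.storeAssignmentPolicy", "STRICT")
```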
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 70b1db8e5f..1634809601 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -150,9 +150,9 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
       Seq(3 -> "c").toDF("i", "j").write.mode("append").saveAsTable("t")
       checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
 
-      Seq("c" -> 3).toDF("i", "j").write.mode("append").saveAsTable("t")
+      Seq(3.5 -> 3).toDF("i", "j").write.mode("append").saveAsTable("t")
       checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c")
-        :: Row(null, "3") :: Nil)
+        :: Row(3, "3") :: Nil)
 
       Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 55b361d5ac..1e27593584 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -346,7 +346,9 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
     }
   }
 
-  test("SPARK-23340 Empty float/double array columns raise EOFException") {
+  // SPARK-28885 String value is not allowed to be stored as numeric type with
+  // ANSI store assignment policy.
+  ignore("SPARK-23340 Empty float/double array columns raise EOFException") {
     Seq(Seq(Array.empty[Float]).toDF(), Seq(Array.empty[Double]).toDF()).foreach { df =>
       withTempPath { path =>
         df.write.format("orc").save(path.getCanonicalPath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 88b94281d8..f38973f7df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -162,9 +162,9 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
   test("SPARK-10634 timestamp written and read as INT64 - truncation") {
     withTable("ts") {
       sql("create table ts (c1 int, c2 timestamp) using parquet")
-      sql("insert into ts values (1, '2016-01-01 10:11:12.123456')")
+      sql("insert into ts values (1, timestamp'2016-01-01 10:11:12.123456')")
       sql("insert into ts values (2, null)")
-      sql("insert into ts values (3, '1965-01-01 10:11:12.123456')")
+      sql("insert into ts values (3, timestamp'1965-01-01 10:11:12.123456')")
       val expected = Seq(
         (1, "2016-01-01 10:11:12.123456"),
         (2, null),
@@ -177,13 +177,13 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
     withTable("ts") {
       withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") {
         sql("create table ts (c1 int, c2 timestamp) using parquet")
-        sql("insert into ts values (1, '2016-01-01 10:11:12.123456')")
+        sql("insert into ts values (1, timestamp'2016-01-01 10:11:12.123456')")
         sql("insert into ts values (2, null)")
-        sql("insert into ts values (3, '1965-01-01 10:11:12.125456')")
-        sql("insert into ts values (4, '1965-01-01 10:11:12.125')")
-        sql("insert into ts values (5, '1965-01-01 10:11:12.1')")
-        sql("insert into ts values (6, '1965-01-01 10:11:12.123456789')")
-        sql("insert into ts values (7, '0001-01-01 00:00:00.000000')")
+        sql("insert into ts values (3, timestamp'1965-01-01 10:11:12.125456')")
+        sql("insert into ts values (4, timestamp'1965-01-01 10:11:12.125')")
+        sql("insert into ts values (5, timestamp'1965-01-01 10:11:12.1')")
+        sql("insert into ts values (6, timestamp'1965-01-01 10:11:12.123456789')")
+        sql("insert into ts values (7, timestamp'0001-01-01 00:00:00.000000')")
         val expected = Seq(
           (1, "2016-01-01 10:11:12.123"),
           (2, null),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 5e853e666b..9e33b8aaec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -729,7 +729,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         spark.sessionState.catalog.createTable(newTable, false)
 
         sql("INSERT INTO TABLE test_table SELECT 1, 'a'")
-        sql("INSERT INTO TABLE test_table SELECT 2, null")
+        val msg = intercept[AnalysisException] {
+          sql("INSERT INTO TABLE test_table SELECT 2, null")
+        }.getMessage
+        assert(msg.contains("Cannot write nullable values to non-null column 's'"))
       }
     }
   }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
index 36fcde3598..799f00a28f 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
@@ -92,6 +92,17 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite {
     "date.sql",
     // SPARK-28620
     "postgreSQL/float4.sql",
+    // SPARK-28885 String value is not allowed to be stored as numeric type with
+    // ANSI store assignment policy.
+    "postgreSQL/numeric.sql",
+    "postgreSQL/int2.sql",
+    "postgreSQL/int4.sql",
+    "postgreSQL/int8.sql",
+    "postgreSQL/float8.sql",
+    // SPARK-28885 String value is not allowed to be stored as date/timestamp type with
+    // ANSI store assignment policy.
+    "postgreSQL/date.sql",
+    "postgreSQL/timestamp.sql",
     // SPARK-28636
     "decimalArithmeticOperations.sql",
     "literals.sql",
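The test updates above follow one pattern: under the ANSI policy a plain string literal no longer stores implicitly into a numeric, date, or timestamp column, so the INSERT statements switch to typed literals. A hypothetical session illustrating the pattern (the table name is made up):

```scala
spark.sql("CREATE TABLE ts_demo (c TIMESTAMP) USING parquet")   // hypothetical table

// OK: a typed literal matches the column type.
spark.sql("INSERT INTO ts_demo VALUES (timestamp'2016-01-01 10:11:12.123456')")

// Expected to fail analysis under ANSI: string -> timestamp is not a safe store assignment.
// spark.sql("INSERT INTO ts_demo VALUES ('2016-01-01 10:11:12.123456')")
```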
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 7a9f5c67fc..36c19c680d 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 
 /**
  * Runs the test cases that are included in the hive distribution.
@@ -59,6 +60,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
     // Ensures that cross joins are enabled so that we can test them
     TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, true)
+    // Ensures that the table insertion behavior is consistent with Hive
+    TestHive.setConf(SQLConf.STORE_ASSIGNMENT_POLICY, StoreAssignmentPolicy.LEGACY.toString)
     // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests
     // (timestamp_*)
     TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 1c82c7e86f..ac31557b94 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -908,7 +908,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
           """.stripMargin
         )
 
-        val errorMsg = "data type mismatch: cannot cast decimal(2,1) to binary"
+        val errorMsg = "Cannot safely cast 'f0': DecimalType(2,1) to BinaryType"
 
         if (isPartitioned) {
          val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1.3"
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 4253fe2e1e..6d12310714 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1828,10 +1828,10 @@ class HiveDDLSuite
         .write.format("hive").mode("append").saveAsTable("t")
       checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
 
-      Seq("c" -> 3).toDF("i", "j")
+      Seq(3.5 -> 3).toDF("i", "j")
         .write.format("hive").mode("append").saveAsTable("t")
       checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c")
-        :: Row(null, "3") :: Nil)
+        :: Row(3, "3") :: Nil)
 
       Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
index 25ff354418..f8ba7bf2c1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeReadWriteSuite.scala
@@ -65,7 +65,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
     hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 TIMESTAMP) STORED AS $fileFormat")
     hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('2019-04-11 15:50:00')")
     checkAnswer(spark.table("hive_serde"), Row(Timestamp.valueOf("2019-04-11 15:50:00")))
-    spark.sql("INSERT INTO TABLE hive_serde values('2019-04-12 15:50:00')")
+    spark.sql("INSERT INTO TABLE hive_serde values(TIMESTAMP('2019-04-12 15:50:00'))")
     checkAnswer(
       spark.table("hive_serde"),
       Seq(Row(Timestamp.valueOf("2019-04-11 15:50:00")),
@@ -77,7 +77,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
     hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 DATE) STORED AS $fileFormat")
     hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('2019-04-11')")
     checkAnswer(spark.table("hive_serde"), Row(Date.valueOf("2019-04-11")))
-    spark.sql("INSERT INTO TABLE hive_serde values('2019-04-12')")
spark.sql("INSERT INTO TABLE hive_serde values('2019-04-12')") + spark.sql("INSERT INTO TABLE hive_serde values(TIMESTAMP('2019-04-12'))") checkAnswer( spark.table("hive_serde"), Seq(Row(Date.valueOf("2019-04-11")), Row(Date.valueOf("2019-04-12")))) @@ -119,7 +119,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 BINARY) STORED AS $fileFormat") hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('1')") checkAnswer(spark.table("hive_serde"), Row("1".getBytes)) - spark.sql("INSERT INTO TABLE hive_serde values('2')") + spark.sql("INSERT INTO TABLE hive_serde values(BINARY('2'))") checkAnswer(spark.table("hive_serde"), Seq(Row("1".getBytes), Row("2".getBytes))) } } @@ -168,6 +168,8 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS checkNumericTypes(fileFormat, "DECIMAL(38, 2)", 2.1D) // Date/Time Types + // SPARK-28885 String value is not allowed to be stored as date/timestamp type with + // ANSI store assignment policy. checkDateTimeTypes(fileFormat) // String Types diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala index 00333397e1..3c545c577f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala @@ -210,7 +210,10 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton { } } - test("SPARK-23340 Empty float/double array columns raise EOFException") { + // SPARK-28885 String value is not allowed to be stored as numeric type with + // ANSI store assignment policy. + // TODO: re-enable the test case when SPARK-29462 is fixed. + ignore("SPARK-23340 Empty float/double array columns raise EOFException") { withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "false") { withTable("spark_23340") { sql("CREATE TABLE spark_23340(a array, b array) STORED AS ORC")