[SPARK-28885][SQL] Follow ANSI store assignment rules in table insertion by default
### What changes were proposed in this pull request? When inserting a value into a column with a different data type, Spark performs type coercion. Currently, we support 3 policies for the store assignment rules: ANSI, legacy and strict, which can be set via the option "spark.sql.storeAssignmentPolicy": 1. ANSI: Spark performs the type coercion as per ANSI SQL. In practice, the behavior is mostly the same as PostgreSQL. It disallows certain unreasonable type conversions such as converting `string` to `int` and `double` to `boolean`. It will throw a runtime exception if the value is out-of-range (overflow). 2. Legacy: Spark allows the type coercion as long as it is a valid `Cast`, which is very loose. E.g., converting either `string` to `int` or `double` to `boolean` is allowed. It is the current behavior in Spark 2.x for compatibility with Hive. When inserting an out-of-range value to an integral field, the low-order bits of the value are inserted (the same as Java/Scala numeric type casting). For example, if 257 is inserted to a field of Byte type, the result is 1. 3. Strict: Spark doesn't allow any possible precision loss or data truncation in store assignment, e.g., converting either `double` to `int` or `decimal` to `double` is not allowed. The rules were originally designed for the Dataset encoder. As far as I know, no mainstream DBMS is using this policy by default. Currently, the V1 data source uses the "Legacy" policy by default, while V2 uses "Strict". This proposal is to use the "ANSI" policy by default for both V1 and V2 in Spark 3.0. ### Why are the changes needed? Following the ANSI SQL standard is the most reasonable choice among the 3 policies. ### Does this PR introduce any user-facing change? Yes. The default store assignment policy is ANSI for both V1 and V2 data sources. ### How was this patch tested? Unit test Closes #26107 from gengliangwang/ansiPolicyAsDefault. Authored-by: Gengliang Wang <gengliang.wang@databricks.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
parent
2e28622d8a
commit
322ec0ba9b
|
@ -23,6 +23,7 @@ license: |
|
||||||
{:toc}
|
{:toc}
|
||||||
|
|
||||||
## Upgrading from Spark SQL 2.4 to 3.0
|
## Upgrading from Spark SQL 2.4 to 3.0
|
||||||
|
- Since Spark 3.0, when inserting a value into a table column with a different data type, the type coercion is performed as per the ANSI SQL standard. Certain unreasonable type conversions such as converting `string` to `int` and `double` to `boolean` are disallowed. A runtime exception will be thrown if the value is out-of-range for the data type of the column. In Spark version 2.4 and earlier, type conversions during table insertion are allowed as long as they are valid `Cast`. When inserting an out-of-range value to an integral field, the low-order bits of the value are inserted (the same as Java/Scala numeric type casting). For example, if 257 is inserted to a field of byte type, the result is 1. The behavior is controlled by the option `spark.sql.storeAssignmentPolicy`, with a default value of "ANSI". Setting the option to "Legacy" restores the previous behavior.
|
||||||
|
|
||||||
- In Spark 3.0, the deprecated methods `SQLContext.createExternalTable` and `SparkSession.createExternalTable` have been removed in favor of its replacement, `createTable`.
|
- In Spark 3.0, the deprecated methods `SQLContext.createExternalTable` and `SparkSession.createExternalTable` have been removed in favor of its replacement, `createTable`.
|
||||||
|
|
||||||
|
|
|
@ -2502,9 +2502,9 @@ class Analyzer(
|
||||||
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
|
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
|
||||||
case append @ AppendData(table, query, _, isByName)
|
case append @ AppendData(table, query, _, isByName)
|
||||||
if table.resolved && query.resolved && !append.outputResolved =>
|
if table.resolved && query.resolved && !append.outputResolved =>
|
||||||
|
validateStoreAssignmentPolicy()
|
||||||
val projection =
|
val projection =
|
||||||
TableOutputResolver.resolveOutputColumns(
|
TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf)
|
||||||
table.name, table.output, query, isByName, conf, storeAssignmentPolicy)
|
|
||||||
|
|
||||||
if (projection != query) {
|
if (projection != query) {
|
||||||
append.copy(query = projection)
|
append.copy(query = projection)
|
||||||
|
@ -2514,9 +2514,9 @@ class Analyzer(
|
||||||
|
|
||||||
case overwrite @ OverwriteByExpression(table, _, query, _, isByName)
|
case overwrite @ OverwriteByExpression(table, _, query, _, isByName)
|
||||||
if table.resolved && query.resolved && !overwrite.outputResolved =>
|
if table.resolved && query.resolved && !overwrite.outputResolved =>
|
||||||
|
validateStoreAssignmentPolicy()
|
||||||
val projection =
|
val projection =
|
||||||
TableOutputResolver.resolveOutputColumns(
|
TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf)
|
||||||
table.name, table.output, query, isByName, conf, storeAssignmentPolicy)
|
|
||||||
|
|
||||||
if (projection != query) {
|
if (projection != query) {
|
||||||
overwrite.copy(query = projection)
|
overwrite.copy(query = projection)
|
||||||
|
@ -2526,9 +2526,9 @@ class Analyzer(
|
||||||
|
|
||||||
case overwrite @ OverwritePartitionsDynamic(table, query, _, isByName)
|
case overwrite @ OverwritePartitionsDynamic(table, query, _, isByName)
|
||||||
if table.resolved && query.resolved && !overwrite.outputResolved =>
|
if table.resolved && query.resolved && !overwrite.outputResolved =>
|
||||||
|
validateStoreAssignmentPolicy()
|
||||||
val projection =
|
val projection =
|
||||||
TableOutputResolver.resolveOutputColumns(
|
TableOutputResolver.resolveOutputColumns(table.name, table.output, query, isByName, conf)
|
||||||
table.name, table.output, query, isByName, conf, storeAssignmentPolicy)
|
|
||||||
|
|
||||||
if (projection != query) {
|
if (projection != query) {
|
||||||
overwrite.copy(query = projection)
|
overwrite.copy(query = projection)
|
||||||
|
@ -2538,16 +2538,14 @@ class Analyzer(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def storeAssignmentPolicy: StoreAssignmentPolicy.Value = {
|
private def validateStoreAssignmentPolicy(): Unit = {
|
||||||
val policy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.STRICT)
|
|
||||||
// SPARK-28730: LEGACY store assignment policy is disallowed in data source v2.
|
// SPARK-28730: LEGACY store assignment policy is disallowed in data source v2.
|
||||||
if (policy == StoreAssignmentPolicy.LEGACY) {
|
if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) {
|
||||||
val configKey = SQLConf.STORE_ASSIGNMENT_POLICY.key
|
val configKey = SQLConf.STORE_ASSIGNMENT_POLICY.key
|
||||||
throw new AnalysisException(s"""
|
throw new AnalysisException(s"""
|
||||||
|"LEGACY" store assignment policy is disallowed in Spark data source V2.
|
|"LEGACY" store assignment policy is disallowed in Spark data source V2.
|
||||||
|Please set the configuration $configKey to other values.""".stripMargin)
|
|Please set the configuration $configKey to other values.""".stripMargin)
|
||||||
}
|
}
|
||||||
policy
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private def commonNaturalJoinProcessing(
|
private def commonNaturalJoinProcessing(
|
||||||
|
|
|
@ -32,8 +32,7 @@ object TableOutputResolver {
|
||||||
expected: Seq[Attribute],
|
expected: Seq[Attribute],
|
||||||
query: LogicalPlan,
|
query: LogicalPlan,
|
||||||
byName: Boolean,
|
byName: Boolean,
|
||||||
conf: SQLConf,
|
conf: SQLConf): LogicalPlan = {
|
||||||
storeAssignmentPolicy: StoreAssignmentPolicy.Value): LogicalPlan = {
|
|
||||||
|
|
||||||
if (expected.size < query.output.size) {
|
if (expected.size < query.output.size) {
|
||||||
throw new AnalysisException(
|
throw new AnalysisException(
|
||||||
|
@ -47,8 +46,7 @@ object TableOutputResolver {
|
||||||
expected.flatMap { tableAttr =>
|
expected.flatMap { tableAttr =>
|
||||||
query.resolve(Seq(tableAttr.name), conf.resolver) match {
|
query.resolve(Seq(tableAttr.name), conf.resolver) match {
|
||||||
case Some(queryExpr) =>
|
case Some(queryExpr) =>
|
||||||
checkField(
|
checkField(tableAttr, queryExpr, byName, conf, err => errors += err)
|
||||||
tableAttr, queryExpr, byName, conf, storeAssignmentPolicy, err => errors += err)
|
|
||||||
case None =>
|
case None =>
|
||||||
errors += s"Cannot find data for output column '${tableAttr.name}'"
|
errors += s"Cannot find data for output column '${tableAttr.name}'"
|
||||||
None
|
None
|
||||||
|
@ -66,8 +64,7 @@ object TableOutputResolver {
|
||||||
|
|
||||||
query.output.zip(expected).flatMap {
|
query.output.zip(expected).flatMap {
|
||||||
case (queryExpr, tableAttr) =>
|
case (queryExpr, tableAttr) =>
|
||||||
checkField(
|
checkField(tableAttr, queryExpr, byName, conf, err => errors += err)
|
||||||
tableAttr, queryExpr, byName, conf, storeAssignmentPolicy, err => errors += err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,9 +85,9 @@ object TableOutputResolver {
|
||||||
queryExpr: NamedExpression,
|
queryExpr: NamedExpression,
|
||||||
byName: Boolean,
|
byName: Boolean,
|
||||||
conf: SQLConf,
|
conf: SQLConf,
|
||||||
storeAssignmentPolicy: StoreAssignmentPolicy.Value,
|
|
||||||
addError: String => Unit): Option[NamedExpression] = {
|
addError: String => Unit): Option[NamedExpression] = {
|
||||||
|
|
||||||
|
val storeAssignmentPolicy = conf.storeAssignmentPolicy
|
||||||
lazy val outputField = if (tableAttr.dataType.sameType(queryExpr.dataType) &&
|
lazy val outputField = if (tableAttr.dataType.sameType(queryExpr.dataType) &&
|
||||||
tableAttr.name == queryExpr.name &&
|
tableAttr.name == queryExpr.name &&
|
||||||
tableAttr.metadata == queryExpr.metadata) {
|
tableAttr.metadata == queryExpr.metadata) {
|
||||||
|
|
|
@ -165,6 +165,7 @@ object Cast {
|
||||||
*/
|
*/
|
||||||
def canANSIStoreAssign(from: DataType, to: DataType): Boolean = (from, to) match {
|
def canANSIStoreAssign(from: DataType, to: DataType): Boolean = (from, to) match {
|
||||||
case _ if from == to => true
|
case _ if from == to => true
|
||||||
|
case (NullType, _) => true
|
||||||
case (_: NumericType, _: NumericType) => true
|
case (_: NumericType, _: NumericType) => true
|
||||||
case (_: AtomicType, StringType) => true
|
case (_: AtomicType, StringType) => true
|
||||||
case (_: CalendarIntervalType, StringType) => true
|
case (_: CalendarIntervalType, StringType) => true
|
||||||
|
|
|
@ -1740,7 +1740,7 @@ object SQLConf {
|
||||||
.stringConf
|
.stringConf
|
||||||
.transform(_.toUpperCase(Locale.ROOT))
|
.transform(_.toUpperCase(Locale.ROOT))
|
||||||
.checkValues(StoreAssignmentPolicy.values.map(_.toString))
|
.checkValues(StoreAssignmentPolicy.values.map(_.toString))
|
||||||
.createOptional
|
.createWithDefault(StoreAssignmentPolicy.ANSI.toString)
|
||||||
|
|
||||||
val ANSI_ENABLED = buildConf("spark.sql.ansi.enabled")
|
val ANSI_ENABLED = buildConf("spark.sql.ansi.enabled")
|
||||||
.doc("When true, Spark tries to conform to the ANSI SQL specification: 1. Spark will " +
|
.doc("When true, Spark tries to conform to the ANSI SQL specification: 1. Spark will " +
|
||||||
|
@ -2473,8 +2473,8 @@ class SQLConf extends Serializable with Logging {
|
||||||
def partitionOverwriteMode: PartitionOverwriteMode.Value =
|
def partitionOverwriteMode: PartitionOverwriteMode.Value =
|
||||||
PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
|
PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
|
||||||
|
|
||||||
def storeAssignmentPolicy: Option[StoreAssignmentPolicy.Value] =
|
def storeAssignmentPolicy: StoreAssignmentPolicy.Value =
|
||||||
getConf(STORE_ASSIGNMENT_POLICY).map(StoreAssignmentPolicy.withName)
|
StoreAssignmentPolicy.withName(getConf(STORE_ASSIGNMENT_POLICY))
|
||||||
|
|
||||||
def ansiEnabled: Boolean = getConf(ANSI_ENABLED)
|
def ansiEnabled: Boolean = getConf(ANSI_ENABLED)
|
||||||
|
|
||||||
|
|
|
@ -456,6 +456,8 @@ object DataType {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case (_: NullType, _) if storeAssignmentPolicy == ANSI => true
|
||||||
|
|
||||||
case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == ANSI =>
|
case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == ANSI =>
|
||||||
if (!Cast.canANSIStoreAssign(w, r)) {
|
if (!Cast.canANSIStoreAssign(w, r)) {
|
||||||
addError(s"Cannot safely cast '$context': $w to $r")
|
addError(s"Cannot safely cast '$context': $w to $r")
|
||||||
|
|
|
@ -76,6 +76,14 @@ class StrictDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBa
|
||||||
assert(err.contains("Cannot safely cast"))
|
assert(err.contains("Cannot safely cast"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("Check NullType is incompatible with all other types") {
|
||||||
|
allNonNullTypes.foreach { t =>
|
||||||
|
assertSingleError(NullType, t, "nulls", s"Should not allow writing None to type $t") { err =>
|
||||||
|
assert(err.contains(s"incompatible with $t"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ANSIDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBaseSuite {
|
class ANSIDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBaseSuite {
|
||||||
|
@ -145,6 +153,12 @@ class ANSIDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBase
|
||||||
assert(err.contains("Cannot safely cast 'timestampToLong': TimestampType to LongType"))
|
assert(err.contains("Cannot safely cast 'timestampToLong': TimestampType to LongType"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("Check NullType is compatible with all other types") {
|
||||||
|
allNonNullTypes.foreach { t =>
|
||||||
|
assertAllowed(NullType, t, "nulls", s"Should allow writing None to type $t")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite {
|
abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite {
|
||||||
|
@ -175,17 +189,9 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite {
|
||||||
private val nestedContainerTypes = Seq(ArrayType(point2, containsNull = false),
|
private val nestedContainerTypes = Seq(ArrayType(point2, containsNull = false),
|
||||||
MapType(StringType, point3, valueContainsNull = false))
|
MapType(StringType, point3, valueContainsNull = false))
|
||||||
|
|
||||||
private val allNonNullTypes = Seq(
|
protected val allNonNullTypes = Seq(
|
||||||
atomicTypes, simpleContainerTypes, nestedContainerTypes, Seq(CalendarIntervalType)).flatten
|
atomicTypes, simpleContainerTypes, nestedContainerTypes, Seq(CalendarIntervalType)).flatten
|
||||||
|
|
||||||
test("Check NullType is incompatible with all other types") {
|
|
||||||
allNonNullTypes.foreach { t =>
|
|
||||||
assertSingleError(NullType, t, "nulls", s"Should not allow writing None to type $t") { err =>
|
|
||||||
assert(err.contains(s"incompatible with $t"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
test("Check each type with itself") {
|
test("Check each type with itself") {
|
||||||
allNonNullTypes.foreach { t =>
|
allNonNullTypes.foreach { t =>
|
||||||
assertAllowed(t, t, "t", s"Should allow writing type to itself $t")
|
assertAllowed(t, t, "t", s"Should allow writing type to itself $t")
|
||||||
|
|
|
@ -189,14 +189,11 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
|
||||||
query
|
query
|
||||||
}
|
}
|
||||||
|
|
||||||
// SPARK-28730: for V1 data source, we use the "LEGACY" as default store assignment policy.
|
|
||||||
// TODO: use ANSI store assignment policy by default in SPARK-28495.
|
|
||||||
val storeAssignmentPolicy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.LEGACY)
|
|
||||||
c.copy(
|
c.copy(
|
||||||
tableDesc = existingTable,
|
tableDesc = existingTable,
|
||||||
query = Some(TableOutputResolver.resolveOutputColumns(
|
query = Some(TableOutputResolver.resolveOutputColumns(
|
||||||
tableDesc.qualifiedName, existingTable.schema.toAttributes, newQuery,
|
tableDesc.qualifiedName, existingTable.schema.toAttributes, newQuery,
|
||||||
byName = true, conf, storeAssignmentPolicy)))
|
byName = true, conf)))
|
||||||
|
|
||||||
// Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
|
// Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
|
||||||
// config, and do various checks:
|
// config, and do various checks:
|
||||||
|
@ -402,11 +399,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
|
||||||
s"including ${staticPartCols.size} partition column(s) having constant value(s).")
|
s"including ${staticPartCols.size} partition column(s) having constant value(s).")
|
||||||
}
|
}
|
||||||
|
|
||||||
// SPARK-28730: for V1 data source, we use the "LEGACY" as default store assignment policy.
|
|
||||||
// TODO: use ANSI store assignment policy by default in SPARK-28495.
|
|
||||||
val storeAssignmentPolicy = conf.storeAssignmentPolicy.getOrElse(StoreAssignmentPolicy.LEGACY)
|
|
||||||
val newQuery = TableOutputResolver.resolveOutputColumns(
|
val newQuery = TableOutputResolver.resolveOutputColumns(
|
||||||
tblName, expectedColumns, insert.query, byName = false, conf, storeAssignmentPolicy)
|
tblName, expectedColumns, insert.query, byName = false, conf)
|
||||||
if (normalizedPartSpec.nonEmpty) {
|
if (normalizedPartSpec.nonEmpty) {
|
||||||
if (normalizedPartSpec.size != partColNames.length) {
|
if (normalizedPartSpec.size != partColNames.length) {
|
||||||
throw new AnalysisException(
|
throw new AnalysisException(
|
||||||
|
|
|
@ -142,7 +142,19 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
|
||||||
|
|
||||||
/** List of test cases to ignore, in lower cases. */
|
/** List of test cases to ignore, in lower cases. */
|
||||||
protected def blackList: Set[String] = Set(
|
protected def blackList: Set[String] = Set(
|
||||||
"blacklist.sql" // Do NOT remove this one. It is here to test the blacklist functionality.
|
"blacklist.sql", // Do NOT remove this one. It is here to test the blacklist functionality.
|
||||||
|
// SPARK-28885 String value is not allowed to be stored as numeric type with
|
||||||
|
// ANSI store assignment policy.
|
||||||
|
"postgreSQL/numeric.sql",
|
||||||
|
"postgreSQL/int2.sql",
|
||||||
|
"postgreSQL/int4.sql",
|
||||||
|
"postgreSQL/int8.sql",
|
||||||
|
"postgreSQL/float4.sql",
|
||||||
|
"postgreSQL/float8.sql",
|
||||||
|
// SPARK-28885 String value is not allowed to be stored as date/timestamp type with
|
||||||
|
// ANSI store assignment policy.
|
||||||
|
"postgreSQL/date.sql",
|
||||||
|
"postgreSQL/timestamp.sql"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Create all the test cases.
|
// Create all the test cases.
|
||||||
|
|
|
@ -150,9 +150,9 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
|
||||||
Seq(3 -> "c").toDF("i", "j").write.mode("append").saveAsTable("t")
|
Seq(3 -> "c").toDF("i", "j").write.mode("append").saveAsTable("t")
|
||||||
checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
|
checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
|
||||||
|
|
||||||
Seq("c" -> 3).toDF("i", "j").write.mode("append").saveAsTable("t")
|
Seq(3.5 -> 3).toDF("i", "j").write.mode("append").saveAsTable("t")
|
||||||
checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c")
|
checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c")
|
||||||
:: Row(null, "3") :: Nil)
|
:: Row(3, "3") :: Nil)
|
||||||
|
|
||||||
Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")
|
Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")
|
||||||
|
|
||||||
|
|
|
@ -346,7 +346,9 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test("SPARK-23340 Empty float/double array columns raise EOFException") {
|
// SPARK-28885 String value is not allowed to be stored as numeric type with
|
||||||
|
// ANSI store assignment policy.
|
||||||
|
ignore("SPARK-23340 Empty float/double array columns raise EOFException") {
|
||||||
Seq(Seq(Array.empty[Float]).toDF(), Seq(Array.empty[Double]).toDF()).foreach { df =>
|
Seq(Seq(Array.empty[Float]).toDF(), Seq(Array.empty[Double]).toDF()).foreach { df =>
|
||||||
withTempPath { path =>
|
withTempPath { path =>
|
||||||
df.write.format("orc").save(path.getCanonicalPath)
|
df.write.format("orc").save(path.getCanonicalPath)
|
||||||
|
|
|
@ -162,9 +162,9 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
|
||||||
test("SPARK-10634 timestamp written and read as INT64 - truncation") {
|
test("SPARK-10634 timestamp written and read as INT64 - truncation") {
|
||||||
withTable("ts") {
|
withTable("ts") {
|
||||||
sql("create table ts (c1 int, c2 timestamp) using parquet")
|
sql("create table ts (c1 int, c2 timestamp) using parquet")
|
||||||
sql("insert into ts values (1, '2016-01-01 10:11:12.123456')")
|
sql("insert into ts values (1, timestamp'2016-01-01 10:11:12.123456')")
|
||||||
sql("insert into ts values (2, null)")
|
sql("insert into ts values (2, null)")
|
||||||
sql("insert into ts values (3, '1965-01-01 10:11:12.123456')")
|
sql("insert into ts values (3, timestamp'1965-01-01 10:11:12.123456')")
|
||||||
val expected = Seq(
|
val expected = Seq(
|
||||||
(1, "2016-01-01 10:11:12.123456"),
|
(1, "2016-01-01 10:11:12.123456"),
|
||||||
(2, null),
|
(2, null),
|
||||||
|
@ -177,13 +177,13 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
|
||||||
withTable("ts") {
|
withTable("ts") {
|
||||||
withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") {
|
withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") {
|
||||||
sql("create table ts (c1 int, c2 timestamp) using parquet")
|
sql("create table ts (c1 int, c2 timestamp) using parquet")
|
||||||
sql("insert into ts values (1, '2016-01-01 10:11:12.123456')")
|
sql("insert into ts values (1, timestamp'2016-01-01 10:11:12.123456')")
|
||||||
sql("insert into ts values (2, null)")
|
sql("insert into ts values (2, null)")
|
||||||
sql("insert into ts values (3, '1965-01-01 10:11:12.125456')")
|
sql("insert into ts values (3, timestamp'1965-01-01 10:11:12.125456')")
|
||||||
sql("insert into ts values (4, '1965-01-01 10:11:12.125')")
|
sql("insert into ts values (4, timestamp'1965-01-01 10:11:12.125')")
|
||||||
sql("insert into ts values (5, '1965-01-01 10:11:12.1')")
|
sql("insert into ts values (5, timestamp'1965-01-01 10:11:12.1')")
|
||||||
sql("insert into ts values (6, '1965-01-01 10:11:12.123456789')")
|
sql("insert into ts values (6, timestamp'1965-01-01 10:11:12.123456789')")
|
||||||
sql("insert into ts values (7, '0001-01-01 00:00:00.000000')")
|
sql("insert into ts values (7, timestamp'0001-01-01 00:00:00.000000')")
|
||||||
val expected = Seq(
|
val expected = Seq(
|
||||||
(1, "2016-01-01 10:11:12.123"),
|
(1, "2016-01-01 10:11:12.123"),
|
||||||
(2, null),
|
(2, null),
|
||||||
|
|
|
@ -729,7 +729,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
|
||||||
spark.sessionState.catalog.createTable(newTable, false)
|
spark.sessionState.catalog.createTable(newTable, false)
|
||||||
|
|
||||||
sql("INSERT INTO TABLE test_table SELECT 1, 'a'")
|
sql("INSERT INTO TABLE test_table SELECT 1, 'a'")
|
||||||
sql("INSERT INTO TABLE test_table SELECT 2, null")
|
val msg = intercept[AnalysisException] {
|
||||||
|
sql("INSERT INTO TABLE test_table SELECT 2, null")
|
||||||
|
}.getMessage
|
||||||
|
assert(msg.contains("Cannot write nullable values to non-null column 's'"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,6 +92,17 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite {
|
||||||
"date.sql",
|
"date.sql",
|
||||||
// SPARK-28620
|
// SPARK-28620
|
||||||
"postgreSQL/float4.sql",
|
"postgreSQL/float4.sql",
|
||||||
|
// SPARK-28885 String value is not allowed to be stored as numeric type with
|
||||||
|
// ANSI store assignment policy.
|
||||||
|
"postgreSQL/numeric.sql",
|
||||||
|
"postgreSQL/int2.sql",
|
||||||
|
"postgreSQL/int4.sql",
|
||||||
|
"postgreSQL/int8.sql",
|
||||||
|
"postgreSQL/float8.sql",
|
||||||
|
// SPARK-28885 String value is not allowed to be stored as date/timestamp type with
|
||||||
|
// ANSI store assignment policy.
|
||||||
|
"postgreSQL/date.sql",
|
||||||
|
"postgreSQL/timestamp.sql",
|
||||||
// SPARK-28636
|
// SPARK-28636
|
||||||
"decimalArithmeticOperations.sql",
|
"decimalArithmeticOperations.sql",
|
||||||
"literals.sql",
|
"literals.sql",
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
|
||||||
import org.apache.spark.sql.hive.HiveUtils
|
import org.apache.spark.sql.hive.HiveUtils
|
||||||
import org.apache.spark.sql.hive.test.TestHive
|
import org.apache.spark.sql.hive.test.TestHive
|
||||||
import org.apache.spark.sql.internal.SQLConf
|
import org.apache.spark.sql.internal.SQLConf
|
||||||
|
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs the test cases that are included in the hive distribution.
|
* Runs the test cases that are included in the hive distribution.
|
||||||
|
@ -59,6 +60,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
|
||||||
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
|
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
|
||||||
// Ensures that cross joins are enabled so that we can test them
|
// Ensures that cross joins are enabled so that we can test them
|
||||||
TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, true)
|
TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, true)
|
||||||
|
// Ensures that the table insertion behavior is consistent with Hive
|
||||||
|
TestHive.setConf(SQLConf.STORE_ASSIGNMENT_POLICY, StoreAssignmentPolicy.LEGACY.toString)
|
||||||
// Fix session local timezone to America/Los_Angeles for those timezone sensitive tests
|
// Fix session local timezone to America/Los_Angeles for those timezone sensitive tests
|
||||||
// (timestamp_*)
|
// (timestamp_*)
|
||||||
TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles")
|
TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles")
|
||||||
|
|
|
@ -908,7 +908,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
|
||||||
""".stripMargin
|
""".stripMargin
|
||||||
)
|
)
|
||||||
|
|
||||||
val errorMsg = "data type mismatch: cannot cast decimal(2,1) to binary"
|
val errorMsg = "Cannot safely cast 'f0': DecimalType(2,1) to BinaryType"
|
||||||
|
|
||||||
if (isPartitioned) {
|
if (isPartitioned) {
|
||||||
val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1.3"
|
val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1.3"
|
||||||
|
|
|
@ -1828,10 +1828,10 @@ class HiveDDLSuite
|
||||||
.write.format("hive").mode("append").saveAsTable("t")
|
.write.format("hive").mode("append").saveAsTable("t")
|
||||||
checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
|
checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
|
||||||
|
|
||||||
Seq("c" -> 3).toDF("i", "j")
|
Seq(3.5 -> 3).toDF("i", "j")
|
||||||
.write.format("hive").mode("append").saveAsTable("t")
|
.write.format("hive").mode("append").saveAsTable("t")
|
||||||
checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c")
|
checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c")
|
||||||
:: Row(null, "3") :: Nil)
|
:: Row(3, "3") :: Nil)
|
||||||
|
|
||||||
Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")
|
Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")
|
||||||
|
|
||||||
|
|
|
@ -65,7 +65,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
|
||||||
hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 TIMESTAMP) STORED AS $fileFormat")
|
hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 TIMESTAMP) STORED AS $fileFormat")
|
||||||
hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('2019-04-11 15:50:00')")
|
hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('2019-04-11 15:50:00')")
|
||||||
checkAnswer(spark.table("hive_serde"), Row(Timestamp.valueOf("2019-04-11 15:50:00")))
|
checkAnswer(spark.table("hive_serde"), Row(Timestamp.valueOf("2019-04-11 15:50:00")))
|
||||||
spark.sql("INSERT INTO TABLE hive_serde values('2019-04-12 15:50:00')")
|
spark.sql("INSERT INTO TABLE hive_serde values(TIMESTAMP('2019-04-12 15:50:00'))")
|
||||||
checkAnswer(
|
checkAnswer(
|
||||||
spark.table("hive_serde"),
|
spark.table("hive_serde"),
|
||||||
Seq(Row(Timestamp.valueOf("2019-04-11 15:50:00")),
|
Seq(Row(Timestamp.valueOf("2019-04-11 15:50:00")),
|
||||||
|
@ -77,7 +77,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
|
||||||
hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 DATE) STORED AS $fileFormat")
|
hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 DATE) STORED AS $fileFormat")
|
||||||
hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('2019-04-11')")
|
hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('2019-04-11')")
|
||||||
checkAnswer(spark.table("hive_serde"), Row(Date.valueOf("2019-04-11")))
|
checkAnswer(spark.table("hive_serde"), Row(Date.valueOf("2019-04-11")))
|
||||||
spark.sql("INSERT INTO TABLE hive_serde values('2019-04-12')")
|
spark.sql("INSERT INTO TABLE hive_serde values(TIMESTAMP('2019-04-12'))")
|
||||||
checkAnswer(
|
checkAnswer(
|
||||||
spark.table("hive_serde"),
|
spark.table("hive_serde"),
|
||||||
Seq(Row(Date.valueOf("2019-04-11")), Row(Date.valueOf("2019-04-12"))))
|
Seq(Row(Date.valueOf("2019-04-11")), Row(Date.valueOf("2019-04-12"))))
|
||||||
|
@ -119,7 +119,7 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
|
||||||
hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 BINARY) STORED AS $fileFormat")
|
hiveClient.runSqlHive(s"CREATE TABLE hive_serde (c1 BINARY) STORED AS $fileFormat")
|
||||||
hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('1')")
|
hiveClient.runSqlHive("INSERT INTO TABLE hive_serde values('1')")
|
||||||
checkAnswer(spark.table("hive_serde"), Row("1".getBytes))
|
checkAnswer(spark.table("hive_serde"), Row("1".getBytes))
|
||||||
spark.sql("INSERT INTO TABLE hive_serde values('2')")
|
spark.sql("INSERT INTO TABLE hive_serde values(BINARY('2'))")
|
||||||
checkAnswer(spark.table("hive_serde"), Seq(Row("1".getBytes), Row("2".getBytes)))
|
checkAnswer(spark.table("hive_serde"), Seq(Row("1".getBytes), Row("2".getBytes)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -168,6 +168,8 @@ class HiveSerDeReadWriteSuite extends QueryTest with SQLTestUtils with TestHiveS
|
||||||
checkNumericTypes(fileFormat, "DECIMAL(38, 2)", 2.1D)
|
checkNumericTypes(fileFormat, "DECIMAL(38, 2)", 2.1D)
|
||||||
|
|
||||||
// Date/Time Types
|
// Date/Time Types
|
||||||
|
// SPARK-28885 String value is not allowed to be stored as date/timestamp type with
|
||||||
|
// ANSI store assignment policy.
|
||||||
checkDateTimeTypes(fileFormat)
|
checkDateTimeTypes(fileFormat)
|
||||||
|
|
||||||
// String Types
|
// String Types
|
||||||
|
|
|
@ -210,7 +210,10 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test("SPARK-23340 Empty float/double array columns raise EOFException") {
|
// SPARK-28885 String value is not allowed to be stored as numeric type with
|
||||||
|
// ANSI store assignment policy.
|
||||||
|
// TODO: re-enable the test case when SPARK-29462 is fixed.
|
||||||
|
ignore("SPARK-23340 Empty float/double array columns raise EOFException") {
|
||||||
withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "false") {
|
withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "false") {
|
||||||
withTable("spark_23340") {
|
withTable("spark_23340") {
|
||||||
sql("CREATE TABLE spark_23340(a array<float>, b array<double>) STORED AS ORC")
|
sql("CREATE TABLE spark_23340(a array<float>, b array<double>) STORED AS ORC")
|
||||||
|
|
Loading…
Reference in a new issue