[SPARK-33591][SQL] Recognize null in partition spec values

### What changes were proposed in this pull request?
1. Recognize `null` while parsing partition specs, and put `null` instead of the string `"null"` as the partition value.
2. For the V1 catalog: replace `null` with `__HIVE_DEFAULT_PARTITION__` (see the sketch below).
3. For V2 catalogs: pass `null` AS IS, and let catalog implementations decide how to handle `null`s as partition values in the spec.
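For the V1 path, the whole conversion amounts to mapping `null` values in a partition spec to the Hive default partition name. Below is a minimal, standalone Scala sketch of that idea; the helper name `convertNullPartitionValues` and the constant value mirror the diff further down, while the `TablePartitionSpec` alias and the demo `main` are only illustrative:

```scala
object NullPartitionSpecSketch {
  // Mirrors ExternalCatalogUtils.DEFAULT_PARTITION_NAME in the diff below.
  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

  // In Spark this alias lives in the catalog package; repeated here to keep the sketch self-contained.
  type TablePartitionSpec = Map[String, String]

  // V1 catalogs cannot store a real null partition value, so replace it with the Hive default name.
  def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec =
    spec.map { case (k, v) => k -> (if (v == null) DEFAULT_PARTITION_NAME else v) }

  def main(args: Array[String]): Unit = {
    // Prints: Map(p1 -> __HIVE_DEFAULT_PARTITION__, p2 -> 2021)
    println(convertNullPartitionValues(Map("p1" -> null, "p2" -> "2021")))
  }
}
```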

### Why are the changes needed?
Currently, `null` in partition specs is recognized as the string `"null"`, which can lead to incorrect results, for example:
```sql
spark-sql> CREATE TABLE tbl5 (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1);
spark-sql> INSERT INTO TABLE tbl5 PARTITION (p1 = null) SELECT 0;
spark-sql> SELECT isnull(p1) FROM tbl5;
false
```
Even though we inserted a row into the partition with the `null` value, **the resulting table doesn't contain `null`**.
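Concretely, before the fix the inserted row ends up in a partition whose value is the literal string `'null'`, so it matches a string comparison rather than `IS NULL` (an illustrative check, following the example above):
```sql
spark-sql> SELECT count(*) FROM tbl5 WHERE p1 = 'null';
1
```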

### Does this PR introduce _any_ user-facing change?
Yes. After the changes, the example above works as expected:
```sql
spark-sql> SELECT isnull(p1) FROM tbl5;
true
```
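With the default (V1) session catalog the stored value becomes `__HIVE_DEFAULT_PARTITION__`, so listing partitions should look roughly like this (based on the new `v1/ShowPartitionsSuite` test in the diff; the exact console formatting may differ):
```sql
spark-sql> SHOW PARTITIONS tbl5;
p1=__HIVE_DEFAULT_PARTITION__
```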

### How was this patch tested?
1. By running the affected test suites `SQLQuerySuite`, `AlterTablePartitionV2SQLSuite`, and `v1/ShowPartitionsSuite` (an example sbt invocation is sketched below).
2. Compiling with Scala 2.13:
```
$ ./dev/change-scala-version.sh 2.13
$ ./build/sbt -Pscala-2.13 compile
```
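The affected suites can also be run individually through sbt, for example (the `sql` module name and the test-name filter are assumptions for illustration, not commands from this PR):
```
$ ./build/sbt "sql/testOnly *SQLQuerySuite -- -z SPARK-33591"
```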

Closes #30538 from MaxGekk/partition-spec-value-null.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

@@ -161,6 +161,10 @@ object ExternalCatalogUtils {
}
}
private def isNullPartitionValue(value: String): Boolean = {
value == null || value == DEFAULT_PARTITION_NAME
}
/**
* Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a
* partial partition spec w.r.t. PARTITION (a=1,b=2).
@@ -169,9 +173,15 @@ object ExternalCatalogUtils {
spec1: TablePartitionSpec,
spec2: TablePartitionSpec): Boolean = {
spec1.forall {
case (partitionColumn, value) if isNullPartitionValue(value) =>
isNullPartitionValue(spec2(partitionColumn))
case (partitionColumn, value) => spec2(partitionColumn) == value
}
}
def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec = {
spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).toMap
}
}
object CatalogUtils {


@@ -541,7 +541,12 @@ class InMemoryCatalog(
listPartitions(db, table, partialSpec).map { partition =>
partitionColumnNames.map { name =>
escapePathName(name) + "=" + escapePathName(partition.spec(name))
val partValue = if (partition.spec(name) == null) {
DEFAULT_PARTITION_NAME
} else {
escapePathName(partition.spec(name))
}
escapePathName(name) + "=" + partValue
}.mkString("/")
}.sorted
}


@@ -1178,7 +1178,7 @@ class SessionCatalog(
*/
private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
specs.foreach { s =>
if (s.values.exists(_.isEmpty)) {
if (s.values.exists(v => v != null && v.isEmpty)) {
val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
throw QueryCompilationErrors.invalidPartitionSpecError(
s"The spec ($spec) contains an empty partition column value")


@@ -511,6 +511,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
protected def visitStringConstant(ctx: ConstantContext): String = withOrigin(ctx) {
ctx match {
case _: NullLiteralContext => null
case s: StringLiteralContext => createString(s)
case o => o.getText
}


@@ -406,7 +406,8 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] {
catalogTable.get.tracksPartitionsInCatalog
if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) {
// empty partition column value
if (normalizedPartSpec.filter(_._2.isDefined).exists(_._2.get.isEmpty)) {
if (normalizedPartSpec.map(_._2)
.filter(_.isDefined).map(_.get).exists(v => v != null && v.isEmpty)) {
val spec = normalizedPartSpec.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
throw new AnalysisException(
s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")


@@ -3854,6 +3854,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
assert(unions.size == 1)
}
test("SPARK-33591: null as a partition value") {
val t = "part_table"
withTable(t) {
sql(s"CREATE TABLE $t (col1 INT, p1 STRING) USING PARQUET PARTITIONED BY (p1)")
sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
checkAnswer(sql(s"SELECT * FROM $t"), Row(0, null))
}
}
}
case class Foo(bar: Option[String])


@@ -39,6 +39,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
override val command = "ALTER TABLE .. DROP PARTITION"
protected def notFullPartitionSpecErr: String
protected def nullPartitionValue: String
protected def checkDropPartition(
t: String,
@@ -170,4 +171,14 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
QueryTest.checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 1)))
}
}
test("SPARK-33591: null as a partition value") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
sql(s"ALTER TABLE $t ADD PARTITION (p1 = null)")
checkPartitions(t, Map("p1" -> nullPartitionValue))
sql(s"ALTER TABLE $t DROP PARTITION (p1 = null)")
checkPartitions(t)
}
}
}


@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.command
*/
trait AlterTableDropPartitionSuiteBase extends command.AlterTableDropPartitionSuiteBase {
override protected val notFullPartitionSpecErr = "The following partitions not found in table"
override protected def nullPartitionValue: String = "__HIVE_DEFAULT_PARTITION__"
test("purge partition data") {
withNamespaceAndTable("ns", "tbl") { t =>


@@ -69,6 +69,18 @@ trait ShowPartitionsSuiteBase extends command.ShowPartitionsSuiteBase {
assert(errMsg.contains("'SHOW PARTITIONS' expects a table"))
}
}
test("SPARK-33591: null as a partition value") {
val t = "part_table"
withTable(t) {
sql(s"CREATE TABLE $t (col1 INT, p1 STRING) $defaultUsing PARTITIONED BY (p1)")
sql(s"INSERT INTO TABLE $t PARTITION (p1 = null) SELECT 0")
checkAnswer(sql(s"SHOW PARTITIONS $t"), Row("p1=__HIVE_DEFAULT_PARTITION__"))
checkAnswer(
sql(s"SHOW PARTITIONS $t PARTITION (p1 = null)"),
Row("p1=__HIVE_DEFAULT_PARTITION__"))
}
}
}
/**


@@ -27,8 +27,8 @@ import org.apache.spark.sql.execution.command
class AlterTableDropPartitionSuite
extends command.AlterTableDropPartitionSuiteBase
with CommandSuiteBase {
override protected val notFullPartitionSpecErr = "Partition spec is invalid"
override protected def nullPartitionValue: String = "null"
test("SPARK-33650: drop partition into a table which doesn't support partition management") {
withNamespaceAndTable("ns", "tbl", s"non_part_$catalog") { t =>


@@ -942,9 +942,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// Hive metastore is not case preserving and the partition columns are always lower cased. We need
// to lower case the column names in partition specification before calling partition related Hive
// APIs, to match this behaviour.
private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
private def toMetaStorePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
// scalastyle:off caselocale
spec.map { case (k, v) => k.toLowerCase -> v }
val lowNames = spec.map { case (k, v) => k.toLowerCase -> v }
ExternalCatalogUtils.convertNullPartitionValues(lowNames)
// scalastyle:on caselocale
}
@@ -993,8 +994,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))
}
val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
val metaStoreParts = partsWithLocation
.map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
client.createPartitions(db, table, metaStoreParts, ignoreIfExists)
}
override def dropPartitions(
@@ -1006,7 +1008,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
retainData: Boolean): Unit = withClient {
requireTableExists(db, table)
client.dropPartitions(
db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
db, table, parts.map(toMetaStorePartitionSpec), ignoreIfNotExists, purge, retainData)
}
override def renamePartitions(
@ -1015,7 +1017,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
client.renamePartitions(
db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
db, table, specs.map(toMetaStorePartitionSpec), newSpecs.map(toMetaStorePartitionSpec))
val tableMeta = getTable(db, table)
val partitionColumnNames = tableMeta.partitionColumnNames
@@ -1031,7 +1033,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val fs = tablePath.getFileSystem(hadoopConf)
val newParts = newSpecs.map { spec =>
val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
val partition = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri)))
}
alterPartitions(db, table, newParts)
@@ -1141,12 +1143,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withClient {
val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
val metaStoreParts = newParts.map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
val rawTable = getRawTable(db, table)
// convert partition statistics to properties so that we can persist them through hive api
val withStatsProps = lowerCasedParts.map { p =>
val withStatsProps = metaStoreParts.map { p =>
if (p.stats.isDefined) {
val statsProperties = statsToProperties(p.stats.get)
p.copy(parameters = p.parameters ++ statsProperties)
@@ -1162,7 +1164,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
spec: TablePartitionSpec): CatalogTablePartition = withClient {
val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
val part = client.getPartition(db, table, toMetaStorePartitionSpec(spec))
restorePartitionMetadata(part, getTable(db, table))
}
@@ -1200,7 +1202,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
db: String,
table: String,
spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
client.getPartitionOption(db, table, toMetaStorePartitionSpec(spec)).map { part =>
restorePartitionMetadata(part, getTable(db, table))
}
}
@@ -1215,7 +1217,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val catalogTable = getTable(db, table)
val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
val clientPartitionNames =
client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
client.getPartitionNames(catalogTable, partialSpec.map(toMetaStorePartitionSpec))
clientPartitionNames.map { partitionPath =>
val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
partSpec.map { case (partName, partValue) =>
@@ -1234,11 +1236,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
val res = client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
val metaStoreSpec = partialSpec.map(toMetaStorePartitionSpec)
val res = client.getPartitions(db, table, metaStoreSpec)
.map { part => part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
}
partialSpec match {
metaStoreSpec match {
// This might be a bug of Hive: When the partition value inside the partial partition spec
// contains dot, and we ask Hive to list partitions w.r.t. the partial partition spec, Hive
// treats dot as matching any single character and may return more partitions than we


@@ -133,6 +133,7 @@ case class InsertIntoHiveTable(
val numDynamicPartitions = partition.values.count(_.isEmpty)
val numStaticPartitions = partition.values.count(_.nonEmpty)
val partitionSpec = partition.map {
case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME
case (key, Some(value)) => key -> value
case (key, None) => key -> ""
}
@@ -229,6 +230,7 @@ case class InsertIntoHiveTable(
val caseInsensitiveDpMap = CaseInsensitiveMap(dpMap)
val updatedPartitionSpec = partition.map {
case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME
case (key, Some(value)) => key -> value
case (key, None) if caseInsensitiveDpMap.contains(key) =>
key -> caseInsensitiveDpMap(key)