[SPARK-26012][SQL] Null and '' values should not cause dynamic partition failure of string types

A dynamic partition write fails when both '' and null appear as values of the same string partition column. For example, the test below fails before this PR:

test("Null and '' values should not cause dynamic partition failure of string types") {
withTable("t1", "t2") {
spark.range(3).write.saveAsTable("t1")
spark.sql("select id, cast(case when id = 1 then '' else null end as string) as p" +
" from t1").write.partitionBy("p").saveAsTable("t2")
checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, null), Row(2, null)))
}
}

The error is: 'org.apache.hadoop.fs.FileAlreadyExistsException: File already exists'.
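
For context: '' and null are distinct values to the dynamic partition writer, but both render to the same default partition directory, so the writer tries to create the same output file twice. A minimal plain-Scala sketch of the mechanism (the helper is illustrative, not Spark's actual code):

// Hive-style partition path rendering: null and the empty string both fall
// back to the default partition name, so they map to the same directory.
def partitionDir(col: String, value: String): String = {
  val v = if (value == null || value.isEmpty) "__HIVE_DEFAULT_PARTITION__" else value
  s"$col=$v"
}

// Same path for both values: the writer opens one file per distinct partition
// value, so the second open fails with FileAlreadyExistsException.
assert(partitionDir("p", null) == partitionDir("p", ""))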
This PR converts empty strings to null for partition values, so '' and null collapse to a single partition value before writing. This is an alternative approach to PR https://github.com/apache/spark/pull/23010.
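
The conversion applied by the new Empty2Null expression (see the diff below) has these semantics; this is a plain-Scala sketch, not the expression's actual implementation:

def empty2Null(v: String): String = if (v != null && v.isEmpty) null else v

assert(empty2Null("") == null)     // '' collapses to null
assert(empty2Null(null) == null)   // null is preserved
assert(empty2Null("abc") == "abc") // non-empty values pass through unchanged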


How was this patch tested?
Newly added test.

Closes #24334 from eatoncys/FileFormatWriter.

Authored-by: 10129659 <chen.yanshan@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit: 5ea4deec44
Parent: 85e5d4f141
Date: 2019-04-10 19:54:19 +08:00
2 changed files with 50 additions and 5 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

@@ -35,9 +35,12 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.execution.{SortExec, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution}
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -49,6 +52,22 @@ object FileFormatWriter extends Logging {
       customPartitionLocations: Map[TablePartitionSpec, String],
       outputColumns: Seq[Attribute])
 
+  /** A function that converts the empty string to null for partition values. */
+  case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
+    override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v
+    override def nullable: Boolean = true
+    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+      nullSafeCodeGen(ctx, ev, c => {
+        s"""if ($c.numBytes() == 0) {
+           |  ${ev.isNull} = true;
+           |  ${ev.value} = null;
+           |} else {
+           |  ${ev.value} = $c;
+           |}""".stripMargin
+      })
+    }
+  }
+
   /**
    * Basic work flow of this command is:
    * 1. Driver side setup, including output committer initialization and data source specific
@@ -84,6 +103,15 @@ object FileFormatWriter extends Logging {
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)
 
+    var needConvert = false
+    val projectList: Seq[NamedExpression] = plan.output.map {
+      case p if partitionSet.contains(p) && p.dataType == StringType && p.nullable =>
+        needConvert = true
+        Alias(Empty2Null(p), p.name)()
+      case attr => attr
+    }
+    val empty2NullPlan = if (needConvert) ProjectExec(projectList, plan) else plan
+
     val bucketIdExpression = bucketSpec.map { spec =>
       val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
       // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
@@ -123,7 +151,7 @@ object FileFormatWriter extends Logging {
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
     // the sort order doesn't matter
-    val actualOrdering = plan.outputOrdering.map(_.child)
+    val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
     val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
       false
     } else {
@@ -141,7 +169,7 @@ object FileFormatWriter extends Logging {
     try {
       val rdd = if (orderingMatched) {
-        plan.execute()
+        empty2NullPlan.execute()
       } else {
         // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
         // the physical plan may have different attribute ids due to optimizer removing some
@@ -151,7 +179,7 @@ object FileFormatWriter extends Logging {
         SortExec(
           orderingExpr,
           global = false,
-          child = plan).execute()
+          child = empty2NullPlan).execute()
       }
 
       // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala

@@ -18,9 +18,14 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest
 import org.apache.spark.sql.test.SharedSQLContext
 
-class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
+class FileFormatWriterSuite
+  extends QueryTest
+  with SharedSQLContext
+  with CodegenInterpretedPlanTest {
+  import testImplicits._
 
   test("empty file should be skipped while write to file") {
@@ -44,4 +49,16 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
       checkAnswer(spark.table("t4"), Row(0, 0))
     }
   }
+
+  test("Null and '' values should not cause dynamic partition failure of string types") {
+    withTable("t1", "t2") {
+      Seq((0, None), (1, Some("")), (2, None)).toDF("id", "p")
+        .write.partitionBy("p").saveAsTable("t1")
+      checkAnswer(spark.table("t1").sort("id"), Seq(Row(0, null), Row(1, null), Row(2, null)))
+
+      sql("create table t2(id long, p string) using parquet partitioned by (p)")
+      sql("insert overwrite table t2 partition(p) select id, p from t1")
+      checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, null), Row(2, null)))
+    }
+  }
 }