diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index a9de64908c..f1fc5d762a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -35,9 +35,12 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.execution.{SortExec, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution}
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 
@@ -49,6 +52,22 @@ object FileFormatWriter extends Logging {
       customPartitionLocations: Map[TablePartitionSpec, String],
       outputColumns: Seq[Attribute])
 
+  /** A function that converts the empty string to null for partition values. */
+  case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
+    override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v
+    override def nullable: Boolean = true
+    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+      nullSafeCodeGen(ctx, ev, c => {
+        s"""if ($c.numBytes() == 0) {
+           |  ${ev.isNull} = true;
+           |  ${ev.value} = null;
+           |} else {
+           |  ${ev.value} = $c;
+           |}""".stripMargin
+      })
+    }
+  }
+
   /**
    * Basic work flow of this command is:
    * 1. Driver side setup, including output committer initialization and data source specific
@@ -84,6 +103,15 @@ object FileFormatWriter extends Logging {
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)
 
+    var needConvert = false
+    val projectList: Seq[NamedExpression] = plan.output.map {
+      case p if partitionSet.contains(p) && p.dataType == StringType && p.nullable =>
+        needConvert = true
+        Alias(Empty2Null(p), p.name)()
+      case attr => attr
+    }
+    val empty2NullPlan = if (needConvert) ProjectExec(projectList, plan) else plan
+
     val bucketIdExpression = bucketSpec.map { spec =>
       val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
       // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
@@ -123,7 +151,7 @@ object FileFormatWriter extends Logging {
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
     // the sort order doesn't matter
-    val actualOrdering = plan.outputOrdering.map(_.child)
+    val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
     val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
       false
     } else {
@@ -141,7 +169,7 @@ object FileFormatWriter extends Logging {
 
     try {
       val rdd = if (orderingMatched) {
-        plan.execute()
+        empty2NullPlan.execute()
      } else {
         // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
         // the physical plan may have different attribute ids due to optimizer removing some
@@ -151,7 +179,7 @@ object FileFormatWriter extends Logging {
         SortExec(
           orderingExpr,
           global = false,
-          child = plan).execute()
+          child = empty2NullPlan).execute()
       }
 
       // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index 13f0e0bca8..e09ec0d7bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -18,9 +18,14 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest
 import org.apache.spark.sql.test.SharedSQLContext
 
-class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
+class FileFormatWriterSuite
+  extends QueryTest
+  with SharedSQLContext
+  with CodegenInterpretedPlanTest{
+
   import testImplicits._
 
   test("empty file should be skipped while write to file") {
@@ -44,4 +49,16 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
       checkAnswer(spark.table("t4"), Row(0, 0))
     }
   }
+
+  test("Null and '' values should not cause dynamic partition failure of string types") {
+    withTable("t1", "t2") {
+      Seq((0, None), (1, Some("")), (2, None)).toDF("id", "p")
+        .write.partitionBy("p").saveAsTable("t1")
+      checkAnswer(spark.table("t1").sort("id"), Seq(Row(0, null), Row(1, null), Row(2, null)))
+
+      sql("create table t2(id long, p string) using parquet partitioned by (p)")
+      sql("insert overwrite table t2 partition(p) select id, p from t1")
+      checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, null), Row(2, null)))
+    }
+  }
 }
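For reference, the patch makes FileFormatWriter rewrite nullable StringType partition columns through the new Empty2Null expression (via an extra ProjectExec on top of the physical plan), so an empty-string partition value is written the same way as null and reads back as null. The sketch below is a minimal standalone Scala reproduction of the scenario the new test covers; the object name, the local[2] master, and the overwrite save mode are assumptions added for illustration, while the table names and queries are taken from the test itself.

import org.apache.spark.sql.SparkSession

object EmptyPartitionValueDemo {
  def main(args: Array[String]): Unit = {
    // Assumption: a plain local session; the suite itself runs through SharedSQLContext.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("empty-string-partition-values")
      .getOrCreate()
    import spark.implicits._

    // With the patch, None and Some("") both land in the default (null) partition,
    // so every row reads back with p = null (mirrors the test's first checkAnswer).
    Seq((0, None), (1, Some("")), (2, None)).toDF("id", "p")
      .write.mode("overwrite").partitionBy("p").saveAsTable("t1")
    spark.table("t1").sort("id").show()

    // Dynamic-partition INSERT over the same data; the test asserts this succeeds
    // and also yields p = null for all three rows.
    spark.sql("drop table if exists t2")
    spark.sql("create table t2(id long, p string) using parquet partitioned by (p)")
    spark.sql("insert overwrite table t2 partition(p) select id, p from t1")
    spark.table("t2").sort("id").show()

    spark.stop()
  }
}

Per the test title, the same dynamic-partition INSERT previously failed when the string partition column contained ''.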