[SPARK-34926][SQL] PartitioningUtils.getPathFragment() should respect partition value is null

### What changes were proposed in this pull request?

When we insert data into a partition of a partitioned table with an empty DataFrame, `PartitioningUtils.getPathFragment()`
is called to update that partition's metadata.
When the partition value is `null`, it throws an exception like:
```
[info]   java.lang.NullPointerException:
[info]   at scala.collection.immutable.StringOps$.length$extension(StringOps.scala:51)
[info]   at scala.collection.immutable.StringOps.length(StringOps.scala:51)
[info]   at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:35)
[info]   at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
[info]   at scala.collection.immutable.StringOps.foreach(StringOps.scala:33)
[info]   at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.escapePathName(ExternalCatalogUtils.scala:69)
[info]   at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.getPartitionValueString(ExternalCatalogUtils.scala:126)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningUtils$.$anonfun$getPathFragment$1(PartitioningUtils.scala:354)
[info]   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
[info]   at scala.collection.Iterator.foreach(Iterator.scala:941)
[info]   at scala.collection.Iterator.foreach$(Iterator.scala:941)
[info]   at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
[info]   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
[info]   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
```
`PartitioningUtils.getPathFragment()` should support a `null` partition value too.

### Why are the changes needed?
It fixes a bug (a `NullPointerException` on null partition values).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #32018 from AngersZhuuuu/SPARK-34926.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
This commit is contained in:
Angerszhuuuu 2021-04-02 10:26:14 +03:00 committed by Max Gekk
parent 280a2f359c
commit 65da9287bc
3 changed files with 25 additions and 3 deletions

View file

@ -119,12 +119,16 @@ object ExternalCatalogUtils {
}
}
def getPartitionPathString(col: String, value: String): String = {
val partitionString = if (value == null || value.isEmpty) {
/**
 * Renders a single partition value for use in a partition path.
 *
 * A `null` or empty value maps to the default partition name; any other value is
 * escaped so it is safe to embed in a path segment.
 */
def getPartitionValueString(value: String): String = {
  Option(value).filter(_.nonEmpty).map(escapePathName).getOrElse(DEFAULT_PARTITION_NAME)
}
/**
 * Builds the `column=value` path segment for one partition column, escaping the column
 * name and rendering the value via [[getPartitionValueString]] (null/empty-safe).
 */
def getPartitionPathString(col: String, value: String): String =
  s"${escapePathName(col)}=${getPartitionValueString(value)}"

View file

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionValueString
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@ -350,7 +351,7 @@ object PartitioningUtils {
*/
// Builds the partition path fragment ("p1=v1/p2=v2/...") in partition-schema field order.
// The value side goes through getPartitionValueString so a null (or empty) partition value
// is rendered as the default partition name instead of hitting a NullPointerException in
// escapePathName (SPARK-34926).
def getPathFragment(spec: TablePartitionSpec, partitionSchema: StructType): String = {
  partitionSchema.map { field =>
    escapePathName(field.name) + "=" + getPartitionValueString(spec(field.name))
  }.mkString("/")
}

View file

@ -933,6 +933,23 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
assert(msg.contains("cannot resolve 'c3' given input columns"))
}
}
// Regression test for SPARK-34926: inserting into a partition whose value is null used to
// throw a NullPointerException from PartitioningUtils.getPathFragment().
test("SPARK-34926: PartitioningUtils.getPathFragment() should respect partition value is null") {
withTable("t1", "t2") {
// Source table; it stays empty so the INSERT below writes zero rows but still
// updates the target partition's metadata (which triggers getPathFragment()).
sql("CREATE TABLE t1(id INT) USING PARQUET")
// Target table partitioned by a string column.
sql(
"""
|CREATE TABLE t2 (c1 INT, part STRING)
| USING parquet
|PARTITIONED BY (part)
|""".stripMargin)
// Insert an empty result set into the null-valued partition; before the fix this
// threw an NPE while building the partition path fragment.
sql(
"""
|INSERT INTO TABLE t2 PARTITION (part = null)
|SELECT * FROM t1 where 1=0""".stripMargin)
// The insert must succeed and the table must remain empty.
checkAnswer(spark.table("t2"), Nil)
}
}
}
class FileExistingTestFileSystem extends RawLocalFileSystem {