[SPARK-34223][SQL] FIX NPE for static partition with null in InsertIntoHadoopFsRelationCommand

### What changes were proposed in this pull request?

with a simple case, the null will be passed to InsertIntoHadoopFsRelationCommand blindly, we should avoid the npe
```scala
 test("NPE") {
    withTable("t") {
      sql(s"CREATE TABLE t(i STRING, c string) USING $format PARTITIONED BY (c)")
      sql("INSERT OVERWRITE t PARTITION (c=null) VALUES ('1')")
      checkAnswer(spark.table("t"), Row("1", null))
    }
  }
```
```logtalk
java.lang.NullPointerException
	at scala.collection.immutable.StringOps$.length(StringOps.scala:51)
	at scala.collection.immutable.StringOps.length(StringOps.scala:51)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:35)
	at scala.collection.IndexedSeqOptimized.foreach
	at scala.collection.immutable.StringOps.foreach(StringOps.scala:33)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.escapePathName(ExternalCatalogUtils.scala:69)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.orig-s0.0000030000-r30676-expand-or-complete(InsertIntoHadoopFsRelationCommand.scala:231)
```

### Why are the changes needed?

a bug fix
### Does this PR introduce _any_ user-facing change?

no
### How was this patch tested?

new tests

Closes #31320 from yaooqinn/SPARK-34223.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Kent Yao 2021-01-26 12:05:58 +08:00 committed by Wenchen Fan
parent cb37c962be
commit b3915ddd91
2 changed files with 10 additions and 7 deletions

View file

@ -25,6 +25,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@ -59,7 +60,6 @@ case class InsertIntoHadoopFsRelationCommand(
fileIndex: Option[FileIndex],
outputColumnNames: Seq[String])
extends DataWritingCommand {
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
private lazy val parameters = CaseInsensitiveMap(options)
@ -226,12 +226,7 @@ case class InsertIntoHadoopFsRelationCommand(
committer: FileCommitProtocol): Unit = {
val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
"/" + partitionColumns.flatMap { p =>
staticPartitions.get(p.name) match {
case Some(value) =>
Some(escapePathName(p.name) + "=" + escapePathName(value))
case None =>
None
}
staticPartitions.get(p.name).map(getPartitionPathString(p.name, _))
}.mkString("/")
} else {
""

View file

@ -954,6 +954,14 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
assert(msg.contains("cannot resolve '`c3`' given input columns"))
}
}
test("SPARK-34223: static partition with null raise NPE") {
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c string) USING PARQUET PARTITIONED BY (c)")
sql("INSERT OVERWRITE t PARTITION (c=null) VALUES ('1')")
checkAnswer(spark.table("t"), Row("1", null))
}
}
}
class FileExistingTestFileSystem extends RawLocalFileSystem {