[SPARK-31968][SQL] Duplicate partition columns check when writing data

### What changes were proposed in this pull request?
A duplicate partition column check is added in `org.apache.spark.sql.execution.datasources.PartitioningUtils#validatePartitionColumn`, and a unit test is added.
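For intuition, here is a minimal, self-contained sketch of the kind of case-aware duplicate-name check this PR wires into `validatePartitionColumn`. It is plain Scala; `DuplicateColumnCheck` and `findDuplicates` are hypothetical names for illustration, not Spark's internal `SchemaUtils` API:

```scala
// Sketch of a case-aware duplicate-column check (hypothetical names,
// not Spark's internal SchemaUtils implementation).
object DuplicateColumnCheck {
  // Returns the column names that occur more than once, honoring case sensitivity.
  def findDuplicates(columns: Seq[String], caseSensitive: Boolean): Seq[String] = {
    val normalized = if (caseSensitive) columns else columns.map(_.toLowerCase)
    normalized.groupBy(identity).collect {
      case (name, occurrences) if occurrences.size > 1 => name
    }.toSeq
  }

  def main(args: Array[String]): Unit = {
    val dups = findDuplicates(Seq("b", "b"), caseSensitive = false)
    if (dups.nonEmpty) {
      // The real check raises an AnalysisException; here we just print.
      println(s"Found duplicate column(s): ${dups.mkString(", ")}")
    }
  }
}
```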

### Why are the changes needed?
When data is written with duplicate partition columns, the write succeeds silently, but reading the written data back later fails with `org.apache.spark.sql.AnalysisException: Found duplicate column ...`.

### Does this PR introduce _any_ user-facing change?
Yes. It prevents writing data with duplicate partition columns (a consolidated reproduction sketch follows this list).
1. Before the PR:
   `df.write.partitionBy("b", "b").csv("file:///tmp/output")` appears to succeed, but reading the output back with
   `spark.read.csv("file:///tmp/output").show()` throws:
   org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the partition schema: `b`;
2. After the PR:
   `df.write.partitionBy("b", "b").csv("file:///tmp/output")` fails at write time with:
   org.apache.spark.sql.AnalysisException: Found duplicate column(s) b, b: `b`;
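As a consolidated reproduction, a sketch intended for `spark-shell`, where a `spark` session and its implicits are already in scope; the output path is only illustrative:

```scala
// Run in spark-shell; `spark` and its implicits are assumed to be in scope.
import spark.implicits._

val df = Seq((3, 2)).toDF("a", "b")

// Before the PR: this write appears to succeed even though "b" is repeated.
// After the PR: it throws AnalysisException("Found duplicate column(s) b, b: `b`;").
df.write.partitionBy("b", "b").csv("file:///tmp/output")

// Before the PR, the failure only surfaced here, on read:
// AnalysisException: Found duplicate column(s) in the partition schema: `b`;
spark.read.csv("file:///tmp/output").show()
```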

### How was this patch tested?
A unit test is added to `PartitionedWriteSuite`.

Closes #28814 from TJX2014/master-SPARK-31968.

Authored-by: TJX2014 <xiaoxingstack@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

In `PartitioningUtils.scala`:

```diff
@@ -545,6 +545,9 @@ object PartitioningUtils {
       partitionColumns: Seq[String],
       caseSensitive: Boolean): Unit = {
+    SchemaUtils.checkColumnNameDuplication(
+      partitionColumns, partitionColumns.mkString(", "), caseSensitive)
+
     partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
       field => field.dataType match {
         case _: AtomicType => // OK
```

In `PartitionedWriteSuite.scala`:

```diff
@@ -24,7 +24,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
 import org.apache.spark.TestUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
@@ -156,4 +156,12 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-31968: duplicate partition columns check") {
+    withTempPath { f =>
+      val e = intercept[AnalysisException](
+        Seq((3, 2)).toDF("a", "b").write.partitionBy("b", "b").csv(f.getAbsolutePath))
+      assert(e.getMessage.contains("Found duplicate column(s) b, b: `b`;"))
+    }
+  }
 }
```