[SPARK-26230][SQL] FileIndex: if case sensitive, validate partitions with original column names

## What changes were proposed in this pull request?

Partition column name is required to be unique under the same directory. The following paths are invalid partitioned directory:
```
hdfs://host:9000/path/a=1
hdfs://host:9000/path/b=2
```

If case sensitive, the following paths should be invalid too:
```
hdfs://host:9000/path/a=1
hdfs://host:9000/path/A=2
```
Since column 'a' and 'A' are different, and it is wrong to use either one as the column name in partition schema.

Also, there is a `TODO` comment in the code. Currently the Spark doesn't validate such case when `CASE_SENSITIVE` enabled.

This PR is to resolve the problem.

## How was this patch tested?

Add unit test

Closes #23186 from gengliangwang/SPARK-26230.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Gengliang Wang 2018-12-03 19:53:45 +08:00 committed by Wenchen Fan
parent 11e5f1bcd4
commit b569ba53f4
2 changed files with 40 additions and 6 deletions

View file

@ -155,7 +155,8 @@ object PartitioningUtils {
"root directory of the table. If there are multiple root directories, " +
"please load them separately and then union them.")
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone)
val resolvedPartitionValues =
resolvePartitions(pathsWithPartitionValues, caseSensitive, timeZone)
// Creates the StructType which represents the partition columns.
val fields = {
@ -345,15 +346,18 @@ object PartitioningUtils {
*/
def resolvePartitions(
pathsWithPartitionValues: Seq[(Path, PartitionValues)],
caseSensitive: Boolean,
timeZone: TimeZone): Seq[PartitionValues] = {
if (pathsWithPartitionValues.isEmpty) {
Seq.empty
} else {
// TODO: Selective case sensitivity.
val distinctPartColNames =
pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())).distinct
val partColNames = if (caseSensitive) {
pathsWithPartitionValues.map(_._2.columnNames)
} else {
pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase()))
}
assert(
distinctPartColNames.size == 1,
partColNames.distinct.size == 1,
listConflictingPartitionColumns(pathsWithPartitionValues))
// Resolves possible type conflicts for each column

View file

@ -52,7 +52,7 @@ class FileIndexSuite extends SharedSQLContext {
test("SPARK-26188: don't infer data types of partition columns if user specifies schema") {
withTempDir { dir =>
val partitionDirectory = new File(dir, s"a=4d")
val partitionDirectory = new File(dir, "a=4d")
partitionDirectory.mkdir()
val file = new File(partitionDirectory, "text.txt")
stringToFile(file, "text")
@ -65,6 +65,36 @@ class FileIndexSuite extends SharedSQLContext {
}
}
test("SPARK-26230: if case sensitive, validate partitions with original column names") {
withTempDir { dir =>
val partitionDirectory = new File(dir, "a=1")
partitionDirectory.mkdir()
val file = new File(partitionDirectory, "text.txt")
stringToFile(file, "text")
val partitionDirectory2 = new File(dir, "A=2")
partitionDirectory2.mkdir()
val file2 = new File(partitionDirectory2, "text.txt")
stringToFile(file2, "text")
val path = new Path(dir.getCanonicalPath)
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
val partitionValues = fileIndex.partitionSpec().partitions.map(_.values)
assert(partitionValues.length == 2)
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val msg = intercept[AssertionError] {
val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
fileIndex.partitionSpec()
}.getMessage
assert(msg.contains("Conflicting partition column names detected"))
assert("Partition column name list #[0-1]: A".r.findFirstIn(msg).isDefined)
assert("Partition column name list #[0-1]: a".r.findFirstIn(msg).isDefined)
}
}
}
test("InMemoryFileIndex: input paths are converted to qualified paths") {
withTempDir { dir =>
val file = new File(dir, "text.txt")