diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 9d2c9ba0c1..d66cb09bda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -155,7 +155,8 @@ object PartitioningUtils { "root directory of the table. If there are multiple root directories, " + "please load them separately and then union them.") - val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues, timeZone) + val resolvedPartitionValues = + resolvePartitions(pathsWithPartitionValues, caseSensitive, timeZone) // Creates the StructType which represents the partition columns. val fields = { @@ -345,15 +346,18 @@ object PartitioningUtils { */ def resolvePartitions( pathsWithPartitionValues: Seq[(Path, PartitionValues)], + caseSensitive: Boolean, timeZone: TimeZone): Seq[PartitionValues] = { if (pathsWithPartitionValues.isEmpty) { Seq.empty } else { - // TODO: Selective case sensitivity. - val distinctPartColNames = - pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())).distinct + val partColNames = if (caseSensitive) { + pathsWithPartitionValues.map(_._2.columnNames) + } else { + pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())) + } assert( - distinctPartColNames.size == 1, + partColNames.distinct.size == 1, listConflictingPartitionColumns(pathsWithPartitionValues)) // Resolves possible type conflicts for each column diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index fdb0511f01..ec552f7ddf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -52,7 +52,7 @@ class FileIndexSuite extends SharedSQLContext { test("SPARK-26188: don't infer data types of partition columns if user specifies schema") { withTempDir { dir => - val partitionDirectory = new File(dir, s"a=4d") + val partitionDirectory = new File(dir, "a=4d") partitionDirectory.mkdir() val file = new File(partitionDirectory, "text.txt") stringToFile(file, "text") @@ -65,6 +65,36 @@ class FileIndexSuite extends SharedSQLContext { } } + test("SPARK-26230: if case sensitive, validate partitions with original column names") { + withTempDir { dir => + val partitionDirectory = new File(dir, "a=1") + partitionDirectory.mkdir() + val file = new File(partitionDirectory, "text.txt") + stringToFile(file, "text") + val partitionDirectory2 = new File(dir, "A=2") + partitionDirectory2.mkdir() + val file2 = new File(partitionDirectory2, "text.txt") + stringToFile(file2, "text") + val path = new Path(dir.getCanonicalPath) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) + val partitionValues = fileIndex.partitionSpec().partitions.map(_.values) + assert(partitionValues.length == 2) + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val msg = intercept[AssertionError] { + val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) + fileIndex.partitionSpec() + }.getMessage + assert(msg.contains("Conflicting partition column names detected")) + assert("Partition column name list #[0-1]: A".r.findFirstIn(msg).isDefined) + assert("Partition column name list #[0-1]: a".r.findFirstIn(msg).isDefined) + } + } + } + test("InMemoryFileIndex: input paths are converted to qualified paths") { withTempDir { dir => val file = new File(dir, "text.txt")