[SPARK-28054][SQL][FOLLOWUP] move the bug fix closer to where the issue is caused
## What changes were proposed in this pull request?

The bug fixed by https://github.com/apache/spark/pull/24886 is caused by Hive's `loadDynamicPartitions`. It's better to keep the fix surgical and put it right before we call `loadDynamicPartitions`. This also makes the fix safer: we no longer need to analyze all the callers of `saveAsHiveFile` and prove that they are safe.

## How was this patch tested?

N/A

Closes #25234 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
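For context, the underlying SPARK-28054 bug shows up when a Hive table is written with dynamic partitions whose column names contain upper-case letters. A rough reproduction sketch follows (assuming a Hive-enabled session; the table and column names are illustrative and not part of this commit):

```scala
import org.apache.spark.sql.SparkSession

// The Hive metastore keeps partition column names lower-cased, so a partition
// directory written as `P=1` does not match the stored column `p`, and Hive's
// loadDynamicPartitions validation rejects the load.
val spark = SparkSession.builder()
  .appName("spark-28054-sketch")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("CREATE TABLE dst (id INT) PARTITIONED BY (P INT) STORED AS PARQUET")
// Before the fix, this dynamic-partition insert could fail during Hive's
// loadDynamicPartitions; with the fix, the directory is written as `p=1`.
spark.sql("INSERT OVERWRITE TABLE dst PARTITION (P) SELECT 1 AS id, 1 AS P")
```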
parent e04f696f7f
commit a45739d97e
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.sql.hive.execution
 
+import java.util.Locale
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.ErrorMsg
@@ -186,10 +188,15 @@ case class InsertIntoHiveTable(
     }
 
     val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
-      query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
+      val attr = query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
         throw new AnalysisException(
           s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
       }.asInstanceOf[Attribute]
+      // SPARK-28054: Hive metastore is not case preserving and keeps partition columns
+      // with lower cased names. Hive will validate the column names in the partition directories
+      // during `loadDynamicPartitions`. Spark needs to write partition directories with lower-cased
+      // column names in order to make `loadDynamicPartitions` work.
+      attr.withName(name.toLowerCase(Locale.ROOT))
     }
 
     saveAsHiveFile(
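Note that the new code above lower-cases with `Locale.ROOT`, where the removed `SaveAsHiveFile` code used the default-locale `toLowerCase` under a scalastyle suppression. A small illustration of why the locale matters (the Turkish locale here is only an example):

```scala
import java.util.Locale

// With a Turkish locale, upper-case "I" lower-cases to the dotless "ı", so a
// column named "ID" would become "ıd" and no longer match the metastore's "id".
// Locale.ROOT gives locale-independent lower-casing.
val turkish = new Locale("tr", "TR")
println("ID".toLowerCase(turkish))      // prints "ıd"
println("ID".toLowerCase(Locale.ROOT))  // prints "id"
```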
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -83,16 +83,6 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       jobId = java.util.UUID.randomUUID().toString,
       outputPath = outputLocation)
 
-    // SPARK-28054: Hive metastore is not case preserving and keeps partition columns
-    // with lower cased names, Hive will validate the column names in partition spec and
-    // the partition paths. Besides lowercasing the column names in the partition spec,
-    // we also need to lowercase the column names in written partition paths.
-    // scalastyle:off caselocale
-    val hiveCompatiblePartitionColumns = partitionAttributes.map { attr =>
-      attr.withName(attr.name.toLowerCase)
-    }
-    // scalastyle:on caselocale
-
     FileFormatWriter.write(
       sparkSession = sparkSession,
       plan = plan,
@@ -101,7 +91,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       outputSpec =
         FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, outputColumns),
       hadoopConf = hadoopConf,
-      partitionColumns = hiveCompatiblePartitionColumns,
+      partitionColumns = partitionAttributes,
       bucketSpec = None,
       statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
       options = Map.empty)
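Net effect of the two files together: `saveAsHiveFile` no longer rewrites partition column names, and the lower-casing now happens in `InsertIntoHiveTable`, right before the Hive-specific `loadDynamicPartitions` path that requires it. A minimal sketch of that normalization on the caller side (the helper name is hypothetical; the real change does this inline, as shown in the diff above):

```scala
import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.Attribute

// Hypothetical helper mirroring what InsertIntoHiveTable now does inline:
// keep name resolution case-preserving, but write partition directories with
// lower-cased column names so Hive's loadDynamicPartitions accepts them.
def toHiveCompatiblePartitionAttrs(attrs: Seq[Attribute]): Seq[Attribute] =
  attrs.map(attr => attr.withName(attr.name.toLowerCase(Locale.ROOT)))
```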