[SPARK-19359][SQL] clear useless path after rename a partition with upper-case by HiveExternalCatalog
## What changes were proposed in this pull request? The Hive metastore is not case-preserving and keeps partition columns with lower-case names. If Spark SQL creates a table with an upper-case partition name through HiveExternalCatalog, then when we rename a partition it first calls the HiveClient's renamePartition, which creates a new lower-case partition path, and Spark SQL then renames the lower-case path to the upper-case one. However, if the renamed partition has more than one level of partitioning, e.g. A=1/B=2, Hive's renamePartition changes it to a=1/b=2 and Spark SQL then renames it to A=1/B=2, but a=1 still exists in the filesystem, so we should also delete it. ## How was this patch tested? A unit test was added. Author: windpiger <songjun@outlook.com> Closes #16700 from windpiger/clearUselessPathAfterRenamPartition.
This commit is contained in:
parent
bb1a1fe05e
commit
1b5ee2003c
|
@ -839,6 +839,26 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
|
|||
spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }
|
||||
}
|
||||
|
||||
|
||||
/**
 * Computes the topmost lower-case partition directory that Hive created while renaming a
 * partition, so that the caller can remove it once Spark SQL has renamed the leaf directory
 * back to the case-preserving column names.
 *
 * Only the partition columns up to and including the first one whose name is not already
 * lower-case are used: directories above that point are spelled identically by Hive and by
 * Spark SQL, and the first differing directory is the root of the leftover tree.
 *
 * e.g. with partition columns (A, B, C), when /path/A=1/B=2/C=3 is changed to
 * /path/A=4/B=5/C=6, this function returns /path/a=4.
 *
 * NOTE(review): if no column name contains an upper-case character the prefix is empty;
 * presumably the returned path is then `tablePath` itself, so callers should only use this
 * when at least one partition column is not lower-case -- confirm against
 * `ExternalCatalogUtils.generatePartitionPath`.
 *
 * @param spec the new partition spec of the renamed partition
 * @param partitionColumnNames the table's partition column names, in partition order
 * @param tablePath the root location of the table
 * @return the lower-case partition path built from the selected column prefix
 */
def getExtraPartPathCreatedByHive(
    spec: TablePartitionSpec,
    partitionColumnNames: Seq[String],
    tablePath: Path): Path = {
  // Position of the first column that Hive would spell differently (-1 when there is none).
  val firstNonLowerCaseIdx = partitionColumnNames.indexWhere(name => name.toLowerCase != name)
  // Keep the columns up to and including that position, spelled the way Hive spells them.
  val hiveColumnPrefix = partitionColumnNames.take(firstNonLowerCaseIdx + 1).map(_.toLowerCase)

  ExternalCatalogUtils.generatePartitionPath(
    lowerCasePartitionSpec(spec), hiveColumnPrefix, tablePath)
}
|
||||
|
||||
override def createPartitions(
|
||||
db: String,
|
||||
table: String,
|
||||
|
@ -899,6 +919,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
|
|||
spec, partitionColumnNames, tablePath)
|
||||
try {
|
||||
tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath)
|
||||
|
||||
// If the newSpec contains more than one depth partition, FileSystem.rename just deletes
|
||||
// the leaf(i.e. wrongPath), we should check if wrongPath's parents need to be deleted.
|
||||
// For example, given a newSpec 'A=1/B=2', after calling Hive's client.renamePartitions,
|
||||
// the location path in FileSystem is changed to 'a=1/b=2', which is wrongPath, then
|
||||
// although we renamed it to 'A=1/B=2', 'a=1/b=2' in FileSystem is deleted, but 'a=1'
|
||||
// still exists, which we also need to delete
|
||||
val delHivePartPathAfterRename = getExtraPartPathCreatedByHive(
|
||||
spec,
|
||||
partitionColumnNames,
|
||||
tablePath)
|
||||
|
||||
if (delHivePartPathAfterRename != wrongPath) {
|
||||
tablePath.getFileSystem(hadoopConf).delete(delHivePartPathAfterRename, true)
|
||||
}
|
||||
} catch {
|
||||
case e: IOException => throw new SparkException(
|
||||
s"Unable to rename partition path from $wrongPath to $rightPath", e)
|
||||
|
|
|
@ -19,8 +19,11 @@ package org.apache.spark.sql.hive
|
|||
|
||||
import java.io.File
|
||||
|
||||
import org.apache.hadoop.fs.Path
|
||||
|
||||
import org.apache.spark.metrics.source.HiveCatalogMetrics
|
||||
import org.apache.spark.sql.{AnalysisException, QueryTest}
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.hive.test.TestHiveSingleton
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.test.SQLTestUtils
|
||||
|
@ -481,4 +484,37 @@ class PartitionProviderCompatibilitySuite
|
|||
assert(spark.sql("show partitions test").count() == 5)
|
||||
}
|
||||
}
|
||||
|
||||
test("partition path created by Hive should be deleted after renamePartitions with upper-case") {
  // Message shared by every assertion that a lower-case directory left behind by Hive is gone.
  val leftoverMsg = "partition path created by Hive should be deleted " +
    "after renamePartitions with upper-case"

  // Location of the named table as recorded in the session catalog.
  def locationOf(name: String) =
    spark.sessionState.catalog.getTableMetadata(TableIdentifier(name)).location

  // Whether `path` currently exists on its own filesystem.
  def existsOnFs(path: Path): Boolean =
    path.getFileSystem(spark.sessionState.newHadoopConf()).exists(path)

  withTable("t", "t1", "t2") {
    // Two partition levels, both upper-case: Hive's 'a=4' must not survive the rename.
    Seq((1, 2, 3)).toDF("id", "A", "B")
      .write.partitionBy("A", "B").saveAsTable("t")
    spark.sql("alter table t partition(A=2, B=3) rename to partition(A=4, B=5)")
    assert(!existsOnFs(new Path(locationOf("t") + "/a=4")), leftoverMsg)

    // Three partition levels, all upper-case: the whole 'a=5' subtree must be removed.
    Seq((1, 2, 3, 4)).toDF("id", "A", "B", "C")
      .write.partitionBy("A", "B", "C").saveAsTable("t1")
    spark.sql("alter table t1 partition(A=2, B=3, C=4) rename to partition(A=5, B=6, C=7)")
    assert(!existsOnFs(new Path(locationOf("t1") + "/a=5")), leftoverMsg)

    // The first level is already lower-case, so 'a=4' is the real partition directory and
    // has to stay; only the lower-case 'b=5' beneath it is a leftover.
    Seq((1, 2, 3, 4)).toDF("id", "a", "B", "C")
      .write.partitionBy("a", "B", "C").saveAsTable("t2")
    spark.sql("alter table t2 partition(a=2, B=3, C=4) rename to partition(a=4, B=5, C=6)")
    val t2Location = locationOf("t2")
    assert(existsOnFs(new Path(t2Location + "/a=4")),
      "partition path of lower-case partition name should not be deleted")
    assert(!existsOnFs(new Path(t2Location + "/a=4/b=5")), leftoverMsg)
  }
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue