[SPARK-29938][SQL] Add batching support in Alter table add partition flow

### What changes were proposed in this pull request?
Add batching support in the Alter table add partition flow. Also calculate new partition sizes faster by listing the new partition paths in parallel.

### Why are the changes needed?
This PR splits the single createPartitions() call in the AlterTableAddPartition flow into smaller batches (a standalone sketch of the batching idiom follows this list), which prevents:
- SocketTimeoutException: adding thousands of partitions to the Hive metastore takes a long time, so the Hive client fails with SocketTimeoutException when they are all sent in one request.

- Hive metastore OOM (caused by millions of partitions in a single RPC).
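
To make the batching idea concrete outside the Spark codebase, here is a minimal, self-contained Scala sketch of the same idiom. The `createPartitionsInMetastore` stub and the 250 partition specs are hypothetical; only the batch size of 100 matches the PR.

```scala
// Minimal sketch of the batching idiom used by this PR (names are hypothetical).
object BatchingSketch {
  // Stand-in for the expensive metastore RPC that would otherwise receive all specs at once.
  def createPartitionsInMetastore(batch: Seq[String]): Unit =
    println(s"createPartitions RPC with ${batch.size} partition specs")

  def main(args: Array[String]): Unit = {
    val allPartitionSpecs: Seq[String] = (1 to 250).map(i => s"part=$i")
    val batchSize = 100
    // grouped() yields batches of at most batchSize, so 250 specs become 3 RPCs of 100/100/50.
    allPartitionSpecs.toIterator.grouped(batchSize).foreach { batch =>
      createPartitionsInMetastore(batch)
    }
  }
}
```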

It also gathers stats (the total size of all files in all new partitions) faster by listing the new partition paths in parallel; a rough sketch of the parallel listing follows.
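
The PR itself uses Spark's internal `ThreadUtils.parmap` for this; as a hedged approximation using only the standard library, the parallel size gathering looks roughly like the sketch below. The `sizeOfPath` stub, the threshold value, and the paths are made up; the 8-thread pool mirrors the diff.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object ParallelSizeSketch {
  // Stand-in for CommandUtils.calculateLocationSize: sum of file sizes under one partition path.
  def sizeOfPath(path: String): Long = path.length.toLong // dummy work for the sketch

  def main(args: Array[String]): Unit = {
    val partitionPaths = (1 to 50).map(i => s"/warehouse/tbl/part=$i")
    val threshold = 10 // mirrors the "only parallelize above a threshold" check in the PR
    val addedSize =
      if (partitionPaths.length > threshold) {
        // List the new partition paths on a small fixed pool instead of one by one.
        val pool = Executors.newFixedThreadPool(8)
        implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
        try {
          val sizes = Future.traverse(partitionPaths)(p => Future(sizeOfPath(p)))
          Await.result(sizes, Duration.Inf).sum
        } finally pool.shutdown()
      } else {
        partitionPaths.map(sizeOfPath).sum
      }
    println(s"total added size: $addedSize bytes")
  }
}
```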

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added a unit test.
Also tested on an HDI cluster with 15000 partitions and a remote metastore server: without batching the operation fails with SocketTimeoutException; with batching it finishes in 25 minutes.
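
For reference, a hypothetical way to drive this code path from user code: a single `ALTER TABLE ... ADD PARTITION` statement carrying many partition specs goes through `AlterTableAddPartitionCommand`, so with the batch size of 100 the 250 specs below would be committed to the metastore in three `createPartitions` calls. Table name, schema, and session setup are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver that adds many partitions in one statement.
object AddManyPartitions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("add-many-partitions")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql(
      "CREATE TABLE IF NOT EXISTS events (value INT, day INT) USING parquet PARTITIONED BY (day)")

    // One ALTER TABLE statement carrying 250 space-separated partition specs.
    val specs = (1 to 250).map(d => s"PARTITION (day=$d)").mkString(" ")
    spark.sql(s"ALTER TABLE events ADD IF NOT EXISTS $specs")

    spark.stop()
  }
}
```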

Closes #26569 from prakharjain09/add_partition_batching_r1.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
Prakhar Jain 2019-12-20 08:54:14 -06:00 committed by Sean Owen
parent 0d2ef3ae2b
commit 07b04c4c72
2 changed files with 31 additions and 5 deletions

@@ -476,14 +476,26 @@ case class AlterTableAddPartitionCommand(
       CatalogTablePartition(normalizedSpec, table.storage.copy(
         locationUri = location.map(CatalogUtils.stringToURI)))
     }
-    catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
+    // Hive metastore may not have enough memory to handle millions of partitions in single RPC.
+    // Also the request to metastore times out when adding lot of partitions in one shot.
+    // we should split them into smaller batches
+    val batchSize = 100
+    parts.toIterator.grouped(batchSize).foreach { batch =>
+      catalog.createPartitions(table.identifier, batch, ignoreIfExists = ifNotExists)
+    }
     if (table.stats.nonEmpty) {
       if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
-        val addedSize = parts.map { part =>
-          CommandUtils.calculateLocationSize(sparkSession.sessionState, table.identifier,
-            part.storage.locationUri)
-        }.sum
+        def calculatePartSize(part: CatalogTablePartition) = CommandUtils.calculateLocationSize(
+          sparkSession.sessionState, table.identifier, part.storage.locationUri)
+        val threshold = sparkSession.sparkContext.conf.get(RDD_PARALLEL_LISTING_THRESHOLD)
+        val partSizes = if (parts.length > threshold) {
+          ThreadUtils.parmap(parts, "gatheringNewPartitionStats", 8)(calculatePartSize)
+        } else {
+          parts.map(calculatePartSize)
+        }
+        val addedSize = partSizes.sum
         if (addedSize > 0) {
           val newStats = CatalogStatistics(sizeInBytes = table.stats.get.sizeInBytes + addedSize)
           catalog.alterTableStats(table.identifier, Some(newStats))

@@ -521,6 +521,20 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
+  test("new partitions should be added to catalog after writing to catalog table") {
+    val table = "partitioned_catalog_table"
+    val numParts = 210
+    withTable(table) {
+      val df = (1 to numParts).map(i => (i, i)).toDF("part", "col1")
+      val tempTable = "partitioned_catalog_temp_table"
+      df.createOrReplaceTempView(tempTable)
+      sql(s"CREATE TABLE $table (part Int, col1 Int) USING parquet PARTITIONED BY (part)")
+      sql(s"INSERT INTO TABLE $table SELECT * from $tempTable")
+      val partitions = spark.sessionState.catalog.listPartitionNames(TableIdentifier(table))
+      assert(partitions.size == numParts)
+    }
+  }
   test("SPARK-20236: dynamic partition overwrite without catalog table") {
     withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
       withTempPath { path =>