diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index bdba10eb48..f8299eee86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -476,14 +476,26 @@ case class AlterTableAddPartitionCommand(
       CatalogTablePartition(normalizedSpec, table.storage.copy(
         locationUri = location.map(CatalogUtils.stringToURI)))
     }
-    catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
+
+    // Hive metastore may not have enough memory to handle millions of partitions in a single RPC.
+    // The request to the metastore also times out when adding a lot of partitions in one shot,
+    // so we should split them into smaller batches.
+    val batchSize = 100
+    parts.toIterator.grouped(batchSize).foreach { batch =>
+      catalog.createPartitions(table.identifier, batch, ignoreIfExists = ifNotExists)
+    }
 
     if (table.stats.nonEmpty) {
       if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
-        val addedSize = parts.map { part =>
-          CommandUtils.calculateLocationSize(sparkSession.sessionState, table.identifier,
-            part.storage.locationUri)
-        }.sum
+        def calculatePartSize(part: CatalogTablePartition) = CommandUtils.calculateLocationSize(
+          sparkSession.sessionState, table.identifier, part.storage.locationUri)
+        val threshold = sparkSession.sparkContext.conf.get(RDD_PARALLEL_LISTING_THRESHOLD)
+        val partSizes = if (parts.length > threshold) {
+          ThreadUtils.parmap(parts, "gatheringNewPartitionStats", 8)(calculatePartSize)
+        } else {
+          parts.map(calculatePartSize)
+        }
+        val addedSize = partSizes.sum
         if (addedSize > 0) {
           val newStats = CatalogStatistics(sizeInBytes = table.stats.get.sizeInBytes + addedSize)
           catalog.alterTableStats(table.identifier, Some(newStats))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 871cb1ff15..bcff30a51c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -521,6 +521,20 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("new partitions should be added to catalog after writing to catalog table") {
+    val table = "partitioned_catalog_table"
+    val numParts = 210
+    withTable(table) {
+      val df = (1 to numParts).map(i => (i, i)).toDF("part", "col1")
+      val tempTable = "partitioned_catalog_temp_table"
+      df.createOrReplaceTempView(tempTable)
+      sql(s"CREATE TABLE $table (part Int, col1 Int) USING parquet PARTITIONED BY (part)")
+      sql(s"INSERT INTO TABLE $table SELECT * from $tempTable")
+      val partitions = spark.sessionState.catalog.listPartitionNames(TableIdentifier(table))
+      assert(partitions.size == numParts)
+    }
+  }
+
   test("SPARK-20236: dynamic partition overwrite without catalog table") {
     withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
       withTempPath { path =>
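
For reviewers who want to see the batching idea in isolation, below is a minimal, self-contained Scala sketch of the grouped-call pattern used in the ddl.scala hunk. The names BatchedAddPartitionsSketch, PartitionSpec, and createPartitionsInMetastore are hypothetical stand-ins introduced only for illustration; the actual patch calls catalog.createPartitions with CatalogTablePartition values.

// A minimal sketch of the batching pattern from the ddl.scala hunk above.
// PartitionSpec and createPartitionsInMetastore are hypothetical stand-ins,
// not part of this patch.
object BatchedAddPartitionsSketch {
  type PartitionSpec = Map[String, String]

  // Hypothetical metastore call; in Spark each invocation would be one RPC.
  def createPartitionsInMetastore(batch: Seq[PartitionSpec]): Unit =
    println(s"createPartitions RPC with ${batch.size} partitions")

  def main(args: Array[String]): Unit = {
    val parts: Seq[PartitionSpec] = (1 to 250).map(i => Map("part" -> i.toString))

    // Same idea as the patch: never send every partition in a single RPC.
    // Iterator.grouped yields batches of at most batchSize elements, so 250
    // partitions become three calls of 100, 100 and 50.
    val batchSize = 100
    parts.iterator.grouped(batchSize).foreach { batch =>
      createPartitionsInMetastore(batch)
    }
  }
}

Keeping each batch small trades a few extra round trips for bounded memory use on the metastore side; the patch currently hard-codes batchSize = 100.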