From cdd1752ad1bbb03b817870e1ad6b1d9cbda734a1 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Mon, 21 Dec 2020 03:37:30 -0800 Subject: [PATCH] [SPARK-33862][SQL] Throw `PartitionAlreadyExistsException` if the target partition exists while renaming ### What changes were proposed in this pull request? Throw `PartitionAlreadyExistsException` from `ALTER TABLE .. RENAME TO PARTITION` for a table from the Hive V1 External Catalog when the target partition already exists. ### Why are the changes needed? 1. To have the same behavior in the V1 In-Memory and Hive External Catalogs. 2. To avoid propagating Hive's internal exceptions to users. ### Does this PR introduce _any_ user-facing change? Yes. After the changes, the partition renaming command throws `PartitionAlreadyExistsException` for tables from the Hive catalog. ### How was this patch tested? Added a new unit test: ``` $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *HiveCatalogedDDLSuite" ``` Closes #30866 from MaxGekk/throw-PartitionAlreadyExistsException. 
Authored-by: Max Gekk Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/sql/execution/command/DDLSuite.scala | 8 +++++++- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 5 ++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index f92a93d54b..49184d0a2e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.config import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, TempTableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, PartitionAlreadyExistsException, TempTableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER @@ -1635,6 +1635,12 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { sql("ALTER TABLE tab1 PARTITION (A='10', B='p') RENAME TO PARTITION (A='1', B='p')") assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p"))) + + // target partition already exists + val errMsg = intercept[PartitionAlreadyExistsException] { + sql("ALTER TABLE tab1 PARTITION (a='1', b='p') RENAME TO PARTITION 
(a='20', b='c')") + }.getMessage + assert(errMsg.contains("Partition already exists")) } protected def testChangeColumn(isDatasourceTable: Boolean): Unit = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index e779a80f7c..40bcdefbc3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -49,7 +49,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, PartitionsAlreadyExistException} +import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, PartitionAlreadyExistsException, PartitionsAlreadyExistException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression @@ -665,6 +665,9 @@ private[hive] class HiveClientImpl( val catalogTable = getTable(db, table) val hiveTable = toHiveTable(catalogTable, Some(userName)) specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => + if (client.getPartition(hiveTable, newSpec.asJava, false) != null) { + throw new PartitionAlreadyExistsException(db, table, newSpec) + } val hivePart = getPartitionOption(catalogTable, oldSpec) .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) } .getOrElse { throw new NoSuchPartitionException(db, table, oldSpec) }