[SPARK-33862][SQL] Throw PartitionAlreadyExistsException if the target partition exists while renaming

### What changes were proposed in this pull request?
Throw `PartitionAlreadyExistsException` from `ALTER TABLE .. RENAME TO PARTITION` for tables in the Hive V1 External Catalog when the target partition already exists.

### Why are the changes needed?
1. To make the V1 In-Memory and Hive External Catalogs behave the same way.
2. To avoid propagating Hive's internal exceptions to users.

### Does this PR introduce _any_ user-facing change?
Yes. After the changes, the partition renaming command throws `PartitionAlreadyExistsException` for tables in the Hive catalog when the target partition already exists.
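
For illustration, a minimal SQL sketch of the new user-visible behavior against the Hive catalog (the table name `tbl` and partition column `part` are made up for this example):
```
-- hypothetical Hive table partitioned by `part`
CREATE TABLE tbl (id INT) PARTITIONED BY (part INT) STORED AS PARQUET;
ALTER TABLE tbl ADD PARTITION (part = 0) PARTITION (part = 1);

-- the target partition (part = 1) already exists, so the command now fails with
-- PartitionAlreadyExistsException instead of surfacing an internal Hive error
ALTER TABLE tbl PARTITION (part = 0) RENAME TO PARTITION (part = 1);
```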

### How was this patch tested?
Added a new unit test, run via:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *HiveCatalogedDDLSuite"
```

Closes #30866 from MaxGekk/throw-PartitionAlreadyExistsException.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

@@ -29,7 +29,7 @@ import org.apache.spark.internal.config
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, PartitionAlreadyExistsException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
@@ -1635,6 +1635,12 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
sql("ALTER TABLE tab1 PARTITION (A='10', B='p') RENAME TO PARTITION (A='1', B='p')")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
// target partition already exists
val errMsg = intercept[PartitionAlreadyExistsException] {
sql("ALTER TABLE tab1 PARTITION (a='1', b='p') RENAME TO PARTITION (a='20', b='c')")
}.getMessage
assert(errMsg.contains("Partition already exists"))
}
protected def testChangeColumn(isDatasourceTable: Boolean): Unit = {


@@ -49,7 +49,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, PartitionsAlreadyExistException}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -665,6 +665,9 @@ private[hive] class HiveClientImpl(
    val catalogTable = getTable(db, table)
    val hiveTable = toHiveTable(catalogTable, Some(userName))
    specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
      if (client.getPartition(hiveTable, newSpec.asJava, false) != null) {
        throw new PartitionAlreadyExistsException(db, table, newSpec)
      }
      val hivePart = getPartitionOption(catalogTable, oldSpec)
        .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) }
        .getOrElse { throw new NoSuchPartitionException(db, table, oldSpec) }