[SPARK-33706][SQL] Require fully specified partition identifier in partitionExists()

### What changes were proposed in this pull request?
1. Check that the partition identifier passed to `SupportsPartitionManagement.partitionExists()` is fully specified, i.e. that it provides a value for every partition field.
2. Remove the custom implementation of `partitionExists()` from `InMemoryPartitionTable`, and reuse the default implementation from `SupportsPartitionManagement`.

### Why are the changes needed?
The method is supposed to check the existence of a single partition, but currently it can return `true` for a partially specified partition identifier: the default implementation matches the given fields as a prefix of the partition schema, so on a table partitioned by `(part0, part1)` the identifier `InternalRow(0)` matches any partition with `part0 = 0`. This can lead to incorrect command behavior; for instance, a command could modify or place data in the middle of a partition path.
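
As an illustration, here is a minimal sketch of the behavior change. It is written as if it lived inside the `SupportsPartitionManagementSuite` shown below, so it can reuse the `createMultiPartTable()` helper that this PR adds (a table partitioned by `part0: Int`, `part1: String`, pre-populated with the partitions `(0, "abc")`, `(0, "def")` and `(1, "abc")`):

```scala
// Sketch only: relies on the test helpers of SupportsPartitionManagementSuite below.
val partTable = createMultiPartTable()

// A fully specified identifier refers to exactly one partition.
assert(partTable.partitionExists(InternalRow(0, "def")))   // this partition exists
assert(!partTable.partitionExists(InternalRow(-1, "def"))) // this one does not

// A partially specified identifier used to be matched as a prefix (it returned true here
// because some partition has part0 = 0); it is now rejected up front.
intercept[IllegalArgumentException] {
  partTable.partitionExists(InternalRow(0))
}
```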

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running existing test suites:
```
$ build/sbt "test:testOnly *AlterTablePartitionV2SQLSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *SupportsPartitionManagementSuite"
```

Closes #30667 from MaxGekk/check-len-partitionExists.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Max Gekk authored on 2020-12-11 12:48:40 +00:00; committed by Wenchen Fan
parent 8f5db716fa · commit 8b97b19ffa
3 changed files with 30 additions and 10 deletions

**SupportsPartitionManagement.java**

```diff
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.connector.catalog;
 
-import java.util.Arrays;
 import java.util.Map;
 
 import org.apache.spark.annotation.Experimental;
@@ -76,13 +75,18 @@ public interface SupportsPartitionManagement extends Table {
   /**
    * Test whether a partition exists using an {@link InternalRow ident} from the table.
    *
-   * @param ident a partition identifier
+   * @param ident a partition identifier which must contain all partition fields in order
    * @return true if the partition exists, false otherwise
    */
   default boolean partitionExists(InternalRow ident) {
     String[] partitionNames = partitionSchema().names();
-    String[] requiredNames = Arrays.copyOfRange(partitionNames, 0, ident.numFields());
-    return listPartitionIdentifiers(requiredNames, ident).length > 0;
+    if (ident.numFields() == partitionNames.length) {
+      return listPartitionIdentifiers(partitionNames, ident).length > 0;
+    } else {
+      throw new IllegalArgumentException("The number of fields (" + ident.numFields() +
+        ") in the partition identifier is not equal to the partition schema length (" +
+        partitionNames.length + "). The identifier might not refer to one partition.");
+    }
   }
 
   /**
```

**InMemoryPartitionTable.scala**

```diff
@@ -83,9 +83,6 @@ class InMemoryPartitionTable(
     }
   }
 
-  override def partitionExists(ident: InternalRow): Boolean =
-    memoryTablePartitions.containsKey(ident)
-
   override protected def addPartitionKey(key: Seq[Any]): Unit = {
     memoryTablePartitions.put(InternalRow.fromSeq(key), Map.empty[String, String].asJava)
   }
```

**SupportsPartitionManagementSuite.scala**

```diff
@@ -145,7 +145,7 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
     assert(!hasPartitions(partTable))
   }
 
-  test("listPartitionByNames") {
+  private def createMultiPartTable(): InMemoryPartitionTable = {
     val partCatalog = new InMemoryPartitionTableCatalog
     partCatalog.initialize("test", CaseInsensitiveStringMap.empty())
     val table = partCatalog.createTable(
@@ -156,8 +156,8 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
         .add("part1", StringType),
       Array(LogicalExpressions.identity(ref("part0")), LogicalExpressions.identity(ref("part1"))),
       util.Collections.emptyMap[String, String])
-
-    val partTable = table.asInstanceOf[InMemoryPartitionTable]
+    val partTable = table.asInstanceOf[InMemoryPartitionTable]
+
     Seq(
       InternalRow(0, "abc"),
       InternalRow(0, "def"),
@@ -165,6 +165,12 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
       partTable.createPartition(partIdent, new util.HashMap[String, String]())
     }
 
+    partTable
+  }
+
+  test("listPartitionByNames") {
+    val partTable = createMultiPartTable()
+
     Seq(
       (Array("part0", "part1"), InternalRow(0, "abc")) -> Set(InternalRow(0, "abc")),
       (Array("part0"), InternalRow(0)) -> Set(InternalRow(0, "abc"), InternalRow(0, "def")),
@@ -185,4 +191,17 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
       intercept[AssertionError](partTable.listPartitionIdentifiers(names, idents))
     }
   }
+
+  test("partitionExists") {
+    val partTable = createMultiPartTable()
+
+    assert(partTable.partitionExists(InternalRow(0, "def")))
+    assert(!partTable.partitionExists(InternalRow(-1, "def")))
+    assert(!partTable.partitionExists(InternalRow("abc", "def")))
+
+    val errMsg = intercept[IllegalArgumentException] {
+      partTable.partitionExists(InternalRow(0))
+    }.getMessage
+    assert(errMsg.contains("The identifier might not refer to one partition"))
+  }
 }
```