[SPARK-33990][SQL][TESTS] Remove partition data by v2 ALTER TABLE .. DROP PARTITION

### What changes were proposed in this pull request?
Remove partition data by `ALTER TABLE .. DROP PARTITION` in V2 table catalog used in tests.

### Why are the changes needed?
This is a bug fix. Before the fix, `ALTER TABLE .. DROP PARTITION` does not remove the data belongs to the dropped partition. As a consequence of that, the `select` query returns removed data.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running tests suites for v1 and v2 catalogs:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableDropPartitionSuite"
```

Closes #31014 from MaxGekk/fix-drop-partition-v2.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Max Gekk 2021-01-04 10:26:39 -08:00 committed by Dongjoon Hyun
parent 6b86aa0b52
commit fc3f22645e
4 changed files with 18 additions and 1 deletions

View file

@ -49,6 +49,7 @@ class InMemoryAtomicPartitionTable (
override def dropPartition(ident: InternalRow): Boolean = {
if (memoryTablePartitions.containsKey(ident)) {
memoryTablePartitions.remove(ident)
removePartitionKey(ident.toSeq(schema))
true
} else {
false

View file

@ -61,6 +61,7 @@ class InMemoryPartitionTable(
def dropPartition(ident: InternalRow): Boolean = {
if (memoryTablePartitions.containsKey(ident)) {
memoryTablePartitions.remove(ident)
removePartitionKey(ident.toSeq(schema))
true
} else {
false

View file

@ -187,6 +187,10 @@ class InMemoryTable(
true
}
protected def removePartitionKey(key: Seq[Any]): Unit = dataMap.synchronized {
dataMap.remove(key)
}
def withData(data: Array[BufferedRows]): InMemoryTable = dataMap.synchronized {
data.foreach(_.rows.foreach { row =>
val key = getKey(row)

View file

@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException
import org.apache.spark.sql.internal.SQLConf
@ -144,4 +144,15 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
checkPartitions(t)
}
}
test("SPARK-33990: don not return data from dropped partition") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id int, part int) $defaultUsing PARTITIONED BY (part)")
sql(s"INSERT INTO $t PARTITION (part=0) SELECT 0")
sql(s"INSERT INTO $t PARTITION (part=1) SELECT 1")
QueryTest.checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(0, 0), Row(1, 1)))
sql(s"ALTER TABLE $t DROP PARTITION (part=0)")
QueryTest.checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 1)))
}
}
}