[SPARK-34312][SQL] Support partition(s) truncation by Supports(Atomic)PartitionManagement

### What changes were proposed in this pull request?
1. Add a new method `truncatePartition()` to the `SupportsPartitionManagement` interface.
2. Add a new method `truncatePartitions()` to the `SupportsAtomicPartitionManagement` interface.
3. Override the new methods in `InMemoryPartitionTable`/`InMemoryAtomicPartitionTable`, which the tests use (a caller-side sketch follows this list).
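
Both methods come with throwing default implementations, so existing connectors are unaffected. As a rough illustration of how an engine might dispatch between the two interfaces, here is a minimal, hypothetical helper (not part of this PR; the `truncate` function and its signature are made up for illustration):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table}

// Hypothetical dispatch helper: prefer the atomic API when the table offers it.
// SupportsAtomicPartitionManagement extends SupportsPartitionManagement,
// so the atomic case must be matched first.
def truncate(table: Table, idents: Array[InternalRow]): Boolean = table match {
  case t: SupportsAtomicPartitionManagement => t.truncatePartitions(idents)
  case t: SupportsPartitionManagement => idents.forall(t.truncatePartition)
  case _ => throw new UnsupportedOperationException(
    s"Table ${table.name} does not support partition management")
}
```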

### Why are the changes needed?
This is the first step toward supporting the v2 `TRUNCATE TABLE .. PARTITION` command.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the new tests:
```
$ build/sbt "test:testOnly *SupportsPartitionManagementSuite"
$ build/sbt "test:testOnly *SupportsAtomicPartitionManagementSuite"
```

Closes #31420 from MaxGekk/dsv2-truncate-table-partitions.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


```diff
@@ -37,6 +37,8 @@ import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;
  * ${@link #purgePartitions}:
  *   remove an array of partitions and any data they contain from the table by skipping
  *   a trash even if it is supported
+ * ${@link #truncatePartitions}:
+ *   truncate an array of partitions by removing partitions data
  *
  * @since 3.1.0
  */
@@ -105,4 +107,22 @@ public interface SupportsAtomicPartitionManagement extends SupportsPartitionManagement {
       throws NoSuchPartitionException, UnsupportedOperationException {
     throw new UnsupportedOperationException("Partition purge is not supported");
   }
+
+  /**
+   * Truncate an array of partitions atomically from the table, and completely remove
+   * partitions data.
+   * <p>
+   * If any partition doesn't exist,
+   * the truncatePartitions operation needs to be safely rolled back.
+   *
+   * @param idents an array of partition identifiers
+   * @return true if partitions were truncated successfully, otherwise false
+   * @throws NoSuchPartitionException If any partition identifier to truncate doesn't exist
+   * @throws UnsupportedOperationException If partition truncation is not supported
+   *
+   * @since 3.2.0
+   */
+  default boolean truncatePartitions(InternalRow[] idents)
+      throws NoSuchPartitionException, UnsupportedOperationException {
+    throw new UnsupportedOperationException("Partitions truncate is not supported");
+  }
 }
```
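
The all-or-nothing contract is what distinguishes the atomic variant. A hedged caller-side sketch of the guarantee (`atomicTable` and the partition values are illustrative, not from this PR):

```scala
// If "2021-01-02" does not exist, the call must throw NoSuchPartitionException
// and roll back safely, leaving the data of "2021-01-01" untouched.
try {
  atomicTable.truncatePartitions(Array(InternalRow("2021-01-01"), InternalRow("2021-01-02")))
} catch {
  case _: NoSuchPartitionException => // nothing was truncated
}
```

The `InMemoryAtomicPartitionTable` change below satisfies this by validating all identifiers before touching any data.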


```diff
@@ -41,6 +41,8 @@ import org.apache.spark.sql.types.StructType;
  *   even if it is supported.
  * ${@link #replacePartitionMetadata}:
  *   point a partition to a new location, which will swap one location's data for the other
+ * ${@link #truncatePartition}:
+ *   remove partition data from the table
  *
  * @since 3.1.0
  */
@@ -158,4 +160,19 @@ public interface SupportsPartitionManagement extends Table {
       NoSuchPartitionException {
     throw new UnsupportedOperationException("Partition renaming is not supported");
   }
+
+  /**
+   * Truncate a partition in the table by completely removing partition data.
+   *
+   * @param ident a partition identifier
+   * @return true if the partition was truncated successfully, otherwise false
+   * @throws NoSuchPartitionException If the partition identifier to truncate doesn't exist
+   * @throws UnsupportedOperationException If partition truncation is not supported
+   *
+   * @since 3.2.0
+   */
+  default boolean truncatePartition(InternalRow ident)
+      throws NoSuchPartitionException, UnsupportedOperationException {
+    throw new UnsupportedOperationException("Partition truncate is not supported");
+  }
 }
```
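
For a connector backed by real storage, overriding the default could look like the following sketch. `deletePartitionDataFiles` is a hypothetical helper; `partitionExists`, `name`, and `partitionSchema` come from `SupportsPartitionManagement` and `Table`:

```scala
override def truncatePartition(ident: InternalRow): Boolean = {
  if (!partitionExists(ident)) {
    throw new NoSuchPartitionException(name, ident, partitionSchema)
  }
  // Hypothetical helper: remove the partition's data files but keep its metadata,
  // which is what distinguishes TRUNCATE from DROP PARTITION.
  deletePartitionDataFiles(ident)
  true
}
```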


```diff
@@ -20,7 +20,7 @@ package org.apache.spark.sql.connector
 import java.util

 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.{PartitionAlreadyExistsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
@@ -75,4 +75,14 @@ class InMemoryAtomicPartitionTable (
     }
     idents.forall(dropPartition)
   }
+
+  override def truncatePartitions(idents: Array[InternalRow]): Boolean = {
+    val nonExistent = idents.filterNot(partitionExists)
+    if (nonExistent.isEmpty) {
+      idents.foreach(truncatePartition)
+      true
+    } else {
+      throw new NoSuchPartitionException(name, nonExistent.head, partitionSchema)
+    }
+  }
 }
```


```diff
@@ -122,4 +122,13 @@ class InMemoryPartitionTable(
       renamePartitionKey(partitionSchema, from.toSeq(schema), to.toSeq(schema))
     }
   }
+
+  override def truncatePartition(ident: InternalRow): Boolean = {
+    if (memoryTablePartitions.containsKey(ident)) {
+      clearPartition(ident.toSeq(schema))
+      true
+    } else {
+      throw new NoSuchPartitionException(name, ident, partitionSchema)
+    }
+  }
 }
```


```diff
@@ -204,6 +204,11 @@ class InMemoryTable(
     }
   }

+  protected def clearPartition(key: Seq[Any]): Unit = dataMap.synchronized {
+    assert(dataMap.contains(key))
+    dataMap(key).clear()
+  }
+
   def withData(data: Array[BufferedRows]): InMemoryTable = dataMap.synchronized {
     data.foreach(_.rows.foreach { row =>
       val key = getKey(row)
@@ -464,6 +469,8 @@ class BufferedRows(
     rows.append(row)
     this
   }
+
+  def clear(): Unit = rows.clear()
 }

 private class BufferedRowsReaderFactory(
```


```diff
@@ -21,8 +21,8 @@ import java.util

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
-import org.apache.spark.sql.connector.{InMemoryAtomicPartitionTable, InMemoryTableCatalog}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.connector.{BufferedRows, InMemoryAtomicPartitionTable, InMemoryTableCatalog}
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -141,4 +141,33 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
     partTable.dropPartition(partIdent)
     assert(!hasPartitions(partTable))
   }
+
+  test("truncatePartitions") {
+    val table = catalog.loadTable(ident)
+    val partTable = new InMemoryAtomicPartitionTable(
+      table.name(), table.schema(), table.partitioning(), table.properties())
+    assert(!hasPartitions(partTable))
+
+    partTable.createPartitions(
+      Array(InternalRow("3"), InternalRow("4"), InternalRow("5")),
+      Array.tabulate(3)(_ => new util.HashMap[String, String]()))
+    assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 3)
+
+    partTable.withData(Array(
+      new BufferedRows("3").withRow(InternalRow(0, "abc", "3")),
+      new BufferedRows("4").withRow(InternalRow(1, "def", "4")),
+      new BufferedRows("5").withRow(InternalRow(2, "zyx", "5"))
+    ))
+
+    partTable.truncatePartitions(Array(InternalRow("3"), InternalRow("4")))
+    assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 3)
+    assert(partTable.rows === InternalRow(2, "zyx", "5") :: Nil)
+
+    // Truncating a non-existing partition must throw and leave the data unchanged
+    val errMsg = intercept[NoSuchPartitionException] {
+      partTable.truncatePartitions(Array(InternalRow("5"), InternalRow("6")))
+    }.getMessage
+    assert(errMsg.contains("Partition not found in table test.ns.test_table: 6 -> dt"))
+    assert(partTable.rows === InternalRow(2, "zyx", "5") :: Nil)
+  }
 }
```


```diff
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException}
-import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.{BufferedRows, InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -233,4 +233,26 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
     assert(partTable.renamePartition(InternalRow(0, "abc"), newPart))
     assert(partTable.partitionExists(newPart))
   }
+
+  test("truncatePartition") {
+    val table = catalog.loadTable(ident)
+    val partTable = new InMemoryPartitionTable(
+      table.name(), table.schema(), table.partitioning(), table.properties())
+    assert(!hasPartitions(partTable))
+
+    val partIdent = InternalRow.apply("3")
+    val partIdent1 = InternalRow.apply("4")
+    partTable.createPartition(partIdent, new util.HashMap[String, String]())
+    partTable.createPartition(partIdent1, new util.HashMap[String, String]())
+    assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2)
+
+    partTable.withData(Array(
+      new BufferedRows("3").withRow(InternalRow(0, "abc", "3")),
+      new BufferedRows("4").withRow(InternalRow(1, "def", "4"))
+    ))
+
+    partTable.truncatePartition(partIdent)
+    assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2)
+    assert(partTable.rows === InternalRow(1, "def", "4") :: Nil)
+  }
 }
```