[SPARK-17732][SQL] ALTER TABLE DROP PARTITION should support comparators
## What changes were proposed in this pull request?

This PR restores support for comparators (e.g. `<`, `<=`, `>`, `>=`) in `ALTER TABLE ... DROP PARTITION` in Apache Spark 2.0, for backward compatibility with Spark 1.6.

**Spark 1.6**

```scala
scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
res0: org.apache.spark.sql.DataFrame = [result: string]

scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")
res1: org.apache.spark.sql.DataFrame = [result: string]
```

**Spark 2.0**

```scala
scala> sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
res0: org.apache.spark.sql.DataFrame = []

scala> sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<' expecting {')', ','}(line 1, pos 42)
```

After this PR, the statement above is supported in Spark 2.0 as well.

## How was this patch tested?

Passes the Jenkins tests with a newly added test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #15704 from dongjoon-hyun/SPARK-17732-2.
This commit is contained in:
parent
503378f10c
commit
3ce057d001
|
@ -239,11 +239,7 @@ partitionSpecLocation
|
|||
;
|
||||
|
||||
partitionSpec
|
||||
: PARTITION '(' partitionVal (',' partitionVal)* ')'
|
||||
;
|
||||
|
||||
partitionVal
|
||||
: identifier (EQ constant)?
|
||||
: PARTITION '(' expression (',' expression)* ')'
|
||||
;
|
||||
|
||||
describeFuncName
|
||||
|
|
|
@ -194,10 +194,15 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
|
|||
*/
|
||||
override def visitPartitionSpec(
|
||||
ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
|
||||
val parts = ctx.partitionVal.asScala.map { pVal =>
|
||||
val name = pVal.identifier.getText
|
||||
val value = Option(pVal.constant).map(visitStringConstant)
|
||||
name -> value
|
||||
val parts = ctx.expression.asScala.map { pVal =>
|
||||
expression(pVal) match {
|
||||
case UnresolvedAttribute(name :: Nil) =>
|
||||
name -> None
|
||||
case cmp @ EqualTo(UnresolvedAttribute(name :: Nil), constant: Literal) =>
|
||||
name -> Option(constant.toString)
|
||||
case _ =>
|
||||
throw new ParseException("Invalid partition filter specification", ctx)
|
||||
}
|
||||
}
|
||||
// Before calling `toMap`, we check for duplicated keys to avoid silently ignoring partition values
|
||||
// in partition spec like PARTITION(a='1', b='2', a='3'). The real semantic check for
|
||||
|
@ -206,6 +211,23 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
|
|||
parts.toMap
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a partition filter specification.
*
* Each expression inside `PARTITION (...)` must be a binary comparison whose left side is a
* single-part column name and whose right side is a literal. The column name is rewritten to
* an `AttributeReference` of `StringType`, and all comparisons of the spec are combined with
* `And` into one expression.
*
* Throws a `ParseException` when an expression uses the `<=>` operator or has any other shape.
|
||||
*/
|
||||
def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) {
|
||||
val parts = ctx.expression.asScala.map { pVal =>
|
||||
expression(pVal) match {
|
||||
// `<=>` is rejected regardless of its operands. NOTE(review): presumably tested first
// because EqualNullSafe would otherwise also match the BinaryComparison case below - confirm.
case EqualNullSafe(_, _) =>
|
||||
throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx)
|
||||
// Accepted shape: <single-part column name> <comparison operator> <literal>.
case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
|
||||
// Keep the operator, but replace the unresolved name with a StringType attribute.
cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant))
|
||||
// Anything else (bare column name, non-literal right side, ...) is an error.
case _ =>
|
||||
throw new ParseException("Invalid partition filter specification", ctx)
|
||||
}
|
||||
}
|
||||
// Fold the per-column comparisons of this spec into a single conjunction.
parts.reduceLeft(And)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a partition specification map without optional values.
|
||||
*/
|
||||
|
|
|
@ -813,7 +813,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
|
|||
}
|
||||
AlterTableDropPartitionCommand(
|
||||
visitTableIdentifier(ctx.tableIdentifier),
|
||||
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
|
||||
ctx.partitionSpec.asScala.map(visitPartitionFilterSpec),
|
||||
ctx.EXISTS != null,
|
||||
ctx.PURGE != null)
|
||||
}
|
||||
|
|
|
@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
|
|||
import org.apache.spark.sql.catalyst.analysis.Resolver
|
||||
import org.apache.spark.sql.catalyst.catalog._
|
||||
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
|
||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
|
||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison}
|
||||
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, PredicateHelper}
|
||||
import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.util.SerializableConfiguration
|
||||
|
@ -418,27 +419,55 @@ case class AlterTableRenamePartitionCommand(
|
|||
*/
|
||||
case class AlterTableDropPartitionCommand(
|
||||
tableName: TableIdentifier,
|
||||
specs: Seq[TablePartitionSpec],
|
||||
specs: Seq[Expression],
|
||||
ifExists: Boolean,
|
||||
purge: Boolean)
|
||||
extends RunnableCommand {
|
||||
extends RunnableCommand with PredicateHelper {
|
||||
|
||||
// Returns true when `expr.find` locates a node that is a BinaryComparison but not an EqualTo,
// i.e. the partition spec uses a comparator other than plain equality.
private def isRangeComparison(expr: Expression): Boolean = {
|
||||
expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined
|
||||
}
|
||||
|
||||
override def run(sparkSession: SparkSession): Seq[Row] = {
|
||||
val catalog = sparkSession.sessionState.catalog
|
||||
val table = catalog.getTableMetadata(tableName)
|
||||
val resolver = sparkSession.sessionState.conf.resolver
|
||||
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
|
||||
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
|
||||
|
||||
val normalizedSpecs = specs.map { spec =>
|
||||
specs.foreach { expr =>
|
||||
expr.references.foreach { attr =>
|
||||
if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
|
||||
throw new AnalysisException(s"${attr.name} is not a valid partition column " +
|
||||
s"in table ${table.identifier.quotedString}.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (specs.exists(isRangeComparison)) {
|
||||
val partitionSet = specs.flatMap { spec =>
|
||||
val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(spec)).map(_.spec)
|
||||
if (partitions.isEmpty && !ifExists) {
|
||||
throw new AnalysisException(s"There is no partition for ${spec.sql}")
|
||||
}
|
||||
partitions
|
||||
}.distinct
|
||||
catalog.dropPartitions(
|
||||
table.identifier, partitionSet, ignoreIfNotExists = ifExists, purge = purge)
|
||||
} else {
|
||||
val normalizedSpecs = specs.map { expr =>
|
||||
val spec = splitConjunctivePredicates(expr).map {
|
||||
case BinaryComparison(AttributeReference(name, _, _, _), right) => name -> right.toString
|
||||
}.toMap
|
||||
PartitioningUtils.normalizePartitionSpec(
|
||||
spec,
|
||||
table.partitionColumnNames,
|
||||
table.identifier.quotedString,
|
||||
sparkSession.sessionState.conf.resolver)
|
||||
resolver)
|
||||
}
|
||||
|
||||
catalog.dropPartitions(
|
||||
table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
|
||||
}
|
||||
Seq.empty[Row]
|
||||
}
|
||||
|
||||
|
|
|
@ -215,8 +215,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
|
|||
if (overwrite.enabled) {
|
||||
val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
|
||||
if (deletedPartitions.nonEmpty) {
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
val expressions = deletedPartitions.map { specs =>
|
||||
specs.map { case (key, value) =>
|
||||
EqualTo(AttributeReference(key, StringType)(), Literal.create(value, StringType))
|
||||
}.reduceLeft(And)
|
||||
}.toSeq
|
||||
AlterTableDropPartitionCommand(
|
||||
l.catalogTable.get.identifier, deletedPartitions.toSeq,
|
||||
l.catalogTable.get.identifier, expressions,
|
||||
ifExists = true, purge = true).run(t.sparkSession)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import scala.reflect.{classTag, ClassTag}
|
|||
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.catalyst.catalog._
|
||||
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Literal}
|
||||
import org.apache.spark.sql.catalyst.parser.ParseException
|
||||
import org.apache.spark.sql.catalyst.plans.PlanTest
|
||||
import org.apache.spark.sql.catalyst.plans.logical.Project
|
||||
|
@ -612,8 +613,12 @@ class DDLCommandSuite extends PlanTest {
|
|||
val expected1_table = AlterTableDropPartitionCommand(
|
||||
tableIdent,
|
||||
Seq(
|
||||
Map("dt" -> "2008-08-08", "country" -> "us"),
|
||||
Map("dt" -> "2009-09-09", "country" -> "uk")),
|
||||
And(
|
||||
EqualTo(AttributeReference("dt", StringType)(), Literal.create("2008-08-08", StringType)),
|
||||
EqualTo(AttributeReference("country", StringType)(), Literal.create("us", StringType))),
|
||||
And(
|
||||
EqualTo(AttributeReference("dt", StringType)(), Literal.create("2009-09-09", StringType)),
|
||||
EqualTo(AttributeReference("country", StringType)(), Literal.create("uk", StringType)))),
|
||||
ifExists = true,
|
||||
purge = false)
|
||||
val expected2_table = expected1_table.copy(ifExists = false)
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
|
|||
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
|
||||
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.catalyst.parser.ParseException
|
||||
import org.apache.spark.sql.execution.command.DDLUtils
|
||||
import org.apache.spark.sql.hive.HiveExternalCatalog
|
||||
import org.apache.spark.sql.hive.test.TestHiveSingleton
|
||||
|
@ -225,6 +226,108 @@ class HiveDDLSuite
|
|||
}
|
||||
}
|
||||
|
||||
// End-to-end check that ALTER TABLE ... DROP PARTITION accepts comparators and drops exactly
// the matching partitions.
test("SPARK-17732: Drop partitions by filter") {
|
||||
withTable("sales") {
|
||||
sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
|
||||
|
||||
// Register 12 partitions: 3 countries x 4 quarters.
for (country <- Seq("US", "CA", "KR")) {
|
||||
for (quarter <- 1 to 4) {
|
||||
sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')")
|
||||
}
|
||||
}
|
||||
|
||||
// Both conditions of a single PARTITION (...) must hold: only CA/3 and CA/4 are dropped.
sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
|
||||
checkAnswer(sql("SHOW PARTITIONS sales"),
|
||||
Row("country=CA/quarter=1") ::
|
||||
Row("country=CA/quarter=2") ::
|
||||
Row("country=KR/quarter=1") ::
|
||||
Row("country=KR/quarter=2") ::
|
||||
Row("country=KR/quarter=3") ::
|
||||
Row("country=KR/quarter=4") ::
|
||||
Row("country=US/quarter=1") ::
|
||||
Row("country=US/quarter=2") ::
|
||||
Row("country=US/quarter=3") ::
|
||||
Row("country=US/quarter=4") :: Nil)
|
||||
|
||||
// Several PARTITION clauses: a partition matching any one of them is dropped
// (the remaining CA partitions, plus quarter 1 of KR and US).
sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')")
|
||||
checkAnswer(sql("SHOW PARTITIONS sales"),
|
||||
Row("country=KR/quarter=2") ::
|
||||
Row("country=KR/quarter=3") ::
|
||||
Row("country=KR/quarter=4") ::
|
||||
Row("country=US/quarter=2") ::
|
||||
Row("country=US/quarter=3") ::
|
||||
Row("country=US/quarter=4") :: Nil)
|
||||
|
||||
// Plain equality specs keep working.
sql("ALTER TABLE sales DROP PARTITION (country='KR', quarter='4')")
|
||||
sql("ALTER TABLE sales DROP PARTITION (country='US', quarter='3')")
|
||||
checkAnswer(sql("SHOW PARTITIONS sales"),
|
||||
Row("country=KR/quarter=2") ::
|
||||
Row("country=KR/quarter=3") ::
|
||||
Row("country=US/quarter=2") ::
|
||||
Row("country=US/quarter=4") :: Nil)
|
||||
|
||||
// An unquoted numeric literal (2) is accepted next to a quoted one ('4').
sql("ALTER TABLE sales DROP PARTITION (quarter <= 2), PARTITION (quarter >= '4')")
|
||||
checkAnswer(sql("SHOW PARTITIONS sales"),
|
||||
Row("country=KR/quarter=3") :: Nil)
|
||||
|
||||
// According to the declarative partition spec definitions, this drops the union of target
|
||||
// partitions without exceptions. Hive raises exceptions because it handles them sequentially.
|
||||
sql("ALTER TABLE sales DROP PARTITION (quarter <= 4), PARTITION (quarter <= '3')")
|
||||
checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
|
||||
}
|
||||
}
|
||||
|
||||
// Checks the error raised for each kind of invalid DROP PARTITION specification.
test("SPARK-17732: Error handling for drop partitions by filter") {
|
||||
withTable("sales") {
|
||||
sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
|
||||
|
||||
// Unknown partition column used with plain equality.
val m = intercept[AnalysisException] {
|
||||
sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')")
|
||||
}.getMessage
|
||||
assert(m.contains("unknown is not a valid partition column in table"))
|
||||
|
||||
// Unknown partition column used with a range comparator.
val m2 = intercept[AnalysisException] {
|
||||
sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')")
|
||||
}.getMessage
|
||||
assert(m2.contains("unknown is not a valid partition column in table"))
|
||||
|
||||
// `<=>` is rejected when the right-hand side is a plain literal ...
val m3 = intercept[AnalysisException] {
|
||||
sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')")
|
||||
}.getMessage
|
||||
assert(m3.contains("'<=>' operator is not allowed in partition specification"))
|
||||
|
||||
// ... and also when the right-hand side is a function call.
val m4 = intercept[ParseException] {
|
||||
sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))")
|
||||
}.getMessage
|
||||
assert(m4.contains("'<=>' operator is not allowed in partition specification"))
|
||||
|
||||
// A bare column name without a comparison is not a valid filter.
val m5 = intercept[ParseException] {
|
||||
sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)")
|
||||
}.getMessage
|
||||
assert(m5.contains("Invalid partition filter specification"))
|
||||
|
||||
// With a single partition (KR/3), the second clause below matches nothing.
sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')")
|
||||
val m6 = intercept[AnalysisException] {
|
||||
sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')")
|
||||
}.getMessage
|
||||
// Nothing is dropped: `PARTITION (quarter <= '2')` matches no partition and IF EXISTS is
// not given, so the statement fails and KR/3 is still listed afterwards.
|
||||
checkAnswer(sql("SHOW PARTITIONS sales"),
|
||||
Row("country=KR/quarter=3") :: Nil)
|
||||
assert(m6.contains("There is no partition for (`quarter` <= '2')"))
|
||||
}
|
||||
}
|
||||
|
||||
// ADD PARTITION must keep rejecting comparators: a range comparison in its spec is expected
// to fail with a ParseException.
test("SPARK-17732: Partition filter is not allowed in ADD PARTITION") {
|
||||
withTable("sales") {
|
||||
sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
|
||||
|
||||
// `quarter < '1'` is not a `column = value` pair, so parsing the statement must fail.
val m = intercept[ParseException] {
|
||||
sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')")
|
||||
}.getMessage()
|
||||
assert(m.contains("Invalid partition filter specification"))
|
||||
}
|
||||
}
|
||||
|
||||
test("drop views") {
|
||||
withTable("tab1") {
|
||||
val tabName = "tab1"
|
||||
|
|
Loading…
Reference in a new issue