[SPARK-34056][SQL][TESTS] Unify v1 and v2 ALTER TABLE .. RECOVER PARTITIONS tests

### What changes were proposed in this pull request?
1. Port DS v2 tests from `AlterTablePartitionV2SQLSuite` to the test suite `v2.AlterTableRecoverPartitionsSuite`.
2. Port DS v1 tests from `DDLSuite` to `v1.AlterTableRecoverPartitionsSuiteBase`.

### Why are the changes needed?
To improve test coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the new test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsParserSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *CatalogedDDLSuite"
```

Closes #31105 from MaxGekk/unify-recover-partitions-tests.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
9 changed files with 312 additions and 112 deletions

@@ -2037,13 +2037,6 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(parsed4, expected4)
}
test("alter table: recover partitions") {
comparePlans(
parsePlan("ALTER TABLE a.b.c RECOVER PARTITIONS"),
AlterTableRecoverPartitions(
UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... RECOVER PARTITIONS")))
}
test("alter view: add partition (not supported)") {
assertUnsupported(
"""

@@ -46,12 +46,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
ifExists: String,
specs: Map[String, Any]*): Unit = {
checkPartitions(t, specs.map(_.mapValues(_.toString).toMap): _*)
val specStr = specs.map(
_.map {
case (k, v: String) => s"$k = '$v'"
case (k, v) => s"$k = $v"
}.mkString("PARTITION (", ", ", ")"))
.mkString(", ")
val specStr = specs.map(partSpecToString).mkString(", ")
sql(s"ALTER TABLE $t DROP $ifExists $specStr")
checkPartitions(t)
}

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedTable}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.AlterTableRecoverPartitions
import org.apache.spark.sql.test.SharedSparkSession
class AlterTableRecoverPartitionsParserSuite extends AnalysisTest with SharedSparkSession {
test("recover partitions without table") {
val errMsg = intercept[ParseException] {
parsePlan("ALTER TABLE RECOVER PARTITIONS")
}.getMessage
assert(errMsg.contains("no viable alternative at input 'ALTER TABLE RECOVER PARTITIONS'"))
}
test("recover partitions of a table") {
comparePlans(
parsePlan("ALTER TABLE tbl RECOVER PARTITIONS"),
AlterTableRecoverPartitions(
UnresolvedTable(Seq("tbl"), "ALTER TABLE ... RECOVER PARTITIONS")))
}
test("recover partitions of a table in a database") {
comparePlans(
parsePlan("alter table db.tbl recover partitions"),
AlterTableRecoverPartitions(
UnresolvedTable(Seq("db", "tbl"), "ALTER TABLE ... RECOVER PARTITIONS")))
}
test("recover partitions of a table spark_catalog") {
comparePlans(
parsePlan("alter table spark_catalog.db.TBL recover partitions"),
AlterTableRecoverPartitions(
UnresolvedTable(Seq("spark_catalog", "db", "TBL"), "ALTER TABLE ... RECOVER PARTITIONS")))
}
test("recover partitions of a table in nested namespaces") {
comparePlans(
parsePlan("Alter Table ns1.ns2.ns3.ns4.ns5.ns6.ns7.ns8.t Recover Partitions"),
AlterTableRecoverPartitions(
UnresolvedTable(
Seq("ns1", "ns2", "ns3", "ns4", "ns5", "ns6", "ns7", "ns8", "t"),
"ALTER TABLE ... RECOVER PARTITIONS")))
}
}

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command
import org.apache.spark.sql.QueryTest
/**
* This base suite contains unified tests for the `ALTER TABLE .. RECOVER PARTITIONS` command that
* check V1 and V2 table catalogs. The tests that cannot run for all supported catalogs are
* located in more specific test suites:
*
* - V2 table catalog tests:
* `org.apache.spark.sql.execution.command.v2.AlterTableRecoverPartitionsSuite`
* - V1 table catalog tests:
* `org.apache.spark.sql.execution.command.v1.AlterTableRecoverPartitionsSuiteBase`
* - V1 In-Memory catalog (sequential):
* `org.apache.spark.sql.execution.command.v1.AlterTableRecoverPartitionsSuite`
* - V1 In-Memory catalog (parallel):
* `org.apache.spark.sql.execution.command.v1.AlterTableRecoverPartitionsParallelSuite`
* - V1 Hive External catalog:
* `org.apache.spark.sql.hive.execution.command.AlterTableRecoverPartitionsSuite`
*/
trait AlterTableRecoverPartitionsSuiteBase extends QueryTest with DDLCommandTestUtils {
override val command = "ALTER TABLE .. RECOVER PARTITIONS"
}
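
The concrete suites that plug into this hierarchy appear later in the diff; the pattern is simply to mix the base trait together with the per-package `CommandSuiteBase` that supplies the catalog fixture. A minimal sketch of the pattern (the class name here is hypothetical):

```
// Hypothetical example: a catalog-specific suite inherits every unified test
// from the base trait; CommandSuiteBase (defined once per package: v1, v2,
// hive) wires in the catalog under test and helpers such as `defaultUsing`.
class MyCatalogRecoverPartitionsSuite
  extends AlterTableRecoverPartitionsSuiteBase
  with CommandSuiteBase
```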

@@ -108,4 +108,11 @@ trait DDLCommandTestUtils extends SQLTestUtils {
}
size
}
def partSpecToString(spec: Map[String, Any]): String = {
spec.map {
case (k, v: String) => s"$k = '$v'"
case (k, v) => s"$k = $v"
}.mkString("PARTITION (", ", ", ")")
}
}
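
Given the pattern match above, string values come out single-quoted while all other values are rendered verbatim; a quick REPL-style illustration (not part of the diff):

```
partSpecToString(Map("a" -> "1", "b" -> 5))
// res0: String = PARTITION (a = '1', b = 5)
```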

@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, A
import org.apache.spark.{SparkException, SparkFiles}
import org.apache.spark.internal.config
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, TempTableAlreadyExistsException}
@@ -1186,89 +1185,6 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
assertUnsupported("ALTER TABLE dbx.tab1 NOT STORED AS DIRECTORIES")
}
test("alter table: recover partitions (sequential)") {
val oldRddParallelListingThreshold = spark.sparkContext.conf.get(
RDD_PARALLEL_LISTING_THRESHOLD)
try {
spark.sparkContext.conf.set(RDD_PARALLEL_LISTING_THRESHOLD.key, "10")
testRecoverPartitions()
} finally {
spark.sparkContext.conf.set(RDD_PARALLEL_LISTING_THRESHOLD, oldRddParallelListingThreshold)
}
}
test("alter table: recover partition (parallel)") {
val oldRddParallelListingThreshold = spark.sparkContext.conf.get(
RDD_PARALLEL_LISTING_THRESHOLD)
try {
spark.sparkContext.conf.set(RDD_PARALLEL_LISTING_THRESHOLD.key, "0")
testRecoverPartitions()
} finally {
spark.sparkContext.conf.set(RDD_PARALLEL_LISTING_THRESHOLD, oldRddParallelListingThreshold)
}
}
protected def testRecoverPartitions(): Unit = {
val catalog = spark.sessionState.catalog
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")
}
val tableIdent = TableIdentifier("tab1")
createTable(catalog, tableIdent, partitionCols = Seq("a", "b", "c"))
val part1 = Map("a" -> "1", "b" -> "5", "c" -> "19")
createTablePartition(catalog, part1, tableIdent)
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
val part2 = Map("a" -> "2", "b" -> "6", "c" -> "31")
val root = new Path(catalog.getTableMetadata(tableIdent).location)
val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
// valid
fs.mkdirs(new Path(new Path(new Path(root, "a=1"), "b=5"), "c=19"))
fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "a.csv")) // file
fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "_SUCCESS")) // file
fs.mkdirs(new Path(new Path(new Path(root, "A=2"), "B=6"), "C=31"))
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "b.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "c.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), ".hiddenFile")) // file
fs.mkdirs(new Path(new Path(root, "A=2/B=6/C=31"), "_temporary"))
val parts = (10 to 100).map { a =>
val part = Map("a" -> a.toString, "b" -> "5", "c" -> "42")
fs.mkdirs(new Path(new Path(new Path(root, s"a=$a"), "b=5"), "c=42"))
fs.createNewFile(new Path(new Path(root, s"a=$a/b=5/c=42"), "a.csv")) // file
createTablePartition(catalog, part, tableIdent)
part
}
// invalid
fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order
fs.mkdirs(new Path(root, "a=4")) // not enough columns
fs.createNewFile(new Path(new Path(root, "a=1"), "b=4")) // file
fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS")) // _SUCCESS
fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary")) // _temporary
fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4")) // start with .
try {
sql("ALTER TABLE tab1 RECOVER PARTITIONS")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2) ++ parts)
if (!isUsingHiveMetastore) {
assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
} else {
// After ALTER TABLE, the statistics of the first partition are removed by the Hive metastore
assert(catalog.getPartition(tableIdent, part1).parameters.get("numFiles").isEmpty)
assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
}
} finally {
fs.delete(root, true)
}
}
test("alter table: add partition is not supported for views") {
assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")
}

@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.v1
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.execution.command
/**
* This base suite contains unified tests for the `ALTER TABLE .. RECOVER PARTITIONS` command that
* check V1 table catalogs. The tests that cannot run for all V1 catalogs are located in more
* specific test suites:
*
* - V1 In-Memory catalog (sequential):
* `org.apache.spark.sql.execution.command.v1.AlterTableRecoverPartitionsSuite`
* - V1 In-Memory catalog (parallel):
* `org.apache.spark.sql.execution.command.v1.AlterTableRecoverPartitionsParallelSuite`
* - V1 Hive External catalog:
* `org.apache.spark.sql.hive.execution.command.AlterTableRecoverPartitionsSuite`
*/
trait AlterTableRecoverPartitionsSuiteBase extends command.AlterTableRecoverPartitionsSuiteBase {
test("table does not exist") {
val errMsg = intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")
}.getMessage
assert(errMsg.contains("Table not found"))
}
def withTableDir(tableName: String)(f: (FileSystem, Path) => Unit): Unit = {
val location = sql(s"DESCRIBE TABLE EXTENDED $tableName")
.where("col_name = 'Location'")
.select("data_type")
.first()
.getString(0)
val root = new Path(location)
val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
f(fs, root)
}
test("valid locations") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (a INT, b INT, c INT, d INT) $defaultUsing PARTITIONED BY (a, b, c)")
checkPartitions(t)
withTableDir(t) { case (fs, root) =>
fs.mkdirs(new Path(new Path(new Path(root, "a=1"), "b=5"), "c=19"))
fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "a.csv")) // file
fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "_SUCCESS")) // file
fs.mkdirs(new Path(new Path(new Path(root, "A=2"), "B=6"), "C=31"))
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "b.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "c.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), ".hiddenFile")) // file
fs.mkdirs(new Path(new Path(root, "A=2/B=6/C=31"), "_temporary"))
}
sql(s"ALTER TABLE $t RECOVER PARTITIONS")
checkPartitions(t,
Map("a" -> "1", "b" -> "5", "c" -> "19"),
Map("a" -> "2", "b" -> "6", "c" -> "31"))
}
}
test("invalid locations") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (a INT, b INT, c INT, d INT) $defaultUsing PARTITIONED BY (a, b, c)")
sql(s"INSERT INTO $t PARTITION (a=0, b=1, c=2) SELECT 3")
checkPartitions(t, Map("a" -> "0", "b" -> "1", "c" -> "2"))
withTableDir(t) { case (fs, root) =>
fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order
fs.mkdirs(new Path(root, "a=4")) // not enough columns
fs.createNewFile(new Path(new Path(root, "a=1"), "b=4")) // file
fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS")) // _SUCCESS
fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary")) // _temporary
fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4")) // start with .
}
sql(s"ALTER TABLE $t RECOVER PARTITIONS")
checkPartitions(t, Map("a" -> "0", "b" -> "1", "c" -> "2"))
}
}
test("multiple locations") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (a INT, b INT, c INT, d INT) $defaultUsing PARTITIONED BY (a, b, c)")
sql(s"INSERT INTO $t PARTITION (a=0, b=1, c=2) SELECT 3")
val initPart = Map("a" -> "0", "b" -> "1", "c" -> "2")
checkPartitions(t, initPart)
withTableDir(t) { case (fs, root) =>
(0 to 100).foreach { a =>
val part = Map("a" -> a.toString, "b" -> "5", "c" -> "42")
fs.mkdirs(new Path(new Path(new Path(root, s"a=$a"), "b=5"), "c=42"))
val loc = s"a=$a/b=5/c=42"
fs.createNewFile(new Path(new Path(root, loc), "a.csv")) // file
if (a >= 10) {
sql(s"ALTER TABLE $t ADD ${partSpecToString(part)} LOCATION '$loc'")
}
}
}
sql(s"ALTER TABLE $t RECOVER PARTITIONS")
val expected = (0 to 100)
.map(a => Map("a" -> a.toString, "b" -> "5", "c" -> "42")) :+ initPart
checkPartitions(t, expected: _*)
}
}
}
/**
* The class contains tests for the `ALTER TABLE .. RECOVER PARTITIONS` command to check
* V1 In-Memory table catalog (sequential).
*/
class AlterTableRecoverPartitionsSuite
extends AlterTableRecoverPartitionsSuiteBase
with CommandSuiteBase {
override protected def sparkConf = super.sparkConf
.set(RDD_PARALLEL_LISTING_THRESHOLD, 10)
}
/**
* The class contains tests for the `ALTER TABLE .. RECOVER PARTITIONS` command to check
* V1 In-Memory table catalog (parallel).
*/
class AlterTableRecoverPartitionsParallelSuite
extends AlterTableRecoverPartitionsSuiteBase
with CommandSuiteBase {
override protected def sparkConf = super.sparkConf
.set(RDD_PARALLEL_LISTING_THRESHOLD, 0)
}
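
A note on the two overrides above, and why the sequential suite takes the higher value: `RDD_PARALLEL_LISTING_THRESHOLD` (`spark.rdd.parallelListingThreshold`) is the threshold on the number of paths above which Spark distributes directory listing as a job, so `0` forces the parallel code path while `10` keeps the small test tables on the sequential path — the same pair of values the removed `DDLSuite` tests used. A sketch of flipping the same switch in an ad-hoc session (the session setup and table name are illustrative, not from this commit):

```
// Illustrative only: force parallel listing for RECOVER PARTITIONS.
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .config(RDD_PARALLEL_LISTING_THRESHOLD.key, "0") // every listing becomes a distributed job
  .getOrCreate()

spark.sql("ALTER TABLE tbl RECOVER PARTITIONS") // assumes `tbl` is a partitioned table
```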

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.v2
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.execution.command
/**
* The class contains tests for the `ALTER TABLE .. RECOVER PARTITIONS` command
* to check V2 table catalogs.
*/
class AlterTableRecoverPartitionsSuite
extends command.AlterTableRecoverPartitionsSuiteBase
with CommandSuiteBase {
test("partition recovering of v2 tables is not supported") {
withNamespaceAndTable("ns", "tbl") { t =>
spark.sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
val errMsg = intercept[AnalysisException] {
sql(s"ALTER TABLE $t RECOVER PARTITIONS")
}.getMessage
assert(errMsg.contains("ALTER TABLE ... RECOVER PARTITIONS is not supported for v2 tables"))
}
}
}

@@ -15,20 +15,14 @@
* limitations under the License.
*/
package org.apache.spark.sql.connector
package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.execution.command.v1
class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
test("ALTER TABLE RECOVER PARTITIONS") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $t RECOVER PARTITIONS")
}
assert(e.message.contains(
"ALTER TABLE ... RECOVER PARTITIONS is not supported for v2 tables."))
}
}
}
/**
* The class contains tests for the `ALTER TABLE .. RECOVER PARTITIONS` command to check
* V1 Hive external table catalog.
*/
class AlterTableRecoverPartitionsSuite
extends v1.AlterTableRecoverPartitionsSuiteBase
with CommandSuiteBase