[SPARK-33789][SQL][TESTS] Refactor unified V1 and V2 datasource tests

### What changes were proposed in this pull request?
1. Move common utility functions such as `test()`, `withNsTable()` (renamed to `withNamespaceAndTable()`) and `checkPartitions()` to the new `DDLCommandTestUtils` trait.
2. Place common settings such as `version`, `catalog`, `defaultUsing` and `sparkConf` in the new `CommandSuiteBase` traits (one per tested catalog: v1, v2 and Hive), as sketched below.
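
For illustration, a minimal sketch of how a concrete suite composes the shared traits after this refactoring. The suite name and test body are hypothetical; the trait and member names come from the diff below:

```scala
// Hypothetical V1 suite combining the refactored traits. ShowPartitionsSuiteBase
// mixes in DDLCommandTestUtils, which provides test(), withNamespaceAndTable()
// and checkPartitions(); CommandSuiteBase provides version, catalog,
// defaultUsing (and, for the V2 variant, sparkConf).
class ExampleShowPartitionsSuite extends ShowPartitionsSuiteBase with CommandSuiteBase {
  // Reported as "SHOW PARTITIONS V1: single partition" thanks to the
  // test() override in DDLCommandTestUtils.
  test("single partition") {
    withNamespaceAndTable("ns", "tbl") { t =>
      sql(s"CREATE TABLE $t (id bigint, part int) $defaultUsing PARTITIONED BY (part)")
      sql(s"ALTER TABLE $t ADD PARTITION (part = 0)")
      checkPartitions(t, Map("part" -> "0"))
    }
  }
}
```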

### Why are the changes needed?
To improve the maintainability of the unified tests: the shared helpers and settings were previously duplicated across the per-command `*SuiteBase` traits.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *ShowPartitionsSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *ShowTablesSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableAddPartitionSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableDropPartitionSuite"
```

Closes #30779 from MaxGekk/refactor-unified-tests.

Lead-authored-by: Max Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Max Gekk authored on 2020-12-16 13:49:49 +00:00, committed by Wenchen Fan
parent 7845865b8d
commit 9d9d4a8e12
20 changed files with 274 additions and 334 deletions


@@ -17,50 +17,18 @@
package org.apache.spark.sql.execution.command
import org.scalactic.source.Position
import org.scalatest.Tag
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
protected def version: String
protected def catalog: String
protected def defaultUsing: String
trait AlterTableAddPartitionSuiteBase extends QueryTest with DDLCommandTestUtils {
override val command = "ALTER TABLE .. ADD PARTITION"
override def test(testName: String, testTags: Tag*)(testFun: => Any)
(implicit pos: Position): Unit = {
super.test(s"ALTER TABLE .. ADD PARTITION $version: " + testName, testTags: _*)(testFun)
}
protected def checkPartitions(t: String, expected: Map[String, String]*): Unit = {
val partitions = sql(s"SHOW PARTITIONS $t")
.collect()
.toSet
.map((row: Row) => row.getString(0))
.map(PartitioningUtils.parsePathFragment)
assert(partitions === expected.toSet)
}
protected def checkLocation(t: String, spec: TablePartitionSpec, expected: String): Unit
protected def withNsTable(ns: String, tableName: String, cat: String = catalog)
(f: String => Unit): Unit = {
val nsCat = s"$cat.$ns"
withNamespace(nsCat) {
sql(s"CREATE NAMESPACE $nsCat")
val t = s"$nsCat.$tableName"
withTable(t) {
f(t)
}
}
}
test("one partition") {
withNsTable("ns", "tbl") { t =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
Seq("", "IF NOT EXISTS").foreach { exists =>
sql(s"ALTER TABLE $t ADD $exists PARTITION (id=1) LOCATION 'loc'")
@@ -72,7 +40,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("multiple partitions") {
withNsTable("ns", "tbl") { t =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
Seq("", "IF NOT EXISTS").foreach { exists =>
sql(s"""
@@ -88,7 +56,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("multi-part partition") {
withNsTable("ns", "tbl") { t =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id bigint, a int, b string) $defaultUsing PARTITIONED BY (a, b)")
Seq("", "IF NOT EXISTS").foreach { exists =>
sql(s"ALTER TABLE $t ADD $exists PARTITION (a=2, b='abc')")
@@ -99,7 +67,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("table to alter does not exist") {
withNsTable("ns", "does_not_exist") { t =>
withNamespaceAndTable("ns", "does_not_exist") { t =>
val errMsg = intercept[AnalysisException] {
sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (a='4', b='9')")
}.getMessage
@@ -108,7 +76,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("case sensitivity in resolving partition specs") {
withNsTable("ns", "tbl") { t =>
withNamespaceAndTable("ns", "tbl") { t =>
spark.sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val errMsg = intercept[AnalysisException] {
@@ -125,7 +93,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("SPARK-33521: universal type conversions of partition values") {
withNsTable("ns", "tbl") { t =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"""
|CREATE TABLE $t (
| id int,
@@ -173,7 +141,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("SPARK-33676: not fully specified partition spec") {
withNsTable("ns", "tbl") { t =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"""
|CREATE TABLE $t (id bigint, part0 int, part1 string)
|$defaultUsing
@@ -187,7 +155,7 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("partition already exists") {
withNsTable("ns", "tbl") { t =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")


@@ -17,48 +17,15 @@
package org.apache.spark.sql.execution.command
import org.scalactic.source.Position
import org.scalatest.Tag
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
trait AlterTableDropPartitionSuiteBase extends QueryTest with SQLTestUtils {
protected def version: String
protected def catalog: String
protected def defaultUsing: String
trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtils {
override val command = "ALTER TABLE .. DROP PARTITION"
protected def notFullPartitionSpecErr: String
override def test(testName: String, testTags: Tag*)(testFun: => Any)
(implicit pos: Position): Unit = {
super.test(s"ALTER TABLE .. DROP PARTITION $version: " + testName, testTags: _*)(testFun)
}
protected def withNsTable(ns: String, tableName: String, cat: String = catalog)
(f: String => Unit): Unit = {
val nsCat = s"$cat.$ns"
withNamespace(nsCat) {
sql(s"CREATE NAMESPACE $nsCat")
val t = s"$nsCat.$tableName"
withTable(t) {
f(t)
}
}
}
protected def checkPartitions(t: String, expected: Map[String, String]*): Unit = {
val partitions = sql(s"SHOW PARTITIONS $t")
.collect()
.toSet
.map((row: Row) => row.getString(0))
.map(PartitioningUtils.parsePathFragment)
assert(partitions === expected.toSet)
}
protected def checkDropPartition(
t: String,
ifExists: String,
@@ -75,7 +42,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("single partition") {
withNsTable("ns", "tbl") { t =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
Seq("", "IF EXISTS").foreach { ifExists =>
sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'")
@@ -85,7 +52,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("multiple partitions") {
withNsTable("ns", "tbl") { t =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
Seq("", "IF EXISTS").foreach { ifExists =>
sql(s"""
@@ -98,7 +65,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("multi-part partition") {
withNsTable("ns", "tbl") { t =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id bigint, a int, b string) $defaultUsing PARTITIONED BY (a, b)")
Seq("", "IF EXISTS").foreach { ifExists =>
sql(s"ALTER TABLE $t ADD PARTITION (a = 2, b = 'abc')")
@@ -108,7 +75,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("table to alter does not exist") {
withNsTable("ns", "does_not_exist") { t =>
withNamespaceAndTable("ns", "does_not_exist") { t =>
val errMsg = intercept[AnalysisException] {
sql(s"ALTER TABLE $t DROP PARTITION (a='4', b='9')")
}.getMessage
@@ -117,7 +84,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("case sensitivity in resolving partition specs") {
withNsTable("ns", "tbl") { t =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val errMsg = intercept[AnalysisException] {
@@ -136,7 +103,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("SPARK-33676: not fully specified partition spec") {
withNsTable("ns", "tbl") { t =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"""
|CREATE TABLE $t (id bigint, part0 int, part1 string)
|$defaultUsing
@@ -149,7 +116,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with SQLTestUtils {
}
test("partition not exists") {
withNsTable("ns", "tbl") { t =>
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'")


@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command
import org.scalactic.source.Position
import org.scalatest.Tag
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.test.SQLTestUtils
trait DDLCommandTestUtils extends SQLTestUtils {
// The version of the catalog under testing such as "V1", "V2", "Hive V1".
protected def version: String
// Name of the command as SQL statement, for instance "SHOW PARTITIONS"
protected def command: String
protected def catalog: String
protected def defaultUsing: String
override def test(testName: String, testTags: Tag*)(testFun: => Any)
(implicit pos: Position): Unit = {
super.test(s"$command $version: " + testName, testTags: _*)(testFun)
}
protected def withNamespaceAndTable(ns: String, tableName: String, cat: String = catalog)
(f: String => Unit): Unit = {
val nsCat = s"$cat.$ns"
withNamespace(nsCat) {
sql(s"CREATE NAMESPACE $nsCat")
val t = s"$nsCat.$tableName"
withTable(t) {
f(t)
}
}
}
protected def checkPartitions(t: String, expected: Map[String, String]*): Unit = {
val partitions = sql(s"SHOW PARTITIONS $t")
.collect()
.toSet
.map((row: Row) => row.getString(0))
.map(PartitioningUtils.parsePathFragment)
assert(partitions === expected.toSet)
}
}


@@ -17,18 +17,12 @@
package org.apache.spark.sql.execution.command
import org.scalactic.source.Position
import org.scalatest.Tag
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.{StringType, StructType}
trait ShowPartitionsSuiteBase extends QueryTest with SQLTestUtils {
protected def version: String
protected def catalog: String
protected def defaultUsing: String
trait ShowPartitionsSuiteBase extends QueryTest with DDLCommandTestUtils {
override val command = "SHOW PARTITIONS"
// Gets the schema of `SHOW PARTITIONS`
private val showSchema: StructType = new StructType().add("partition", StringType, false)
protected def runShowPartitionsSql(sqlText: String, expected: Seq[Row]): Unit = {
@@ -37,11 +31,6 @@ trait ShowPartitionsSuiteBase extends QueryTest with SQLTestUtils {
checkAnswer(df, expected)
}
override def test(testName: String, testTags: Tag*)(testFun: => Any)
(implicit pos: Position): Unit = {
super.test(s"SHOW PARTITIONS $version: " + testName, testTags: _*)(testFun)
}
protected def createDateTable(table: String): Unit = {
sql(s"""
|CREATE TABLE $table (price int, qty int, year int, month int)
@@ -72,122 +61,94 @@ trait ShowPartitionsSuiteBase extends QueryTest with SQLTestUtils {
}
test("show partitions of non-partitioned table") {
withNamespace(s"$catalog.ns") {
sql(s"CREATE NAMESPACE $catalog.ns")
val table = s"$catalog.ns.not_partitioned_table"
withTable(table) {
sql(s"CREATE TABLE $table (col1 int) $defaultUsing")
val errMsg = intercept[AnalysisException] {
sql(s"SHOW PARTITIONS $table")
}.getMessage
assert(errMsg.contains("not allowed on a table that is not partitioned"))
}
withNamespaceAndTable("ns", "not_partitioned_table") { t =>
sql(s"CREATE TABLE $t (col1 int) $defaultUsing")
val errMsg = intercept[AnalysisException] {
sql(s"SHOW PARTITIONS $t")
}.getMessage
assert(errMsg.contains("not allowed on a table that is not partitioned"))
}
}
test("non-partitioning columns") {
withNamespace(s"$catalog.ns") {
sql(s"CREATE NAMESPACE $catalog.ns")
val table = s"$catalog.ns.dateTable"
withTable(table) {
createDateTable(table)
val errMsg = intercept[AnalysisException] {
sql(s"SHOW PARTITIONS $table PARTITION(abcd=2015, xyz=1)")
}.getMessage
assert(errMsg.contains("abcd is not a valid partition column"))
}
withNamespaceAndTable("ns", "dateTable") { t =>
createDateTable(t)
val errMsg = intercept[AnalysisException] {
sql(s"SHOW PARTITIONS $t PARTITION(abcd=2015, xyz=1)")
}.getMessage
assert(errMsg.contains("abcd is not a valid partition column"))
}
}
test("show everything") {
withNamespace(s"$catalog.ns") {
sql(s"CREATE NAMESPACE $catalog.ns")
val table = s"$catalog.ns.dateTable"
withTable(table) {
createDateTable(table)
runShowPartitionsSql(
s"show partitions $table",
Row("year=2015/month=1") ::
Row("year=2015/month=2") ::
Row("year=2016/month=2") ::
Row("year=2016/month=3") :: Nil)
}
withNamespaceAndTable("ns", "dateTable") { t =>
createDateTable(t)
runShowPartitionsSql(
s"show partitions $t",
Row("year=2015/month=1") ::
Row("year=2015/month=2") ::
Row("year=2016/month=2") ::
Row("year=2016/month=3") :: Nil)
}
}
test("filter by partitions") {
withNamespace(s"$catalog.ns") {
sql(s"CREATE NAMESPACE $catalog.ns")
val table = s"$catalog.ns.dateTable"
withTable(table) {
createDateTable(table)
runShowPartitionsSql(
s"show partitions $table PARTITION(year=2015)",
Row("year=2015/month=1") ::
Row("year=2015/month=2") :: Nil)
runShowPartitionsSql(
s"show partitions $table PARTITION(year=2015, month=1)",
Row("year=2015/month=1") :: Nil)
runShowPartitionsSql(
s"show partitions $table PARTITION(month=2)",
Row("year=2015/month=2") ::
Row("year=2016/month=2") :: Nil)
}
withNamespaceAndTable("ns", "dateTable") { t =>
createDateTable(t)
runShowPartitionsSql(
s"show partitions $t PARTITION(year=2015)",
Row("year=2015/month=1") ::
Row("year=2015/month=2") :: Nil)
runShowPartitionsSql(
s"show partitions $t PARTITION(year=2015, month=1)",
Row("year=2015/month=1") :: Nil)
runShowPartitionsSql(
s"show partitions $t PARTITION(month=2)",
Row("year=2015/month=2") ::
Row("year=2016/month=2") :: Nil)
}
}
test("show everything more than 5 part keys") {
withNamespace(s"$catalog.ns") {
sql(s"CREATE NAMESPACE $catalog.ns")
val table = s"$catalog.ns.wideTable"
withTable(table) {
createWideTable(table)
runShowPartitionsSql(
s"show partitions $table",
Row("year=2016/month=3/hour=10/minute=10/sec=10/extra=1") ::
Row("year=2016/month=4/hour=10/minute=10/sec=10/extra=1") :: Nil)
}
withNamespaceAndTable("ns", "wideTable") { t =>
createWideTable(t)
runShowPartitionsSql(
s"show partitions $t",
Row("year=2016/month=3/hour=10/minute=10/sec=10/extra=1") ::
Row("year=2016/month=4/hour=10/minute=10/sec=10/extra=1") :: Nil)
}
}
test("SPARK-33667: case sensitivity of partition spec") {
withNamespace(s"$catalog.ns") {
sql(s"CREATE NAMESPACE $catalog.ns")
val t = s"$catalog.ns.part_table"
withTable(t) {
sql(s"""
|CREATE TABLE $t (price int, qty int, year int, month int)
|$defaultUsing
|PARTITIONED BY (year, month)""".stripMargin)
sql(s"INSERT INTO $t PARTITION(year = 2015, month = 1) SELECT 1, 1")
Seq(
true -> "PARTITION(year = 2015, month = 1)",
false -> "PARTITION(YEAR = 2015, Month = 1)"
).foreach { case (caseSensitive, partitionSpec) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
runShowPartitionsSql(
s"SHOW PARTITIONS $t $partitionSpec",
Row("year=2015/month=1") :: Nil)
}
withNamespaceAndTable("ns", "part_table") { t =>
sql(s"""
|CREATE TABLE $t (price int, qty int, year int, month int)
|$defaultUsing
|PARTITIONED BY (year, month)""".stripMargin)
sql(s"INSERT INTO $t PARTITION(year = 2015, month = 1) SELECT 1, 1")
Seq(
true -> "PARTITION(year = 2015, month = 1)",
false -> "PARTITION(YEAR = 2015, Month = 1)"
).foreach { case (caseSensitive, partitionSpec) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
runShowPartitionsSql(
s"SHOW PARTITIONS $t $partitionSpec",
Row("year=2015/month=1") :: Nil)
}
}
}
}
test("SPARK-33777: sorted output") {
withNamespace(s"$catalog.ns") {
sql(s"CREATE NAMESPACE $catalog.ns")
val table = s"$catalog.ns.dateTable"
withTable(table) {
sql(s"""
|CREATE TABLE $table (id int, part string)
|$defaultUsing
|PARTITIONED BY (part)""".stripMargin)
sql(s"ALTER TABLE $table ADD PARTITION(part = 'b')")
sql(s"ALTER TABLE $table ADD PARTITION(part = 'a')")
val partitions = sql(s"show partitions $table")
assert(partitions.first().getString(0) === "part=a")
}
withNamespaceAndTable("ns", "dateTable") { t =>
sql(s"""
|CREATE TABLE $t (id int, part string)
|$defaultUsing
|PARTITIONED BY (part)""".stripMargin)
sql(s"ALTER TABLE $t ADD PARTITION(part = 'b')")
sql(s"ALTER TABLE $t ADD PARTITION(part = 'a')")
val partitions = sql(s"show partitions $t")
assert(partitions.first().getString(0) === "part=a")
}
}
}


@@ -17,21 +17,15 @@
package org.apache.spark.sql.execution.command
import org.scalactic.source.Position
import org.scalatest.Tag
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType
trait ShowTablesSuiteBase extends QueryTest with SQLTestUtils {
protected def version: String
protected def catalog: String
trait ShowTablesSuiteBase extends QueryTest with DDLCommandTestUtils {
override val command = "SHOW TABLES"
protected def defaultNamespace: Seq[String]
protected def defaultUsing: String
case class ShowRow(namespace: String, table: String, isTemporary: Boolean)
protected def getRows(showRows: Seq[ShowRow]): Seq[Row]
// Gets the schema of `SHOW TABLES`
@@ -43,18 +37,10 @@ trait ShowTablesSuiteBase extends QueryTest with SQLTestUtils {
checkAnswer(df, getRows(expected))
}
override def test(testName: String, testTags: Tag*)(testFun: => Any)
(implicit pos: Position): Unit = {
super.test(s"SHOW TABLES $version: " + testName, testTags: _*)(testFun)
}
test("show an existing table") {
withNamespace(s"$catalog.ns") {
sql(s"CREATE NAMESPACE $catalog.ns")
withTable(s"$catalog.ns.table") {
sql(s"CREATE TABLE $catalog.ns.table (name STRING, id INT) $defaultUsing")
runShowTablesSql(s"SHOW TABLES IN $catalog.ns", Seq(ShowRow("ns", "table", false)))
}
withNamespaceAndTable("ns", "table") { t =>
sql(s"CREATE TABLE $t (name STRING, id INT) $defaultUsing")
runShowTablesSql(s"SHOW TABLES IN $catalog.ns", Seq(ShowRow("ns", "table", false)))
}
}
@@ -117,20 +103,17 @@
}
test("change current catalog and namespace with USE statements") {
withNamespace(s"$catalog.ns") {
sql(s"CREATE NAMESPACE $catalog.ns")
withTable(s"$catalog.ns.table") {
sql(s"CREATE TABLE $catalog.ns.table (name STRING, id INT) $defaultUsing")
withNamespaceAndTable("ns", "table") { t =>
sql(s"CREATE TABLE $t (name STRING, id INT) $defaultUsing")
sql(s"USE $catalog")
// No table is matched since the current namespace is not ["ns"]
assert(defaultNamespace != Seq("ns"))
runShowTablesSql("SHOW TABLES", Seq())
sql(s"USE $catalog")
// No table is matched since the current namespace is not ["ns"]
assert(defaultNamespace != Seq("ns"))
runShowTablesSql("SHOW TABLES", Seq())
// Update the current namespace to match "ns.tbl".
sql(s"USE $catalog.ns")
runShowTablesSql("SHOW TABLES", Seq(ShowRow("ns", "table", false)))
}
// Update the current namespace to match "ns.tbl".
sql(s"USE $catalog.ns")
runShowTablesSql("SHOW TABLES", Seq(ShowRow("ns", "table", false)))
}
}
}


@@ -18,15 +18,9 @@
package org.apache.spark.sql.execution.command.v1
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.test.SharedSparkSession
trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuiteBase {
override def version: String = "V1"
override def catalog: String = CatalogManager.SESSION_CATALOG_NAME
override def defaultUsing: String = "USING parquet"
override protected def checkLocation(
t: String,
spec: TablePartitionSpec,
@@ -43,4 +37,4 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit
}
}
class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with SharedSparkSession
class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with CommandSuiteBase


@@ -17,18 +17,12 @@
package org.apache.spark.sql.execution.command.v1
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.test.SharedSparkSession
trait AlterTableDropPartitionSuiteBase extends command.AlterTableDropPartitionSuiteBase {
override def version: String = "V1"
override def catalog: String = CatalogManager.SESSION_CATALOG_NAME
override def defaultUsing: String = "USING parquet"
override protected val notFullPartitionSpecErr = "The following partitions not found in table"
}
class AlterTableDropPartitionSuite
extends AlterTableDropPartitionSuiteBase
with SharedSparkSession
with CommandSuiteBase


@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.v1
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.test.SharedSparkSession
trait CommandSuiteBase extends SharedSparkSession {
def version: String = "V1"
def catalog: String = CatalogManager.SESSION_CATALOG_NAME
def defaultUsing: String = "USING parquet"
}


@@ -18,15 +18,9 @@
package org.apache.spark.sql.execution.command.v1
import org.apache.spark.sql.{AnalysisException, Row, SaveMode}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.test.SharedSparkSession
trait ShowPartitionsSuiteBase extends command.ShowPartitionsSuiteBase {
override def version: String = "V1"
override def catalog: String = CatalogManager.SESSION_CATALOG_NAME
override def defaultUsing: String = "USING parquet"
test("show everything in the default database") {
val table = "dateTable"
withTable(table) {
@@ -69,7 +63,7 @@ trait ShowPartitionsSuiteBase extends command.ShowPartitionsSuiteBase {
}
}
class ShowPartitionsSuite extends ShowPartitionsSuiteBase with SharedSparkSession {
class ShowPartitionsSuite extends ShowPartitionsSuiteBase with CommandSuiteBase {
// The test is placed here because it fails with `USING HIVE`:
// org.apache.spark.sql.AnalysisException:
// Hive data source can only be used with tables, you can't use it with CREATE TEMP VIEW USING


@@ -18,17 +18,12 @@
package org.apache.spark.sql.execution.command.v1
import org.apache.spark.sql.{AnalysisException, Row, SaveMode}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{BooleanType, StringType, StructType}
trait ShowTablesSuiteBase extends command.ShowTablesSuiteBase {
override def version: String = "V1"
override def catalog: String = CatalogManager.SESSION_CATALOG_NAME
override def defaultNamespace: Seq[String] = Seq("default")
override def defaultUsing: String = "USING parquet"
override def showSchema: StructType = {
new StructType()
.add("database", StringType, nullable = false)
@@ -87,31 +82,27 @@ trait ShowTablesSuiteBase extends command.ShowTablesSuiteBase {
}
test("case sensitivity of partition spec") {
withNamespace(s"$catalog.ns") {
sql(s"CREATE NAMESPACE $catalog.ns")
val t = s"$catalog.ns.part_table"
withTable(t) {
sql(s"""
|CREATE TABLE $t (price int, qty int, year int, month int)
|$defaultUsing
|partitioned by (year, month)""".stripMargin)
sql(s"INSERT INTO $t PARTITION(year = 2015, month = 1) SELECT 1, 1")
Seq(
true -> "PARTITION(year = 2015, month = 1)",
false -> "PARTITION(YEAR = 2015, Month = 1)"
).foreach { case (caseSensitive, partitionSpec) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
val df = sql(s"SHOW TABLE EXTENDED LIKE 'part_table' $partitionSpec")
val information = df.select("information").first().getString(0)
assert(information.contains("Partition Values: [year=2015, month=1]"))
}
withNamespaceAndTable("ns", "part_table") { t =>
sql(s"""
|CREATE TABLE $t (price int, qty int, year int, month int)
|$defaultUsing
|partitioned by (year, month)""".stripMargin)
sql(s"INSERT INTO $t PARTITION(year = 2015, month = 1) SELECT 1, 1")
Seq(
true -> "PARTITION(year = 2015, month = 1)",
false -> "PARTITION(YEAR = 2015, Month = 1)"
).foreach { case (caseSensitive, partitionSpec) =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
val df = sql(s"SHOW TABLE EXTENDED LIKE 'part_table' $partitionSpec")
val information = df.select("information").first().getString(0)
assert(information.contains("Partition Values: [year=2015, month=1]"))
}
}
}
}
}
class ShowTablesSuite extends ShowTablesSuiteBase with SharedSparkSession {
class ShowTablesSuite extends ShowTablesSuiteBase with CommandSuiteBase {
test("SPARK-33670: show partitions from a datasource table") {
import testImplicits._
withNamespace(s"$catalog.ns") {


@@ -17,29 +17,19 @@
package org.apache.spark.sql.execution.command.v2
import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
import org.apache.spark.sql.connector.InMemoryPartitionTable
import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.test.SharedSparkSession
class AlterTableAddPartitionSuite
extends command.AlterTableAddPartitionSuiteBase
with SharedSparkSession {
with CommandSuiteBase {
import CatalogV2Implicits._
override def version: String = "V2"
override def catalog: String = "test_catalog"
override def defaultUsing: String = "USING _"
override def sparkConf: SparkConf = super.sparkConf
.set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName)
.set(s"spark.sql.catalog.non_part_$catalog", classOf[InMemoryTableCatalog].getName)
override protected def checkLocation(
t: String,
spec: TablePartitionSpec,
@@ -61,7 +51,7 @@ class AlterTableAddPartitionSuite
}
test("SPARK-33650: add partition into a table which doesn't support partition management") {
withNsTable("ns", "tbl", s"non_part_$catalog") { t =>
withNamespaceAndTable("ns", "tbl", s"non_part_$catalog") { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
val errMsg = intercept[AnalysisException] {
sql(s"ALTER TABLE $t ADD PARTITION (id=1)")


@@ -17,28 +17,17 @@
package org.apache.spark.sql.execution.command.v2
import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.{InMemoryPartitionTableCatalog, InMemoryTableCatalog}
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.test.SharedSparkSession
class AlterTableDropPartitionSuite
extends command.AlterTableDropPartitionSuiteBase
with SharedSparkSession {
override def version: String = "V2"
override def catalog: String = "test_catalog"
override def defaultUsing: String = "USING _"
with CommandSuiteBase {
override protected val notFullPartitionSpecErr = "Partition spec is invalid"
override def sparkConf: SparkConf = super.sparkConf
.set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName)
.set(s"spark.sql.catalog.non_part_$catalog", classOf[InMemoryTableCatalog].getName)
test("SPARK-33650: drop partition into a table which doesn't support partition management") {
withNsTable("ns", "tbl", s"non_part_$catalog") { t =>
withNamespaceAndTable("ns", "tbl", s"non_part_$catalog") { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
val errMsg = intercept[AnalysisException] {
sql(s"ALTER TABLE $t DROP PARTITION (id=1)")


@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command.v2
import org.apache.spark.SparkConf
import org.apache.spark.sql.connector.{InMemoryPartitionTableCatalog, InMemoryTableCatalog}
import org.apache.spark.sql.test.SharedSparkSession
trait CommandSuiteBase extends SharedSparkSession {
def version: String = "V2"
def catalog: String = "test_catalog"
def defaultUsing: String = "USING _"
override def sparkConf: SparkConf = super.sparkConf
.set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName)
.set(s"spark.sql.catalog.non_part_$catalog", classOf[InMemoryTableCatalog].getName)
}


@@ -17,21 +17,10 @@
package org.apache.spark.sql.execution.command.v2
import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.{InMemoryPartitionTableCatalog, InMemoryTableCatalog}
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.test.SharedSparkSession
class ShowPartitionsSuite extends command.ShowPartitionsSuiteBase with SharedSparkSession {
override def version: String = "V2"
override def catalog: String = "test_catalog"
override def defaultUsing: String = "USING _"
override def sparkConf: SparkConf = super.sparkConf
.set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName)
.set(s"spark.sql.catalog.non_part_$catalog", classOf[InMemoryTableCatalog].getName)
class ShowPartitionsSuite extends command.ShowPartitionsSuiteBase with CommandSuiteBase {
test("a table does not support partitioning") {
val table = s"non_part_$catalog.tab1"
withTable(table) {


@@ -17,18 +17,12 @@
package org.apache.spark.sql.execution.command.v2
import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.connector.InMemoryTableCatalog
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{StringType, StructType}
class ShowTablesSuite extends command.ShowTablesSuiteBase with SharedSparkSession {
override def version: String = "V2"
override def catalog: String = "test_catalog"
class ShowTablesSuite extends command.ShowTablesSuiteBase with CommandSuiteBase {
override def defaultNamespace: Seq[String] = Nil
override def defaultUsing: String = "USING _"
override def showSchema: StructType = {
new StructType()
.add("namespace", StringType, nullable = false)
@@ -40,9 +34,6 @@ class ShowTablesSuite extends command.ShowTablesSuiteBase with SharedSparkSessio
}
}
override def sparkConf: SparkConf = super.sparkConf
.set(s"spark.sql.catalog.$catalog", classOf[InMemoryTableCatalog].getName)
// The test fails for V1 catalog with the error:
// org.apache.spark.sql.AnalysisException:
// The namespace in session catalog must have exactly one name part: spark_catalog.n1.n2.db


@@ -18,11 +18,7 @@
package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql.execution.command.v1
import org.apache.spark.sql.hive.test.TestHiveSingleton
class AlterTableAddPartitionSuite
extends v1.AlterTableAddPartitionSuiteBase
with TestHiveSingleton {
override def version: String = "Hive V1"
override def defaultUsing: String = "USING HIVE"
}
with CommandSuiteBase


@@ -18,12 +18,7 @@
package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql.execution.command.v1
import org.apache.spark.sql.hive.test.TestHiveSingleton
class AlterTableDropPartitionSuite
extends v1.AlterTableDropPartitionSuiteBase
with TestHiveSingleton {
override def version: String = "Hive V1"
override def defaultUsing: String = "USING HIVE"
}
with CommandSuiteBase


@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.hive.test.TestHiveSingleton
trait CommandSuiteBase extends TestHiveSingleton {
def version: String = "Hive V1"
def catalog: String = CatalogManager.SESSION_CATALOG_NAME
def defaultUsing: String = "USING HIVE"
}


@@ -18,9 +18,5 @@
package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql.execution.command.v1
import org.apache.spark.sql.hive.test.TestHiveSingleton
class ShowPartitionsSuite extends v1.ShowPartitionsSuiteBase with TestHiveSingleton {
override def version: String = "Hive V1"
override def defaultUsing: String = "USING HIVE"
}
class ShowPartitionsSuite extends v1.ShowPartitionsSuiteBase with CommandSuiteBase


@@ -18,9 +18,5 @@
package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql.execution.command.v1
import org.apache.spark.sql.hive.test.TestHiveSingleton
class ShowTablesSuite extends v1.ShowTablesSuiteBase with TestHiveSingleton {
override def version: String = "Hive V1"
override def defaultUsing: String = "USING HIVE"
}
class ShowTablesSuite extends v1.ShowTablesSuiteBase with CommandSuiteBase