[SPARK-19235][SQL][TESTS] Enable Test Cases in DDLSuite with Hive Metastore

### What changes were proposed in this pull request?
So far, the test cases in `DDLSuite` only verify the behaviors of `InMemoryCatalog`; that is, they do not cover the scenarios that use `HiveExternalCatalog`. Thus, we need to improve the existing test suite so that these cases also run against the Hive metastore.

While porting these test cases, a bug in `SET LOCATION` was found: `path` is not set when the location is changed.
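For context, a minimal sketch of the check this bug blocks is shown below; it mirrors the commented-out TODO inside `testSetLocation` in this diff, where `storageFormat` and `expected` are local values of that test (this is an excerpt, not a self-contained test):

```
// Sketch: after
//   ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'
// the table's storage should point at the new location. For a data source table backed
// by the Hive metastore, the "path" serde property should follow the new location as
// well, but it currently does not -- hence the check stays commented out in this PR.
assert(storageFormat.locationUri === Some(expected))
// if (isUsingHiveMetastore) {
//   assert(storageFormat.properties.get("path") === expected)
// }
```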

After this PR, a few changes are made, as summarized below:
- `DDLSuite` becomes an abstract class. Both `InMemoryCatalogedDDLSuite` and `HiveCatalogedDDLSuite` extend it: `InMemoryCatalogedDDLSuite` uses `InMemoryCatalog`, while `HiveCatalogedDDLSuite` uses `HiveExternalCatalog`. (A condensed outline of the hierarchy follows the TODO below.)
- `InMemoryCatalogedDDLSuite` contains all the existing test cases in `DDLSuite`.
- `HiveCatalogedDDLSuite` contains a subset of the test cases in `DDLSuite`. The following test cases are excluded:

1. The following test cases only make sense for `InMemoryCatalog`:
```
  test("desc table for parquet data source table using in-memory catalog")
  test("create a managed Hive source table") {
  test("create an external Hive source table")
  test("Create Hive Table As Select")
```

2. The following test cases cannot be ported because the table provider cannot be altered when using the Hive metastore (see the helper reproduced after this list). In future PRs we should improve these test cases so that altering the table provider is no longer needed:
```
  test("alter table: set location (datasource table)")
  test("alter table: set properties (datasource table)")
  test("alter table: unset properties (datasource table)")
  test("alter table: set serde (datasource table)")
  test("alter table: set serde partition (datasource table)")
  test("alter table: change column (datasource table)")
  test("alter table: add partition (datasource table)")
  test("alter table: drop partition (datasource table)")
  test("alter table: rename partition (datasource table)")
  test("drop table - data source table")
```
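For reference, the shared tests turn a generated Hive-style table into a data source table by flipping its provider with the `convertToDatasourceTable` helper (reproduced below from this diff); `InMemoryCatalog` tolerates this, but `HiveExternalCatalog` does not allow altering a table's provider this way, so the datasource-flavored variants stay in `InMemoryCatalogedDDLSuite` only.

```
// Helper used by the shared tests for the "(datasource table)" variants; altering the
// provider of an existing table like this is only supported by InMemoryCatalog.
private def convertToDatasourceTable(
    catalog: SessionCatalog,
    tableIdent: TableIdentifier): Unit = {
  catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
    provider = Some("csv")))
  assert(catalog.getTableMetadata(tableIdent).provider == Some("csv"))
}
```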

**TODO**: in future PRs, we need to remove `HiveDDLSuite` and move its test cases into `DDLSuite`, `InMemoryCatalogedDDLSuite`, or `HiveCatalogedDDLSuite`.
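A condensed outline of the resulting suite hierarchy, pieced together from the diff below (signatures only, bodies elided):

```
// The shared DDL tests live in the abstract suite; each concrete suite supplies its own
// catalog-specific CatalogTable via generateTable and may normalize metadata for comparison.
abstract class DDLSuite extends QueryTest with SQLTestUtils {
  protected def isUsingHiveMetastore: Boolean =
    spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive"
  protected def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable
  protected def normalizeCatalogTable(table: CatalogTable): CatalogTable = table
  // ... shared test cases and test* helpers ...
}

class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with BeforeAndAfterEach {
  // generateTable builds a parquet-provider CatalogTable; this suite also hosts the
  // in-memory-only and datasource-flavored test cases listed above
}

class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeAndAfterEach {
  // generateTable builds a Hive SequenceFile/LazySimpleSerDe CatalogTable;
  // normalizeCatalogTable drops nondeterministic Hive properties before comparison
}
```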

### How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>
Author: gatorsmile <gatorsmile@gmail.com>

Closes #16592 from gatorsmile/refactorDDLSuite.
Commit 09829be621 (parent d809ceed97)
Xiao Li authored on 2017-03-08 23:12:10 -08:00; committed by Wenchen Fan
3 changed files with 345 additions and 273 deletions

File: DDLSuite.scala

@@ -30,23 +30,164 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
private val escapedIdentifier = "`(.+)`".r
class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with BeforeAndAfterEach {
override def afterEach(): Unit = {
try {
// drop all databases, tables and functions after each test
spark.sessionState.catalog.reset()
} finally {
Utils.deleteRecursively(new File("spark-warehouse"))
Utils.deleteRecursively(new File(spark.sessionState.conf.warehousePath))
super.afterEach()
}
}
protected override def generateTable(
catalog: SessionCatalog,
name: TableIdentifier): CatalogTable = {
val storage =
CatalogStorageFormat.empty.copy(locationUri = Some(catalog.defaultTablePath(name)))
val metadata = new MetadataBuilder()
.putString("key", "value")
.build()
CatalogTable(
identifier = name,
tableType = CatalogTableType.EXTERNAL,
storage = storage,
schema = new StructType()
.add("col1", "int", nullable = true, metadata = metadata)
.add("col2", "string")
.add("a", "int")
.add("b", "int"),
provider = Some("parquet"),
partitionColumnNames = Seq("a", "b"),
createTime = 0L,
tracksPartitionsInCatalog = true)
}
test("desc table for parquet data source table using in-memory catalog") {
val tabName = "tab1"
withTable(tabName) {
sql(s"CREATE TABLE $tabName(a int comment 'test') USING parquet ")
checkAnswer(
sql(s"DESC $tabName").select("col_name", "data_type", "comment"),
Row("a", "int", "test")
)
}
}
test("alter table: set location (datasource table)") {
testSetLocation(isDatasourceTable = true)
}
test("alter table: set properties (datasource table)") {
testSetProperties(isDatasourceTable = true)
}
test("alter table: unset properties (datasource table)") {
testUnsetProperties(isDatasourceTable = true)
}
test("alter table: set serde (datasource table)") {
testSetSerde(isDatasourceTable = true)
}
test("alter table: set serde partition (datasource table)") {
testSetSerdePartition(isDatasourceTable = true)
}
test("alter table: change column (datasource table)") {
testChangeColumn(isDatasourceTable = true)
}
test("alter table: add partition (datasource table)") {
testAddPartitions(isDatasourceTable = true)
}
test("alter table: drop partition (datasource table)") {
testDropPartitions(isDatasourceTable = true)
}
test("alter table: rename partition (datasource table)") {
testRenamePartitions(isDatasourceTable = true)
}
test("drop table - data source table") {
testDropTable(isDatasourceTable = true)
}
test("create a managed Hive source table") {
assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
val tabName = "tbl"
withTable(tabName) {
val e = intercept[AnalysisException] {
sql(s"CREATE TABLE $tabName (i INT, j STRING)")
}.getMessage
assert(e.contains("Hive support is required to CREATE Hive TABLE"))
}
}
test("create an external Hive source table") {
assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
withTempDir { tempDir =>
val tabName = "tbl"
withTable(tabName) {
val e = intercept[AnalysisException] {
sql(
s"""
|CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
|ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
|LOCATION '${tempDir.toURI}'
""".stripMargin)
}.getMessage
assert(e.contains("Hive support is required to CREATE Hive TABLE"))
}
}
}
test("Create Hive Table As Select") {
import testImplicits._
withTable("t", "t1") {
var e = intercept[AnalysisException] {
sql("CREATE TABLE t SELECT 1 as a, 1 as b")
}.getMessage
assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
e = intercept[AnalysisException] {
sql("CREATE TABLE t SELECT a, b from t1")
}.getMessage
assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
}
}
}
abstract class DDLSuite extends QueryTest with SQLTestUtils {
protected def isUsingHiveMetastore: Boolean = {
spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive"
}
protected def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable
private val escapedIdentifier = "`(.+)`".r
protected def normalizeCatalogTable(table: CatalogTable): CatalogTable = table
private def normalizeSerdeProp(props: Map[String, String]): Map[String, String] = {
props.filterNot(p => Seq("serialization.format", "path").contains(p._1))
}
private def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = {
assert(normalizeCatalogTable(actual) == normalizeCatalogTable(expected))
}
/**
* Strip backticks, if any, from the string.
*/
@@ -75,33 +216,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
ignoreIfExists = false)
}
private def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable = {
val storage =
CatalogStorageFormat(
locationUri = Some(catalog.defaultTablePath(name)),
inputFormat = None,
outputFormat = None,
serde = None,
compressed = false,
properties = Map())
val metadata = new MetadataBuilder()
.putString("key", "value")
.build()
CatalogTable(
identifier = name,
tableType = CatalogTableType.EXTERNAL,
storage = storage,
schema = new StructType()
.add("col1", "int", nullable = true, metadata = metadata)
.add("col2", "string")
.add("a", "int")
.add("b", "int"),
provider = Some("parquet"),
partitionColumnNames = Seq("a", "b"),
createTime = 0L,
tracksPartitionsInCatalog = true)
}
private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
catalog.createTable(generateTable(catalog, name), ignoreIfExists = false)
}
@@ -115,6 +229,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
}
private def getDBPath(dbName: String): URI = {
val warehousePath = s"file:${spark.sessionState.conf.warehousePath.stripPrefix("file:")}"
new Path(warehousePath, s"$dbName.db").toUri
}
test("the qualified path of a database is stored in the catalog") {
val catalog = spark.sessionState.catalog
@@ -138,11 +257,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
try {
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbName)
val expectedLocation = makeQualifiedPath(s"spark-warehouse/$dbName.db")
assert(db1 == CatalogDatabase(
dbName,
"",
expectedLocation,
getDBPath(dbName),
Map.empty))
sql(s"DROP DATABASE $dbName CASCADE")
assert(!catalog.databaseExists(dbName))
@@ -185,16 +303,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
val expectedLocation = makeQualifiedPath(s"spark-warehouse/$dbNameWithoutBackTicks.db")
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
expectedLocation,
getDBPath(dbNameWithoutBackTicks),
Map.empty))
intercept[DatabaseAlreadyExistsException] {
// TODO: HiveExternalCatalog should throw DatabaseAlreadyExistsException
val e = intercept[AnalysisException] {
sql(s"CREATE DATABASE $dbName")
}
}.getMessage
assert(e.contains(s"already exists"))
} finally {
catalog.reset()
}
@@ -413,19 +532,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
test("desc table for parquet data source table using in-memory catalog") {
assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
val tabName = "tab1"
withTable(tabName) {
sql(s"CREATE TABLE $tabName(a int comment 'test') USING parquet ")
checkAnswer(
sql(s"DESC $tabName").select("col_name", "data_type", "comment"),
Row("a", "int", "test")
)
}
}
test("Alter/Describe Database") {
val catalog = spark.sessionState.catalog
val databaseNames = Seq("db1", "`database`")
@@ -433,7 +539,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
val location = makeQualifiedPath(s"spark-warehouse/$dbNameWithoutBackTicks.db")
val location = getDBPath(dbNameWithoutBackTicks)
sql(s"CREATE DATABASE $dbName")
@@ -477,7 +583,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
var message = intercept[AnalysisException] {
sql(s"DROP DATABASE $dbName")
}.getMessage
// TODO: Unify the exception.
if (isUsingHiveMetastore) {
assert(message.contains(s"NoSuchObjectException: $dbNameWithoutBackTicks"))
} else {
assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found"))
}
message = intercept[AnalysisException] {
sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
@@ -506,7 +617,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val message = intercept[AnalysisException] {
sql(s"DROP DATABASE $dbName RESTRICT")
}.getMessage
// TODO: Unify the exception.
if (isUsingHiveMetastore) {
assert(message.contains(s"Database $dbName is not empty. One or more tables exist"))
} else {
assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
}
catalog.dropTable(tableIdent1, ignoreIfNotExists = false, purge = false)
@@ -537,7 +653,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
createTable(catalog, tableIdent1)
val expectedTableIdent = tableIdent1.copy(database = Some("default"))
val expectedTable = generateTable(catalog, expectedTableIdent)
assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1))
}
test("create table in a specific db") {
@@ -546,7 +662,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
createTable(catalog, tableIdent1)
val expectedTable = generateTable(catalog, tableIdent1)
assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1))
}
test("create table using") {
@@ -731,52 +847,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testSetLocation(isDatasourceTable = false)
}
test("alter table: set location (datasource table)") {
testSetLocation(isDatasourceTable = true)
}
test("alter table: set properties") {
testSetProperties(isDatasourceTable = false)
}
test("alter table: set properties (datasource table)") {
testSetProperties(isDatasourceTable = true)
}
test("alter table: unset properties") {
testUnsetProperties(isDatasourceTable = false)
}
test("alter table: unset properties (datasource table)") {
testUnsetProperties(isDatasourceTable = true)
}
// TODO: move this test to HiveDDLSuite.scala
ignore("alter table: set serde") {
testSetSerde(isDatasourceTable = false)
}
test("alter table: set serde (datasource table)") {
testSetSerde(isDatasourceTable = true)
}
// TODO: move this test to HiveDDLSuite.scala
ignore("alter table: set serde partition") {
testSetSerdePartition(isDatasourceTable = false)
}
test("alter table: set serde partition (datasource table)") {
testSetSerdePartition(isDatasourceTable = true)
}
test("alter table: change column") {
testChangeColumn(isDatasourceTable = false)
}
test("alter table: change column (datasource table)") {
testChangeColumn(isDatasourceTable = true)
}
test("alter table: bucketing is not supported") {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -805,10 +897,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testAddPartitions(isDatasourceTable = false)
}
test("alter table: add partition (datasource table)") {
testAddPartitions(isDatasourceTable = true)
}
test("alter table: recover partitions (sequential)") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
testRecoverPartitions()
@@ -821,7 +909,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
private def testRecoverPartitions() {
protected def testRecoverPartitions() {
val catalog = spark.sessionState.catalog
// table to alter does not exist
intercept[AnalysisException] {
@@ -860,8 +948,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
sql("ALTER TABLE tab1 RECOVER PARTITIONS")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2))
if (!isUsingHiveMetastore) {
assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
} else {
// After ALTER TABLE, the statistics of the first partition are removed by the Hive metastore
assert(catalog.getPartition(tableIdent, part1).parameters.get("numFiles").isEmpty)
assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
}
} finally {
fs.delete(root, true)
}
@@ -875,10 +969,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testDropPartitions(isDatasourceTable = false)
}
test("alter table: drop partition (datasource table)") {
testDropPartitions(isDatasourceTable = true)
}
test("alter table: drop partition is not supported for views") {
assertUnsupported("ALTER VIEW dbx.tab1 DROP IF EXISTS PARTITION (b='2')")
}
@@ -887,10 +977,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testRenamePartitions(isDatasourceTable = false)
}
test("alter table: rename partition (datasource table)") {
testRenamePartitions(isDatasourceTable = true)
}
test("show table extended") {
withTempView("show1a", "show2b") {
sql(
@@ -971,11 +1057,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testDropTable(isDatasourceTable = false)
}
test("drop table - data source table") {
testDropTable(isDatasourceTable = true)
}
private def testDropTable(isDatasourceTable: Boolean): Unit = {
protected def testDropTable(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
@@ -1011,9 +1093,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
tableIdent: TableIdentifier): Unit = {
catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
provider = Some("csv")))
assert(catalog.getTableMetadata(tableIdent).provider == Some("csv"))
}
private def testSetProperties(isDatasourceTable: Boolean): Unit = {
protected def testSetProperties(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
@@ -1022,8 +1105,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
convertToDatasourceTable(catalog, tableIdent)
}
def getProps: Map[String, String] = {
if (isUsingHiveMetastore) {
normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties
} else {
catalog.getTableMetadata(tableIdent).properties
}
}
assert(getProps.isEmpty)
// set table properties
sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')")
@@ -1038,7 +1125,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
private def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
protected def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
@@ -1047,8 +1134,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
convertToDatasourceTable(catalog, tableIdent)
}
def getProps: Map[String, String] = {
if (isUsingHiveMetastore) {
normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties
} else {
catalog.getTableMetadata(tableIdent).properties
}
}
// unset table properties
sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan', 'x' = 'y')")
sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
@@ -1071,7 +1162,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(getProps == Map("x" -> "y"))
}
private def testSetLocation(isDatasourceTable: Boolean): Unit = {
protected def testSetLocation(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val partSpec = Map("a" -> "1", "b" -> "2")
@@ -1082,24 +1173,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
convertToDatasourceTable(catalog, tableIdent)
}
assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isDefined)
assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
assert(normalizeSerdeProp(catalog.getTableMetadata(tableIdent).storage.properties).isEmpty)
assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isDefined)
assert(catalog.getPartition(tableIdent, partSpec).storage.properties.isEmpty)
assert(
normalizeSerdeProp(catalog.getPartition(tableIdent, partSpec).storage.properties).isEmpty)
// Verify that the location is set to the expected string
def verifyLocation(expected: URI, spec: Option[TablePartitionSpec] = None): Unit = {
val storageFormat = spec
.map { s => catalog.getPartition(tableIdent, s).storage }
.getOrElse { catalog.getTableMetadata(tableIdent).storage }
if (isDatasourceTable) {
if (spec.isDefined) {
assert(storageFormat.properties.isEmpty)
// TODO(gatorsmile): fix the bug in alter table set location.
// if (isUsingHiveMetastore) {
// assert(storageFormat.properties.get("path") === expected)
// }
assert(storageFormat.locationUri === Some(expected))
} else {
assert(storageFormat.locationUri === Some(expected))
}
} else {
assert(storageFormat.locationUri === Some(expected))
}
}
// set table location
sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
@@ -1124,7 +1212,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
private def testSetSerde(isDatasourceTable: Boolean): Unit = {
protected def testSetSerde(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
createDatabase(catalog, "dbx")
@@ -1132,8 +1220,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
def checkSerdeProps(expectedSerdeProps: Map[String, String]): Unit = {
val serdeProp = catalog.getTableMetadata(tableIdent).storage.properties
if (isUsingHiveMetastore) {
assert(normalizeSerdeProp(serdeProp) == expectedSerdeProps)
} else {
assert(serdeProp == expectedSerdeProps)
}
}
if (isUsingHiveMetastore) {
assert(catalog.getTableMetadata(tableIdent).storage.serde ==
Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
} else {
assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty)
assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
}
checkSerdeProps(Map.empty[String, String])
// set table serde and/or properties (should fail on datasource tables)
if (isDatasourceTable) {
val e1 = intercept[AnalysisException] {
@@ -1146,31 +1247,30 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(e1.getMessage.contains("datasource"))
assert(e2.getMessage.contains("datasource"))
} else {
sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.jadoop'")
assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.jadoop"))
assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
val newSerde = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
sql(s"ALTER TABLE dbx.tab1 SET SERDE '$newSerde'")
assert(catalog.getTableMetadata(tableIdent).storage.serde == Some(newSerde))
checkSerdeProps(Map.empty[String, String])
val serde2 = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"
sql(s"ALTER TABLE dbx.tab1 SET SERDE '$serde2' " +
"WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.madoop"))
assert(catalog.getTableMetadata(tableIdent).storage.properties ==
Map("k" -> "v", "kay" -> "vee"))
assert(catalog.getTableMetadata(tableIdent).storage.serde == Some(serde2))
checkSerdeProps(Map("k" -> "v", "kay" -> "vee"))
}
// set serde properties only
sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
assert(catalog.getTableMetadata(tableIdent).storage.properties ==
Map("k" -> "vvv", "kay" -> "vee"))
checkSerdeProps(Map("k" -> "vvv", "kay" -> "vee"))
// set things without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')")
assert(catalog.getTableMetadata(tableIdent).storage.properties ==
Map("k" -> "vvv", "kay" -> "veee"))
checkSerdeProps(Map("k" -> "vvv", "kay" -> "veee"))
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
}
}
private def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
protected def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val spec = Map("a" -> "1", "b" -> "2")
@@ -1183,8 +1283,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
def checkPartitionSerdeProps(expectedSerdeProps: Map[String, String]): Unit = {
val serdeProp = catalog.getPartition(tableIdent, spec).storage.properties
if (isUsingHiveMetastore) {
assert(normalizeSerdeProp(serdeProp) == expectedSerdeProps)
} else {
assert(serdeProp == expectedSerdeProps)
}
}
if (isUsingHiveMetastore) {
assert(catalog.getPartition(tableIdent, spec).storage.serde ==
Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
} else {
assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty)
assert(catalog.getPartition(tableIdent, spec).storage.properties.isEmpty)
}
checkPartitionSerdeProps(Map.empty[String, String])
// set table serde and/or properties (should fail on datasource tables)
if (isDatasourceTable) {
val e1 = intercept[AnalysisException] {
@@ -1199,26 +1312,23 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
} else {
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.jadoop'")
assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.jadoop"))
assert(catalog.getPartition(tableIdent, spec).storage.properties.isEmpty)
checkPartitionSerdeProps(Map.empty[String, String])
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
"WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.madoop"))
assert(catalog.getPartition(tableIdent, spec).storage.properties ==
Map("k" -> "v", "kay" -> "vee"))
checkPartitionSerdeProps(Map("k" -> "v", "kay" -> "vee"))
}
// set serde properties only
maybeWrapException(isDatasourceTable) {
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) " +
"SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
assert(catalog.getPartition(tableIdent, spec).storage.properties ==
Map("k" -> "vvv", "kay" -> "vee"))
checkPartitionSerdeProps(Map("k" -> "vvv", "kay" -> "vee"))
}
// set things without explicitly specifying database
catalog.setCurrentDatabase("dbx")
maybeWrapException(isDatasourceTable) {
sql("ALTER TABLE tab1 PARTITION (a=1, b=2) SET SERDEPROPERTIES ('kay' = 'veee')")
assert(catalog.getPartition(tableIdent, spec).storage.properties ==
Map("k" -> "vvv", "kay" -> "veee"))
checkPartitionSerdeProps(Map("k" -> "vvv", "kay" -> "veee"))
}
// table to alter does not exist
intercept[AnalysisException] {
@@ -1226,7 +1336,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
protected def testAddPartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val part1 = Map("a" -> "1", "b" -> "5")
@@ -1247,7 +1357,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
"PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined)
assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(new URI("paris")))
val partitionLocation = if (isUsingHiveMetastore) {
val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri
assert(tableLocation.isDefined)
makeQualifiedPath(new Path(tableLocation.get.toString, "paris"))
} else {
new URI("paris")
}
assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(partitionLocation))
assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined)
// add partitions without explicitly specifying database
@@ -1277,7 +1395,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
Set(part1, part2, part3, part4, part5))
}
private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
protected def testDropPartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val part1 = Map("a" -> "1", "b" -> "5")
@@ -1330,7 +1448,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(catalog.listPartitions(tableIdent).isEmpty)
}
private def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
protected def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val part1 = Map("a" -> "1", "b" -> "q")
@@ -1374,7 +1492,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
}
private def testChangeColumn(isDatasourceTable: Boolean): Unit = {
protected def testChangeColumn(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val resolver = spark.sessionState.conf.resolver
val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -1474,35 +1592,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
)
}
test("create a managed Hive source table") {
assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
val tabName = "tbl"
withTable(tabName) {
val e = intercept[AnalysisException] {
sql(s"CREATE TABLE $tabName (i INT, j STRING)")
}.getMessage
assert(e.contains("Hive support is required to CREATE Hive TABLE"))
}
}
test("create an external Hive source table") {
assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
withTempDir { tempDir =>
val tabName = "tbl"
withTable(tabName) {
val e = intercept[AnalysisException] {
sql(
s"""
|CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
|ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
|LOCATION '${tempDir.toURI}'
""".stripMargin)
}.getMessage
assert(e.contains("Hive support is required to CREATE Hive TABLE"))
}
}
}
test("create a data source table without schema") {
import testImplicits._
withTempPath { tempDir =>
@@ -1541,22 +1630,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
test("Create Hive Table As Select") {
import testImplicits._
withTable("t", "t1") {
var e = intercept[AnalysisException] {
sql("CREATE TABLE t SELECT 1 as a, 1 as b")
}.getMessage
assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
e = intercept[AnalysisException] {
sql("CREATE TABLE t SELECT a, b from t1")
}.getMessage
assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
}
}
test("Create Data Source Table As Select") {
import testImplicits._
withTable("t", "t1", "t2") {
@@ -1580,7 +1653,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
test("drop default database") {
Seq("true", "false").foreach { caseSensitive =>
val caseSensitiveOptions = if (isUsingHiveMetastore) Seq("false") else Seq("true", "false")
caseSensitiveOptions.foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
var message = intercept[AnalysisException] {
sql("DROP DATABASE default")

File: SQLTestUtils.scala

@@ -306,6 +306,11 @@ private[sql] trait SQLTestUtils
val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
fs.makeQualified(hadoopPath).toUri
}
def makeQualifiedPath(path: Path): URI = {
val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
fs.makeQualified(path).toUri
}
}
private[sql] object SQLTestUtils {

File: HiveDDLSuite.scala

@@ -27,16 +27,88 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{MetadataBuilder, StructType}
// TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite
class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeAndAfterEach {
override def afterEach(): Unit = {
try {
// drop all databases, tables and functions after each test
spark.sessionState.catalog.reset()
} finally {
super.afterEach()
}
}
protected override def generateTable(
catalog: SessionCatalog,
name: TableIdentifier): CatalogTable = {
val storage =
CatalogStorageFormat(
locationUri = Some(catalog.defaultTablePath(name)),
inputFormat = Some("org.apache.hadoop.mapred.SequenceFileInputFormat"),
outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat"),
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
compressed = false,
properties = Map("serialization.format" -> "1"))
val metadata = new MetadataBuilder()
.putString("key", "value")
.build()
CatalogTable(
identifier = name,
tableType = CatalogTableType.EXTERNAL,
storage = storage,
schema = new StructType()
.add("col1", "int", nullable = true, metadata = metadata)
.add("col2", "string")
.add("a", "int")
.add("b", "int"),
provider = Some("hive"),
partitionColumnNames = Seq("a", "b"),
createTime = 0L,
tracksPartitionsInCatalog = true)
}
protected override def normalizeCatalogTable(table: CatalogTable): CatalogTable = {
val nondeterministicProps = Set(
"CreateTime",
"transient_lastDdlTime",
"grantTime",
"lastUpdateTime",
"last_modified_by",
"last_modified_time",
"Owner:",
"COLUMN_STATS_ACCURATE",
// The following are hive specific schema parameters which we do not need to match exactly.
"numFiles",
"numRows",
"rawDataSize",
"totalSize",
"totalNumberFiles",
"maxFileSize",
"minFileSize"
)
table.copy(
createTime = 0L,
lastAccessTime = 0L,
owner = "",
properties = table.properties.filterKeys(!nondeterministicProps.contains(_)),
// View texts are checked separately
viewText = None
)
}
}
class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1719,61 +1791,6 @@ class HiveDDLSuite
}
}
Seq("a b", "a:b", "a%b").foreach { specialChars =>
test(s"datasource table: location uri contains $specialChars") {
withTable("t", "t1") {
withTempDir { dir =>
val loc = new File(dir, specialChars)
loc.mkdir()
spark.sql(
s"""
|CREATE TABLE t(a string)
|USING parquet
|LOCATION '$loc'
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location == new Path(loc.getAbsolutePath).toUri)
assert(new Path(table.location).toString.contains(specialChars))
assert(loc.listFiles().isEmpty)
spark.sql("INSERT INTO TABLE t SELECT 1")
assert(loc.listFiles().length >= 1)
checkAnswer(spark.table("t"), Row("1") :: Nil)
}
withTempDir { dir =>
val loc = new File(dir, specialChars)
loc.mkdir()
spark.sql(
s"""
|CREATE TABLE t1(a string, b string)
|USING parquet
|PARTITIONED BY(b)
|LOCATION '$loc'
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
assert(table.location == new Path(loc.getAbsolutePath).toUri)
assert(new Path(table.location).toString.contains(specialChars))
assert(loc.listFiles().isEmpty)
spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
val partFile = new File(loc, "b=2")
assert(partFile.listFiles().length >= 1)
checkAnswer(spark.table("t1"), Row("1", "2") :: Nil)
spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1")
val partFile1 = new File(loc, "b=2017-03-03 12:13%3A14")
assert(!partFile1.exists())
val partFile2 = new File(loc, "b=2017-03-03 12%3A13%253A14")
assert(partFile2.listFiles().length >= 1)
checkAnswer(spark.table("t1"), Row("1", "2") :: Row("1", "2017-03-03 12:13%3A14") :: Nil)
}
}
}
}
Seq("a b", "a:b", "a%b").foreach { specialChars =>
test(s"hive table: location uri contains $specialChars") {
withTable("t") {
@@ -1848,28 +1865,4 @@ }
}
}
}
Seq("a b", "a:b", "a%b").foreach { specialChars =>
test(s"location uri contains $specialChars for database") {
try {
withTable("t") {
withTempDir { dir =>
val loc = new File(dir, specialChars)
spark.sql(s"CREATE DATABASE tmpdb LOCATION '$loc'")
spark.sql("USE tmpdb")
Seq(1).toDF("a").write.saveAsTable("t")
val tblloc = new File(loc, "t")
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val tblPath = new Path(tblloc.getAbsolutePath)
val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
assert(table.location == makeQualifiedPath(tblloc.getAbsolutePath))
assert(tblloc.listFiles().nonEmpty)
}
}
} finally {
spark.sql("DROP DATABASE IF EXISTS tmpdb")
}
}
}
}