[SPARK-19723][SQL] create datasource table with a non-existent location should work

## What changes were proposed in this pull request?

This JIRA is follow-up work to [SPARK-19583](https://issues.apache.org/jira/browse/SPARK-19583).

As we discussed in that [PR](https://github.com/apache/spark/pull/16938), the following DDL for a datasource table with a non-existent location should work:
```
CREATE TABLE ... (PARTITIONED BY ...) LOCATION path
```
Currently it throws an exception that the path does not exist for datasource tables.
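
For example, with this patch the following succeeds instead of failing at `CREATE TABLE` time (an illustrative spark-shell session; the path is hypothetical):
```
// Previously this CREATE TABLE failed up front (an AnalysisException that the
// LOCATION path does not exist); now the table is created and the directory
// appears lazily on first write.
spark.sql("CREATE TABLE t(a INT, b INT) USING parquet LOCATION '/tmp/no_such_dir'")
spark.sql("INSERT INTO TABLE t SELECT 1, 2")   // creates /tmp/no_such_dir
spark.table("t").show()                        // one row: 1, 2
```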

## How was this patch tested?
Unit tests added in `DDLSuite` and `HiveDDLSuite`.

Author: windpiger <songjun@outlook.com>

Closes #17055 from windpiger/CTDataSourcePathNotExists.
windpiger authored on 2017-03-10 20:59:32 -08:00, committed by Wenchen Fan
commit f6fdf92d0d (parent fb9beda546)
3 changed files with 113 additions and 103 deletions


```
@@ -73,7 +73,8 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       className = table.provider.get,
       bucketSpec = table.bucketSpec,
       options = table.storage.properties ++ pathOption,
-      catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
+      // As discussed in SPARK-19583, we don't check if the location is existed
+      catalogTable = Some(tableWithDefaultOptions)).resolveRelation(checkFilesExist = false)
     val partitionColumnNames = if (table.schema.nonEmpty) {
       table.partitionColumnNames
```
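
This is the core of the change: `resolveRelation` already accepted a `checkFilesExist` flag, and table creation now simply opts out of the check. A minimal sketch of the call shape (names taken from the diff; the remaining `DataSource` arguments are elided here):
```
// resolveRelation(checkFilesExist = true) validates up front that the source
// paths exist; passing false skips that validation, so CREATE TABLE succeeds
// even when LOCATION points at a directory that does not exist yet.
val dataSource = DataSource(
  sparkSession,
  className = table.provider.get,
  bucketSpec = table.bucketSpec,
  options = table.storage.properties ++ pathOption,
  catalogTable = Some(tableWithDefaultOptions))
dataSource.resolveRelation(checkFilesExist = false)
```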


```
@@ -230,7 +230,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }

   private def getDBPath(dbName: String): URI = {
-    val warehousePath = makeQualifiedPath(s"${spark.sessionState.conf.warehousePath}")
+    val warehousePath = makeQualifiedPath(spark.sessionState.conf.warehousePath)
     new Path(CatalogUtils.URIToString(warehousePath), s"$dbName.db").toUri
   }
```
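
The `getDBPath` change is a drive-by cleanup: `warehousePath` is already a `String`, so wrapping it in an interpolator is a no-op. A trivial illustration (the value is hypothetical):
```
val warehousePath: String = "/user/hive/warehouse"  // hypothetical value
// s"${warehousePath}" rebuilds the identical String; passing the value
// directly is equivalent, hence the simplification in the diff.
assert(s"${warehousePath}" == warehousePath)
```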
```
@@ -1899,7 +1899,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }

-  test("insert data to a data source table which has a not existed location should succeed") {
+  test("insert data to a data source table which has a non-existing location should succeed") {
     withTable("t") {
       withTempDir { dir =>
         spark.sql(
@@ -1939,7 +1939,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }

-  test("insert into a data source table with no existed partition location should succeed") {
+  test("insert into a data source table with a non-existing partition location should succeed") {
     withTable("t") {
       withTempDir { dir =>
         spark.sql(
@@ -1966,7 +1966,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }

-  test("read data from a data source table which has a not existed location should succeed") {
+  test("read data from a data source table which has a non-existing location should succeed") {
     withTable("t") {
       withTempDir { dir =>
         spark.sql(
@@ -1994,7 +1994,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }

-  test("read data from a data source table with no existed partition location should succeed") {
+  test("read data from a data source table with non-existing partition location should succeed") {
     withTable("t") {
       withTempDir { dir =>
         spark.sql(
@@ -2016,48 +2016,72 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }

+  test("create datasource table with a non-existing location") {
+    withTable("t", "t1") {
+      withTempPath { dir =>
+        spark.sql(s"CREATE TABLE t(a int, b int) USING parquet LOCATION '$dir'")
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+        spark.sql("INSERT INTO TABLE t SELECT 1, 2")
+        assert(dir.exists())
+        checkAnswer(spark.table("t"), Row(1, 2))
+      }
+      // partition table
+      withTempPath { dir =>
+        spark.sql(s"CREATE TABLE t1(a int, b int) USING parquet PARTITIONED BY(a) LOCATION '$dir'")
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+        spark.sql("INSERT INTO TABLE t1 PARTITION(a=1) SELECT 2")
+        val partDir = new File(dir, "a=1")
+        assert(partDir.exists())
+        checkAnswer(spark.table("t1"), Row(2, 1))
+      }
+    }
+  }
+
   Seq(true, false).foreach { shouldDelete =>
-    val tcName = if (shouldDelete) "non-existent" else "existed"
+    val tcName = if (shouldDelete) "non-existing" else "existed"
     test(s"CTAS for external data source table with a $tcName location") {
       withTable("t", "t1") {
-        withTempDir {
-          dir =>
-            if (shouldDelete) {
-              dir.delete()
-            }
-            spark.sql(
-              s"""
-                 |CREATE TABLE t
-                 |USING parquet
-                 |LOCATION '$dir'
-                 |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
-               """.stripMargin)
-            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-            assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-            checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
+        withTempDir { dir =>
+          if (shouldDelete) dir.delete()
+          spark.sql(
+            s"""
+               |CREATE TABLE t
+               |USING parquet
+               |LOCATION '$dir'
+               |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+             """.stripMargin)
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+          checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
         }
         // partition table
-        withTempDir {
-          dir =>
-            if (shouldDelete) {
-              dir.delete()
-            }
-            spark.sql(
-              s"""
-                 |CREATE TABLE t1
-                 |USING parquet
-                 |PARTITIONED BY(a, b)
-                 |LOCATION '$dir'
-                 |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
-               """.stripMargin)
-            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-            assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-            val partDir = new File(dir, "a=3")
-            assert(partDir.exists())
-            checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+        withTempDir { dir =>
+          if (shouldDelete) dir.delete()
+          spark.sql(
+            s"""
+               |CREATE TABLE t1
+               |USING parquet
+               |PARTITIONED BY(a, b)
+               |LOCATION '$dir'
+               |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+             """.stripMargin)
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+          val partDir = new File(dir, "a=3")
+          assert(partDir.exists())
+          checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
         }
       }
     }
```
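
A note on the two fixtures used above (contract as defined by Spark's `SQLTestUtils`; the assertions below are an illustrative sketch, not part of the patch):
```
// withTempPath hands the body a java.io.File whose path does NOT exist
// (a temp dir is created and immediately deleted), which is exactly the
// "non-existing location" scenario under test; withTempDir hands it an
// existing directory, so the shouldDelete branch removes it first to
// simulate a missing LOCATION.
withTempPath { dir => assert(!dir.exists()) }
withTempDir  { dir => assert(dir.exists())  }
```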


```
@@ -1663,43 +1663,73 @@ class HiveDDLSuite
     }
   }

+  test("create hive table with a non-existing location") {
+    withTable("t", "t1") {
+      withTempPath { dir =>
+        spark.sql(s"CREATE TABLE t(a int, b int) USING hive LOCATION '$dir'")
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+        spark.sql("INSERT INTO TABLE t SELECT 1, 2")
+        assert(dir.exists())
+        checkAnswer(spark.table("t"), Row(1, 2))
+      }
+      // partition table
+      withTempPath { dir =>
+        spark.sql(
+          s"""
+             |CREATE TABLE t1(a int, b int)
+             |USING hive
+             |PARTITIONED BY(a)
+             |LOCATION '$dir'
+           """.stripMargin)
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+        assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+        spark.sql("INSERT INTO TABLE t1 PARTITION(a=1) SELECT 2")
+        val partDir = new File(dir, "a=1")
+        assert(partDir.exists())
+        checkAnswer(spark.table("t1"), Row(2, 1))
+      }
+    }
+  }
+
   Seq(true, false).foreach { shouldDelete =>
-    val tcName = if (shouldDelete) "non-existent" else "existed"
-    test(s"CTAS for external data source table with a $tcName location") {
+    val tcName = if (shouldDelete) "non-existing" else "existed"
+    test(s"CTAS for external hive table with a $tcName location") {
       withTable("t", "t1") {
-        withTempDir {
-          dir =>
-            if (shouldDelete) {
-              dir.delete()
-            }
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          withTempDir { dir =>
+            if (shouldDelete) dir.delete()
             spark.sql(
               s"""
                  |CREATE TABLE t
-                 |USING parquet
+                 |USING hive
                  |LOCATION '$dir'
                  |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
                """.stripMargin)
             val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
             assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
             checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
-        }
-        // partition table
-        withTempDir {
-          dir =>
-            if (shouldDelete) {
-              dir.delete()
-            }
+          }
+          // partition table
+          withTempDir { dir =>
+            if (shouldDelete) dir.delete()
             spark.sql(
               s"""
                  |CREATE TABLE t1
-                 |USING parquet
+                 |USING hive
                  |PARTITIONED BY(a, b)
                  |LOCATION '$dir'
                  |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
                """.stripMargin)
             val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
             assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
@@ -1707,51 +1737,6 @@ class HiveDDLSuite
             val partDir = new File(dir, "a=3")
             assert(partDir.exists())
             checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
+          }
         }
       }
     }
-
-    test(s"CTAS for external hive table with a $tcName location") {
-      withTable("t", "t1") {
-        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
-          withTempDir {
-            dir =>
-              if (shouldDelete) {
-                dir.delete()
-              }
-              spark.sql(
-                s"""
-                   |CREATE TABLE t
-                   |USING hive
-                   |LOCATION '$dir'
-                   |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
-                 """.stripMargin)
-              val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-              assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-              checkAnswer(spark.table("t"), Row(3, 4, 1, 2))
-          }
-          // partition table
-          withTempDir {
-            dir =>
-              if (shouldDelete) {
-                dir.delete()
-              }
-              spark.sql(
-                s"""
-                   |CREATE TABLE t1
-                   |USING hive
-                   |PARTITIONED BY(a, b)
-                   |LOCATION '$dir'
-                   |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
-                 """.stripMargin)
-              val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-              assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
-              val partDir = new File(dir, "a=3")
-              assert(partDir.exists())
-              checkAnswer(spark.table("t1"), Row(1, 2, 3, 4))
-          }
-        }
-      }
-    }
   }
```
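
One Hive-specific detail: a partitioned CTAS determines every partition value dynamically, which Hive's default strict mode rejects, so the merged hive test runs under the `nonstrict` setting. A minimal sketch (table name and path are hypothetical):
```
// Strict mode requires at least one static partition column in an insert;
// a CTAS with PARTITIONED BY(a, b) has none, hence the config override.
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
  spark.sql(
    s"""
       |CREATE TABLE demo
       |USING hive
       |PARTITIONED BY(a, b)
       |LOCATION '/tmp/hypothetical_dir'
       |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
     """.stripMargin)
}
```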