[SPARK-34209][SQL] Delegate table name validation to the session catalog

### What changes were proposed in this pull request?

Delegate table name validation to the session catalog

### Why are the changes needed?

Querying of tables with nested namespaces.

### Does this PR introduce _any_ user-facing change?

Yes — SQL queries using nested namespace identifiers are now delegated to the session catalog, and the error messages for invalid identifiers have changed (see updated tests).

### How was this patch tested?

Unit tests updated.

Closes #31427 from holdenk/SPARK-34209-delegate-table-name-validation-to-the-catalog.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
This commit is contained in:
Holden Karau 2021-02-09 10:15:16 -08:00
parent b2dc38b654
commit cf7a13c363
3 changed files with 38 additions and 27 deletions

View file

@ -57,14 +57,9 @@ private[sql] trait LookupCatalog extends Logging {
* Extract session catalog and identifier from a multi-part identifier.
*/
object SessionCatalogAndIdentifier {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)] = parts match {
case CatalogAndIdentifier(catalog, ident) if CatalogV2Util.isSessionCatalog(catalog) =>
if (ident.namespace.length != 1) {
throw new AnalysisException(
s"The namespace in session catalog must have exactly one name part: ${parts.quoted}")
}
Some(catalog, ident)
case _ => None
}

View file

@ -2085,8 +2085,7 @@ class DataSourceV2SQLSuite
val e1 = intercept[AnalysisException] {
sql("DESCRIBE FUNCTION default.ns1.ns2.fun")
}
assert(e1.message.contains(
"The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun"))
assert(e1.message.contains("Unsupported function name 'default.ns1.ns2.fun'"))
}
test("SHOW FUNCTIONS not valid v1 namespace") {
@ -2107,8 +2106,7 @@ class DataSourceV2SQLSuite
val e1 = intercept[AnalysisException] {
sql("DROP FUNCTION default.ns1.ns2.fun")
}
assert(e1.message.contains(
"The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun"))
assert(e1.message.contains("Unsupported function name 'default.ns1.ns2.fun'"))
}
test("CREATE FUNCTION: only support session catalog") {
@ -2120,8 +2118,7 @@ class DataSourceV2SQLSuite
val e1 = intercept[AnalysisException] {
sql("CREATE FUNCTION default.ns1.ns2.fun as 'f'")
}
assert(e1.message.contains(
"The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun"))
assert(e1.message.contains("Unsupported function name 'default.ns1.ns2.fun'"))
}
test("REFRESH FUNCTION: only support session catalog") {
@ -2134,7 +2131,7 @@ class DataSourceV2SQLSuite
sql("REFRESH FUNCTION default.ns1.ns2.fun")
}
assert(e1.message.contains(
"The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun"))
"Unsupported function name 'default.ns1.ns2.fun'"))
}
test("global temp view should not be masked by v2 catalog") {
@ -2172,7 +2169,7 @@ class DataSourceV2SQLSuite
sql(s"CREATE TABLE $globalTempDB.ns1.ns2.tbl (id bigint, data string) USING json")
}
assert(e.message.contains(
"The namespace in session catalog must have exactly one name part: global_temp.ns1.ns2.tbl"))
"global_temp.ns1.ns2.tbl is not a valid TableIdentifier as it has more than 2 name parts."))
}
test("table name same as catalog can be used") {
@ -2201,10 +2198,29 @@ class DataSourceV2SQLSuite
sql("CREATE TABLE t USING json AS SELECT 1 AS i")
val t = "spark_catalog.t"
def verify(sql: String): Unit = {
val e = intercept[AnalysisException](spark.sql(sql))
assert(e.message.contains(
s"The namespace in session catalog must have exactly one name part: $t"))
assert(e.message.contains(s"Table or view not found: $t"),
s"Error message did not contain expected text while evaluting $sql")
}
def verifyView(sql: String): Unit = {
val e = intercept[AnalysisException](spark.sql(sql))
assert(e.message.contains(s"View not found: $t"),
s"Error message did not contain expected text while evaluting $sql")
}
def verifyTable(sql: String): Unit = {
val e = intercept[AnalysisException](spark.sql(sql))
assert(e.message.contains(s"Table not found: $t"),
s"Error message did not contain expected text while evaluting $sql")
}
def verifyGeneric(sql: String): Unit = {
val e = intercept[AnalysisException](spark.sql(sql))
assert(e.message.contains(s"not found: $t"),
s"Error message did not contain expected text while evaluting $sql")
}
verify(s"select * from $t")
@ -2212,16 +2228,16 @@ class DataSourceV2SQLSuite
verify(s"REFRESH TABLE $t")
verify(s"DESCRIBE $t i")
verify(s"DROP TABLE $t")
verify(s"DROP VIEW $t")
verify(s"ANALYZE TABLE $t COMPUTE STATISTICS")
verify(s"ANALYZE TABLE $t COMPUTE STATISTICS FOR ALL COLUMNS")
verify(s"MSCK REPAIR TABLE $t")
verify(s"LOAD DATA INPATH 'filepath' INTO TABLE $t")
verify(s"SHOW CREATE TABLE $t")
verify(s"SHOW CREATE TABLE $t AS SERDE")
verify(s"CACHE TABLE $t")
verify(s"UNCACHE TABLE $t")
verify(s"TRUNCATE TABLE $t")
verifyView(s"DROP VIEW $t")
verifyGeneric(s"ANALYZE TABLE $t COMPUTE STATISTICS")
verifyGeneric(s"ANALYZE TABLE $t COMPUTE STATISTICS FOR ALL COLUMNS")
verifyTable(s"MSCK REPAIR TABLE $t")
verifyTable(s"LOAD DATA INPATH 'filepath' INTO TABLE $t")
verifyGeneric(s"SHOW CREATE TABLE $t")
verifyGeneric(s"SHOW CREATE TABLE $t AS SERDE")
verifyGeneric(s"CACHE TABLE $t")
verifyGeneric(s"UNCACHE TABLE $t")
verifyGeneric(s"TRUNCATE TABLE $t")
verify(s"SHOW COLUMNS FROM $t")
}
}
@ -2369,7 +2385,7 @@ class DataSourceV2SQLSuite
sql(s"CACHE TABLE $sessionCatalogName.v")
)
assert(e1.message.contains(
"The namespace in session catalog must have exactly one name part: spark_catalog.v"))
"Table or view not found: spark_catalog.v"))
}
val e2 = intercept[AnalysisException] {
sql(s"CREATE TEMP VIEW $sessionCatalogName.v AS SELECT 1")

View file

@ -1789,7 +1789,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
sql("SHOW COLUMNS IN tbl FROM a.b.c")
}.getMessage
assert(message.contains(
"The namespace in session catalog must have exactly one name part: a.b.c.tbl"))
"Table or view not found: a.b.c.tbl"))
}
}