[SPARK-33430][SQL] Support namespaces in JDBC v2 Table Catalog

### What changes were proposed in this pull request?
Add namespace support to the JDBC v2 Table Catalog by making ```JDBCTableCatalog``` extend ```SupportsNamespaces```.

### Why are the changes needed?
Makes the v2 JDBC implementation more complete.

### Does this PR introduce _any_ user-facing change?
Yes. The following methods are added to ```JDBCTableCatalog``` (a usage sketch follows the list):

- listNamespaces()
- listNamespaces(String[] namespace)
- namespaceExists(String[] namespace)
- loadNamespaceMetadata(String[] namespace)
- createNamespace(String[] namespace, Map&lt;String, String&gt; metadata)
- alterNamespace(String[] namespace, NamespaceChange... changes)
- dropNamespace(String[] namespace)
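
Once a JDBC catalog is configured, these operations surface through standard SQL. A minimal usage sketch; the catalog name `h2`, the in-memory H2 URL, and the driver below are illustrative assumptions, not part of this patch:

```scala
// Hypothetical setup: point a catalog named "h2" at JDBCTableCatalog.
spark.conf.set("spark.sql.catalog.h2",
  "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")   // placeholder URL
spark.conf.set("spark.sql.catalog.h2.driver", "org.h2.Driver")

// The new namespace operations back these standard SQL commands.
spark.sql("CREATE NAMESPACE h2.foo COMMENT 'test comment'")
spark.sql("SHOW NAMESPACES IN h2").show()
spark.sql("DROP NAMESPACE h2.foo")
```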

### How was this patch tested?
Added new Docker integration tests: a shared ```V2JDBCNamespaceTest``` trait and a ```PostgresNamespaceSuite``` that runs it against PostgreSQL.

Closes #30473 from huaxingao/name_space.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc.v2

import java.sql.Connection

import scala.collection.JavaConverters._

import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.tags.DockerTest

/**
 * To run this test suite for a specific version (e.g., postgres:13.0):
 * {{{
 *   POSTGRES_DOCKER_IMAGE_NAME=postgres:13.0
 *     ./build/sbt -Pdocker-integration-tests "testOnly *v2.PostgresNamespaceSuite"
 * }}}
 */
@DockerTest
class PostgresNamespaceSuite extends DockerJDBCIntegrationSuite with V2JDBCNamespaceTest {
  override val db = new DatabaseOnDocker {
    override val imageName = sys.env.getOrElse("POSTGRES_DOCKER_IMAGE_NAME", "postgres:13.0-alpine")
    override val env = Map(
      "POSTGRES_PASSWORD" -> "rootpass"
    )
    override val usesIpc = false
    override val jdbcPort = 5432
    override def getJdbcUrl(ip: String, port: Int): String =
      s"jdbc:postgresql://$ip:$port/postgres?user=postgres&password=rootpass"
  }

  val map = new CaseInsensitiveStringMap(
    Map("url" -> db.getJdbcUrl(dockerIp, externalPort),
      "driver" -> "org.postgresql.Driver").asJava)

  catalog.initialize("postgresql", map)

  override def dataPreparation(conn: Connection): Unit = {}

  override def builtinNamespaces: Array[Array[String]] = {
    Array(Array("information_schema"), Array("pg_catalog"), Array("public"))
  }
}
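
Because the namespace assertions live in the shared ```V2JDBCNamespaceTest``` trait, covering another engine only requires supplying a Docker image, a JDBC URL, and the engine's built-in schemas. A hypothetical sketch for MySQL — the image tag, credentials, driver class, and schema list are assumptions, not part of this patch:

```scala
@DockerTest
class MySQLNamespaceSuite extends DockerJDBCIntegrationSuite with V2JDBCNamespaceTest {
  override val db = new DatabaseOnDocker {
    override val imageName = sys.env.getOrElse("MYSQL_DOCKER_IMAGE_NAME", "mysql:8.0")
    override val env = Map("MYSQL_ROOT_PASSWORD" -> "rootpass")
    override val usesIpc = false
    override val jdbcPort = 3306
    override def getJdbcUrl(ip: String, port: Int): String =
      s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
  }

  val map = new CaseInsensitiveStringMap(
    Map("url" -> db.getJdbcUrl(dockerIp, externalPort),
      "driver" -> "com.mysql.cj.jdbc.Driver").asJava)

  catalog.initialize("mysql", map)

  override def dataPreparation(conn: Connection): Unit = {}

  // Guessed built-in schemas; a real suite would verify against the image used.
  override def builtinNamespaces: Array[Array[String]] =
    Array(Array("information_schema"), Array("mysql"),
      Array("performance_schema"), Array("sys"))
}
```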


@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc.v2

import scala.collection.JavaConverters._

import org.apache.log4j.Level

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.NamespaceChange
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.tags.DockerTest

@DockerTest
private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession {
  val catalog = new JDBCTableCatalog()

  def builtinNamespaces: Array[Array[String]]

  test("listNamespaces: basic behavior") {
    catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava)
    assert(catalog.listNamespaces() === Array(Array("foo")) ++ builtinNamespaces)
    assert(catalog.listNamespaces(Array("foo")) === Array())
    assert(catalog.namespaceExists(Array("foo")) === true)

    // Setting or removing the comment must not emit the "comment ignored" warning,
    // since the dialects under test support schema comments.
    val logAppender = new LogAppender("catalog comment")
    withLogAppender(logAppender) {
      catalog.alterNamespace(Array("foo"), NamespaceChange
        .setProperty("comment", "comment for foo"))
      catalog.alterNamespace(Array("foo"), NamespaceChange.removeProperty("comment"))
    }
    val createCommentWarning = logAppender.loggingEvents
      .filter(_.getLevel == Level.WARN)
      .map(_.getRenderedMessage)
      .exists(_.contains("catalog comment"))
    assert(createCommentWarning === false)

    catalog.dropNamespace(Array("foo"))
    assert(catalog.namespaceExists(Array("foo")) === false)
    assert(catalog.listNamespaces() === builtinNamespaces)
    val msg = intercept[AnalysisException] {
      catalog.listNamespaces(Array("foo"))
    }.getMessage
    assert(msg.contains("Namespace 'foo' not found"))
  }
}


@@ -927,6 +927,55 @@ object JdbcUtils extends Logging {
    }
  }

  /**
   * Creates a namespace.
   */
  def createNamespace(
      conn: Connection,
      options: JDBCOptions,
      namespace: String,
      comment: String): Unit = {
    val dialect = JdbcDialects.get(options.url)
    executeStatement(conn, options, s"CREATE SCHEMA ${dialect.quoteIdentifier(namespace)}")
    if (!comment.isEmpty) createNamespaceComment(conn, options, namespace, comment)
  }

  /**
   * Sets a comment on a namespace. Failures are logged and swallowed, since
   * not every database supports schema comments.
   */
  def createNamespaceComment(
      conn: Connection,
      options: JDBCOptions,
      namespace: String,
      comment: String): Unit = {
    val dialect = JdbcDialects.get(options.url)
    try {
      executeStatement(
        conn, options, dialect.getSchemaCommentQuery(namespace, comment))
    } catch {
      case e: Exception =>
        logWarning("Cannot create JDBC catalog comment. The catalog comment will be ignored.")
    }
  }

  /**
   * Removes the comment from a namespace. Failures are logged and swallowed.
   */
  def removeNamespaceComment(
      conn: Connection,
      options: JDBCOptions,
      namespace: String): Unit = {
    val dialect = JdbcDialects.get(options.url)
    try {
      executeStatement(conn, options, dialect.removeSchemaCommentQuery(namespace))
    } catch {
      case e: Exception =>
        logWarning("Cannot drop JDBC catalog comment.")
    }
  }

  /**
   * Drops a namespace from the JDBC database.
   */
  def dropNamespace(conn: Connection, options: JDBCOptions, namespace: String): Unit = {
    val dialect = JdbcDialects.get(options.url)
    executeStatement(conn, options, s"DROP SCHEMA ${dialect.quoteIdentifier(namespace)}")
  }

  private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
    val statement = conn.createStatement
    try {
      statement.setQueryTimeout(options.queryTimeout)
      statement.executeUpdate(sql)
    } finally {
      statement.close()
    }
  }
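
For context, these helpers amount to plain DDL over a raw connection. A rough, hypothetical driver-level sketch — the URL, credentials, and the `dbtable` placeholder are assumptions; the exact quoting comes from the dialect's `quoteIdentifier`:

```scala
import java.sql.DriverManager

import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}

val url = "jdbc:postgresql://localhost:5432/postgres"  // placeholder endpoint
val options = new JDBCOptions(Map("url" -> url, "dbtable" -> "unused"))
val conn = DriverManager.getConnection(url, "postgres", "rootpass")
try {
  // Sends: CREATE SCHEMA "foo", then COMMENT ON SCHEMA "foo" IS 'test comment'
  JdbcUtils.createNamespace(conn, options, "foo", "test comment")
  // Sends: DROP SCHEMA "foo"
  JdbcUtils.dropNamespace(conn, options, "foo")
} finally {
  conn.close()
}
```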


@@ -17,13 +17,16 @@
package org.apache.spark.sql.execution.datasources.v2.jdbc

import java.sql.{Connection, SQLException}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuilder

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
-import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
+import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException}
+import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite, JDBCRDD, JdbcUtils}
import org.apache.spark.sql.internal.SQLConf
@@ -31,7 +34,8 @@ import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

-class JDBCTableCatalog extends TableCatalog with Logging {
+class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper

  private var catalogName: String = null
  private var options: JDBCOptions = _
@@ -125,12 +129,12 @@ class JDBCTableCatalog extends TableCatalog with Logging {
    if (!properties.isEmpty) {
      properties.asScala.map {
        case (k, v) => k match {
-          case "comment" => tableComment = v
-          case "provider" =>
+          case TableCatalog.PROP_COMMENT => tableComment = v
+          case TableCatalog.PROP_PROVIDER =>
            throw new AnalysisException("CREATE TABLE ... USING ... is not supported in" +
              " JDBC catalog.")
-          case "owner" => // owner is ignored. It is default to current user name.
-          case "location" =>
+          case TableCatalog.PROP_OWNER => // owner is ignored. It is default to current user name.
+          case TableCatalog.PROP_LOCATION =>
            throw new AnalysisException("CREATE TABLE ... LOCATION ... is not supported in" +
              " JDBC catalog.")
          case _ => tableProperties = tableProperties + " " + s"$k $v"
@@ -171,6 +175,132 @@ class JDBCTableCatalog extends TableCatalog with Logging {
    }
  }

  override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
    case Array(db) =>
      withConnection { conn =>
        val rs = conn.getMetaData.getSchemas(null, db)
        while (rs.next()) {
          if (rs.getString(1) == db) return true;
        }
        false
      }
    // JDBC schemas are a single level, so deeper namespaces can never exist.
    case _ => false
  }

  override def listNamespaces(): Array[Array[String]] = {
    withConnection { conn =>
      val schemaBuilder = ArrayBuilder.make[Array[String]]
      val rs = conn.getMetaData.getSchemas()
      while (rs.next()) {
        schemaBuilder += Array(rs.getString(1))
      }
      schemaBuilder.result
    }
  }

  override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
    namespace match {
      case Array() =>
        listNamespaces()
      case Array(_) if namespaceExists(namespace) =>
        Array()
      case _ =>
        throw new NoSuchNamespaceException(namespace)
    }
  }

  override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = {
    namespace match {
      case Array(db) =>
        if (!namespaceExists(namespace)) throw new NoSuchNamespaceException(db)
        // No namespace-level metadata is exposed for JDBC schemas.
        mutable.HashMap[String, String]().asJava
      case _ =>
        throw new NoSuchNamespaceException(namespace)
    }
  }

  override def createNamespace(
      namespace: Array[String],
      metadata: util.Map[String, String]): Unit = namespace match {
    case Array(db) if !namespaceExists(namespace) =>
      var comment = ""
      if (!metadata.isEmpty) {
        metadata.asScala.map {
          case (k, v) => k match {
            case SupportsNamespaces.PROP_COMMENT => comment = v
            case SupportsNamespaces.PROP_OWNER => // ignore
            case SupportsNamespaces.PROP_LOCATION =>
              throw new AnalysisException("CREATE NAMESPACE ... LOCATION ... is not supported in" +
                " JDBC catalog.")
            case _ =>
              throw new AnalysisException(s"CREATE NAMESPACE with property $k is not supported in" +
                " JDBC catalog.")
          }
        }
      }
      withConnection { conn =>
        classifyException(s"Failed create name space: $db") {
          JdbcUtils.createNamespace(conn, options, db, comment)
        }
      }
    case Array(_) =>
      throw new NamespaceAlreadyExistsException(namespace)
    case _ =>
      throw new IllegalArgumentException(s"Invalid namespace name: ${namespace.quoted}")
  }

  override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
    namespace match {
      case Array(db) =>
        changes.foreach {
          case set: NamespaceChange.SetProperty =>
            if (set.property() == SupportsNamespaces.PROP_COMMENT) {
              withConnection { conn =>
                JdbcUtils.createNamespaceComment(conn, options, db, set.value)
              }
            } else {
              throw new AnalysisException(s"SET NAMESPACE with property ${set.property} " +
                "is not supported in JDBC catalog.")
            }
          case unset: NamespaceChange.RemoveProperty =>
            if (unset.property() == SupportsNamespaces.PROP_COMMENT) {
              withConnection { conn =>
                JdbcUtils.removeNamespaceComment(conn, options, db)
              }
            } else {
              throw new AnalysisException(s"Remove NAMESPACE property ${unset.property} " +
                "is not supported in JDBC catalog.")
            }
          case _ =>
            throw new AnalysisException(s"Unsupported NamespaceChange $changes in JDBC catalog.")
        }
      case _ =>
        throw new NoSuchNamespaceException(namespace)
    }
  }

  override def dropNamespace(namespace: Array[String]): Boolean = namespace match {
    case Array(db) if namespaceExists(namespace) =>
      if (listTables(Array(db)).nonEmpty) {
        throw new IllegalStateException(s"Namespace ${namespace.quoted} is not empty")
      }
      withConnection { conn =>
        classifyException(s"Failed drop name space: $db") {
          JdbcUtils.dropNamespace(conn, options, db)
          true
        }
      }
    case _ =>
      throw new NoSuchNamespaceException(namespace)
  }

  private def checkNamespace(namespace: Array[String]): Unit = {
    // In JDBC there is no nested database/schema
    if (namespace.length > 1) {
      throw new NoSuchNamespaceException(namespace)
    }
  }


@@ -17,7 +17,7 @@
package org.apache.spark.sql.jdbc

-import java.sql.{Connection, Date, SQLFeatureNotSupportedException, Timestamp}
+import java.sql.{Connection, Date, Timestamp}

import scala.collection.mutable.ArrayBuilder
@@ -232,7 +232,7 @@ abstract class JdbcDialect extends Serializable with Logging {
          val name = updateNull.fieldNames
          updateClause += getUpdateColumnNullabilityQuery(tableName, name(0), updateNull.nullable())
        case _ =>
-          throw new SQLFeatureNotSupportedException(s"Unsupported TableChange $change")
+          throw new AnalysisException(s"Unsupported TableChange $change in JDBC catalog.")
      }
    }
    updateClause.result()
@@ -270,6 +270,14 @@ abstract class JdbcDialect extends Serializable with Logging {
s"COMMENT ON TABLE $table IS '$comment'"
}
def getSchemaCommentQuery(schema: String, comment: String): String = {
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS '$comment'"
}
def removeSchemaCommentQuery(schema: String): String = {
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS NULL"
}
  /**
   * Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
   * @param message The error message to be placed to the returned exception.
   * @param e The dialect specific exception.
   * @return `AnalysisException` or its sub-class.
   */
  def classifyException(message: String, e: Throwable): AnalysisException = {
    new AnalysisException(message, cause = Some(e))
  }
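
Both new hooks default to the standard `COMMENT ON SCHEMA` syntax shown above, so a dialect whose engine spells schema comments differently can override them. A minimal sketch under that assumption — the dialect object, URL prefix, and `ALTER SCHEMA` syntax are illustrative, not part of this patch:

```scala
// Hypothetical dialect for an engine without COMMENT ON SCHEMA support.
case object SomeDBDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:somedb")

  // Suppose this engine uses ALTER SCHEMA ... COMMENT instead.
  override def getSchemaCommentQuery(schema: String, comment: String): String =
    s"ALTER SCHEMA ${quoteIdentifier(schema)} COMMENT '$comment'"

  override def removeSchemaCommentQuery(schema: String): String =
    s"ALTER SCHEMA ${quoteIdentifier(schema)} COMMENT ''"
}

// JdbcDialects.registerDialect(SomeDBDialect) would route matching URLs here;
// note that createNamespaceComment only logs a warning if the emitted SQL fails.
```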