[SPARK-36841][SQL] Add ansi syntax set catalog xxx to change the current catalog

### What changes were proposed in this pull request?
1. Add the `SET CATALOG xxx` statement to change the current catalog.
2. Retain the `USE` statement to change the current catalog.
3. Eagerly load the new catalog when switching to it, so an invalid catalog name fails immediately.

### Why are the changes needed?
ANSI SQL uses the `SET CATALOG XXX` statement to change the current catalog.

[DISCUSS](https://github.com/apache/spark/pull/34030#issuecomment-925936538)

<img width="521" alt="set-catalog" src="https://user-images.githubusercontent.com/41178002/134658562-4e4dd879-b6e5-484c-9461-6345c3faaf2e.png">

### Does this PR introduce _any_ user-facing change?
Yes, users can now use `SET CATALOG XXX` to change the current catalog.

### How was this patch tested?
Added unit test cases.

Closes #34096 from Peng-Lei/set-catalog-statement.

Authored-by: PengLei <peng.8lei@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
PengLei 2021-10-05 17:23:24 +08:00 committed by Wenchen Fan
parent fb919afac7
commit 6e8a462611
7 changed files with 82 additions and 0 deletions

View file

@ -328,6 +328,7 @@ Below is a list of all the keywords in Spark SQL.
|CASCADE|non-reserved|non-reserved|non-reserved|
|CASE|reserved|non-reserved|reserved|
|CAST|reserved|non-reserved|reserved|
|CATALOG|non-reserved|non-reserved|non-reserved|
|CHANGE|non-reserved|non-reserved|non-reserved|
|CHECK|reserved|non-reserved|reserved|
|CLEAR|non-reserved|non-reserved|non-reserved|

View file

@ -107,6 +107,7 @@ statement
: query #statementDefault
| ctes? dmlStatementNoWith #dmlStatement
| USE NAMESPACE? multipartIdentifier #use
| SET CATALOG (identifier | STRING) #setCatalog
| CREATE namespace (IF NOT EXISTS)? multipartIdentifier
(commentSpec |
locationSpec |
@ -1034,6 +1035,8 @@ alterColumnAction
| setOrDrop=(SET | DROP) NOT NULL
;
// When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
// - Reserved keywords:
// Keywords that are reserved and can't be used as identifiers for table, view, column,
@ -1061,6 +1064,7 @@ ansiNonReserved
| BY
| CACHE
| CASCADE
| CATALOG
| CHANGE
| CLEAR
| CLUSTER
@ -1290,6 +1294,7 @@ nonReserved
| CASCADE
| CASE
| CAST
| CATALOG
| CHANGE
| CHECK
| CLEAR
@ -1544,6 +1549,7 @@ CACHE: 'CACHE';
CASCADE: 'CASCADE';
CASE: 'CASE';
CAST: 'CAST';
CATALOG: 'CATALOG';
CHANGE: 'CHANGE';
CHECK: 'CHECK';
CLEAR: 'CLEAR';

View file

@ -120,6 +120,7 @@ class CatalogManager(
def setCurrentCatalog(catalogName: String): Unit = synchronized {
// `setCurrentCatalog` is noop if it doesn't switch to a different catalog.
if (currentCatalog.name() != catalogName) {
catalog(catalogName)
_currentCatalogName = Some(catalogName)
_currentNamespace = None
// Reset the current database of v1 `SessionCatalog` when switching current catalog, so that

View file

@ -239,6 +239,19 @@ class SparkSqlAstBuilder extends AstBuilder {
ShowCurrentNamespaceCommand()
}
/**
 * Builds a [[SetCatalogCommand]] from a `SET CATALOG (identifier | STRING)` clause,
 * accepting the catalog name either as a bare identifier or as a string literal.
 */
override def visitSetCatalog(ctx: SetCatalogContext): LogicalPlan = withOrigin(ctx) {
  // The grammar guarantees exactly one of the two alternatives is present;
  // the fallback throw guards against an inconsistent parse tree.
  val catalogName = (Option(ctx.identifier()), Option(ctx.STRING())) match {
    case (Some(ident), _) => ident.getText
    case (_, Some(str))   => string(str)
    case _                => throw new IllegalStateException("Invalid catalog name")
  }
  SetCatalogCommand(catalogName)
}
/**
* Converts a multi-part identifier to a TableIdentifier.
*

View file

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
/**
 * Runnable command backing the `SET CATALOG XXX` syntax: switches the
 * session's current catalog to `catalogName` via the [[CatalogManager]].
 */
case class SetCatalogCommand(catalogName: String) extends LeafRunnableCommand {
  // The command produces no result rows, hence no output attributes.
  override def output: Seq[Attribute] = Nil

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalogManager = sparkSession.sessionState.catalogManager
    catalogManager.setCurrentCatalog(catalogName)
    Nil
  }
}

View file

@ -2916,6 +2916,22 @@ class DataSourceV2SQLSuite
}
}
// NOTE(review): the JIRA id for this change is SPARK-36841 (see the commit
// title); the original test name had the digits transposed as "SPARK-36481".
test("SPARK-36841: Test for SET CATALOG statement") {
  val catalogManager = spark.sessionState.catalogManager
  // The session starts on the built-in session catalog.
  assert(catalogManager.currentCatalog.name() == SESSION_CATALOG_NAME)
  sql("SET CATALOG testcat")
  assert(catalogManager.currentCatalog.name() == "testcat")
  sql("SET CATALOG testcat2")
  assert(catalogManager.currentCatalog.name() == "testcat2")
  // Switching to an unregistered catalog must fail eagerly, because
  // `setCurrentCatalog` force-loads the target catalog plugin.
  val errMsg = intercept[CatalogNotFoundException] {
    sql("SET CATALOG not_exist_catalog")
  }.getMessage
  assert(errMsg.contains("Catalog 'not_exist_catalog' plugin class not found"))
}
private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")

View file

@ -404,4 +404,16 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
assert(fileFormat6.locationUri.isEmpty)
assert(provider6 == Some("ORC"))
}
test("SET CATALOG") {
  // Unquoted identifiers, single-quoted string literals, and backquoted
  // identifiers must all parse to a SetCatalogCommand with the plain name.
  Seq(
    "SET CATALOG abc"     -> "abc",
    "SET CATALOG 'a b c'" -> "a b c",
    "SET CATALOG `a b c`" -> "a b c"
  ).foreach { case (sqlText, expectedName) =>
    comparePlans(parser.parsePlan(sqlText), SetCatalogCommand(expectedName))
  }
}
}