From 6e8a4626117f0cb5535875f7181f56350ad4f195 Mon Sep 17 00:00:00 2001
From: PengLei
Date: Tue, 5 Oct 2021 17:23:24 +0800
Subject: [PATCH] [SPARK-36841][SQL] Add ANSI syntax `SET CATALOG xxx` to
 change the current catalog
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### What changes were proposed in this pull request?
1. Add the `SET CATALOG xxx` statement to change the current catalog.
2. Retain the `USE` statement as an alternative way to change the current catalog.
3. Eagerly load the new catalog when switching to it, so that an invalid catalog name fails immediately.

### Why are the changes needed?
ANSI SQL uses the `SET CATALOG xxx` statement to change the current catalog. See the [DISCUSS](https://github.com/apache/spark/pull/34030#issuecomment-925936538) thread.

### Does this PR introduce _any_ user-facing change?
Yes. Users can now run `SET CATALOG xxx` to change the current catalog.
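For example (an illustrative sketch; `testcat` is a placeholder for any registered catalog name):

```sql
SET CATALOG testcat;     -- the catalog name as an identifier
SET CATALOG 'testcat';   -- ...or as a string literal
```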
### How was this patch tested?
Added unit test cases.

Closes #34096 from Peng-Lei/set-catalog-statement.

Authored-by: PengLei
Signed-off-by: Wenchen Fan
---
 docs/sql-ref-ansi-compliance.md                |  1 +
 .../spark/sql/catalyst/parser/SqlBase.g4       |  6 ++++
 .../connector/catalog/CatalogManager.scala     |  1 +
 .../spark/sql/execution/SparkSqlParser.scala   | 13 ++++++++
 .../execution/command/SetCatalogCommand.scala  | 33 +++++++++++++++++++
 .../sql/connector/DataSourceV2SQLSuite.scala   | 16 +++++++++
 .../execution/command/DDLParserSuite.scala     | 12 +++++++
 7 files changed, 82 insertions(+)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCatalogCommand.scala

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 3767fddbf0..cec4ad6afc 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -328,6 +328,7 @@ Below is a list of all the keywords in Spark SQL.
 |CASCADE|non-reserved|non-reserved|non-reserved|
 |CASE|reserved|non-reserved|reserved|
 |CAST|reserved|non-reserved|reserved|
+|CATALOG|non-reserved|non-reserved|non-reserved|
 |CHANGE|non-reserved|non-reserved|non-reserved|
 |CHECK|reserved|non-reserved|reserved|
 |CLEAR|non-reserved|non-reserved|non-reserved|
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index bd9f923f7a..886810e269 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -107,6 +107,7 @@ statement
     : query                                                            #statementDefault
     | ctes? dmlStatementNoWith                                         #dmlStatement
     | USE NAMESPACE? multipartIdentifier                               #use
+    | SET CATALOG (identifier | STRING)                                #setCatalog
     | CREATE namespace (IF NOT EXISTS)? multipartIdentifier
         (commentSpec |
          locationSpec |
@@ -1034,6 +1035,8 @@ alterColumnAction
     | setOrDrop=(SET | DROP) NOT NULL
     ;
 
+
+
 // When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
 // - Reserved keywords:
 //     Keywords that are reserved and can't be used as identifiers for table, view, column,
@@ -1061,6 +1064,7 @@ ansiNonReserved
     | BY
     | CACHE
     | CASCADE
+    | CATALOG
     | CHANGE
     | CLEAR
     | CLUSTER
@@ -1290,6 +1294,7 @@ nonReserved
     | CASCADE
     | CASE
     | CAST
+    | CATALOG
     | CHANGE
     | CHECK
     | CLEAR
@@ -1544,6 +1549,7 @@ CACHE: 'CACHE';
 CASCADE: 'CASCADE';
 CASE: 'CASE';
 CAST: 'CAST';
+CATALOG: 'CATALOG';
 CHANGE: 'CHANGE';
 CHECK: 'CHECK';
 CLEAR: 'CLEAR';
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index ff0ad2397b..92e682dee3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -120,6 +120,7 @@ class CatalogManager(
   def setCurrentCatalog(catalogName: String): Unit = synchronized {
     // `setCurrentCatalog` is noop if it doesn't switch to a different catalog.
     if (currentCatalog.name() != catalogName) {
+      catalog(catalogName)
       _currentCatalogName = Some(catalogName)
       _currentNamespace = None
       // Reset the current database of v1 `SessionCatalog` when switching current catalog, so that
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index db7701996e..21aed5f2e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -239,6 +239,19 @@ class SparkSqlAstBuilder extends AstBuilder {
     ShowCurrentNamespaceCommand()
   }
 
+  /**
+   * Create a [[SetCatalogCommand]] logical command.
+   */
+  override def visitSetCatalog(ctx: SetCatalogContext): LogicalPlan = withOrigin(ctx) {
+    if (ctx.identifier() != null) {
+      SetCatalogCommand(ctx.identifier().getText)
+    } else if (ctx.STRING() != null) {
+      SetCatalogCommand(string(ctx.STRING()))
+    } else {
+      throw new IllegalStateException("Invalid catalog name")
+    }
+  }
+
   /**
    * Converts a multi-part identifier to a TableIdentifier.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCatalogCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCatalogCommand.scala
new file mode 100644
index 0000000000..c7d057de9c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCatalogCommand.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+/**
+ * The command for `SET CATALOG XXX`.
+ */
+case class SetCatalogCommand(catalogName: String) extends LeafRunnableCommand {
+  override def output: Seq[Attribute] = Seq.empty
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sessionState.catalogManager.setCurrentCatalog(catalogName)
+    Seq.empty
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index a326b82dba..2784354a96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2916,6 +2916,22 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-36841: Test for SET CATALOG statement") {
+    val catalogManager = spark.sessionState.catalogManager
+    assert(catalogManager.currentCatalog.name() == SESSION_CATALOG_NAME)
+
+    sql("SET CATALOG testcat")
+    assert(catalogManager.currentCatalog.name() == "testcat")
+
+    sql("SET CATALOG testcat2")
+    assert(catalogManager.currentCatalog.name() == "testcat2")
+
+    val errMsg = intercept[CatalogNotFoundException] {
+      sql("SET CATALOG not_exist_catalog")
+    }.getMessage
+    assert(errMsg.contains("Catalog 'not_exist_catalog' plugin class not found"))
+  }
+
   private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 37e558ab0c..c1e37314ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -404,4 +404,16 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
     assert(fileFormat6.locationUri.isEmpty)
     assert(provider6 == Some("ORC"))
   }
+
+  test("SET CATALOG") {
+    comparePlans(
+      parser.parsePlan("SET CATALOG abc"),
+      SetCatalogCommand("abc"))
+    comparePlans(
+      parser.parsePlan("SET CATALOG 'a b c'"),
+      SetCatalogCommand("a b c"))
+    comparePlans(
+      parser.parsePlan("SET CATALOG `a b c`"),
+      SetCatalogCommand("a b c"))
+  }
 }
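
An illustrative end-to-end session (a sketch, not part of the patch: `testcat` must first be registered by setting `spark.sql.catalog.testcat` to a catalog plugin class, and the error output is abbreviated to the substring the new test asserts):

```sql
SET CATALOG testcat;
-- The catalog plugin is now loaded eagerly, so a name with no registered
-- plugin fails at SET time instead of on first use:
SET CATALOG not_exist_catalog;
-- Error: Catalog 'not_exist_catalog' plugin class not found
```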