From 184074de2286475f60d3fd8a2299ac21b8893256 Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 22 Jul 2020 19:05:50 +0000 Subject: [PATCH] [SPARK-31999][SQL] Add REFRESH FUNCTION command ### What changes were proposed in this pull request? In Hive mode, permanent functions are shared with the Hive metastore, so a function may be modified by other Hive clients. In a long-lived Spark application it is hard to pick up such changes, for two reasons: * Spark caches the function in memory using `FunctionRegistry`. * Users may not know the location or class name of the UDF when using `replace function`. Note that we use the v2 command code path to add the new command. ### Why are the changes needed? Gives an easy way to keep the Spark function registry in sync with the Hive metastore. Then we can call ``` refresh function functionName ``` ### Does this PR introduce _any_ user-facing change? Yes, a new command. ### How was this patch tested? New UT. Closes #28840 from ulysses-you/SPARK-31999. Authored-by: ulysses Signed-off-by: Wenchen Fan --- docs/_data/menu-sql.yaml | 2 + docs/sql-ref-syntax-aux-cache-cache-table.md | 1 + docs/sql-ref-syntax-aux-cache-clear-cache.md | 1 + ...l-ref-syntax-aux-cache-refresh-function.md | 60 +++++++++++++++++++ .../sql-ref-syntax-aux-cache-refresh-table.md | 1 + docs/sql-ref-syntax-aux-cache-refresh.md | 1 + .../sql-ref-syntax-aux-cache-uncache-table.md | 1 + docs/sql-ref-syntax-aux-cache.md | 3 +- docs/sql-ref-syntax.md | 1 + .../spark/sql/catalyst/parser/SqlBase.g4 | 1 + .../sql/catalyst/analysis/Analyzer.scala | 6 ++ .../catalyst/analysis/v2ResolutionPlans.scala | 20 +++++++ .../sql/catalyst/catalog/SessionCatalog.scala | 8 +++ .../sql/catalyst/parser/AstBuilder.scala | 5 ++ .../catalyst/plans/logical/v2Commands.scala | 7 +++ .../catalog/CatalogV2Implicits.scala | 10 +++- .../sql/connector/catalog/LookupCatalog.scala | 29 ++++++++- .../sql/catalyst/parser/DDLParserSuite.scala | 11 +++- .../analysis/ResolveSessionCatalog.scala | 30 ++-------- .../sql/execution/command/functions.scala | 39 ++++++++++++ .../sql/connector/DataSourceV2SQLSuite.scala | 13 ++++ .../sql/execution/command/DDLSuite.scala | 47 ++++++++++++++- 22 files changed, 265 insertions(+), 32 deletions(-) create mode 100644 docs/sql-ref-syntax-aux-cache-refresh-function.md diff --git a/docs/_data/menu-sql.yaml b/docs/_data/menu-sql.yaml index 2d26326a00..ef5a53a69e 100644 --- a/docs/_data/menu-sql.yaml +++ b/docs/_data/menu-sql.yaml @@ -208,6 +208,8 @@ url: sql-ref-syntax-aux-cache-clear-cache.html - text: REFRESH TABLE url: sql-ref-syntax-aux-cache-refresh-table.html + - text: REFRESH FUNCTION + url: sql-ref-syntax-aux-cache-refresh-function.html - text: REFRESH url: sql-ref-syntax-aux-cache-refresh.html - text: DESCRIBE diff --git a/docs/sql-ref-syntax-aux-cache-cache-table.md b/docs/sql-ref-syntax-aux-cache-cache-table.md index fdef3d657d..8829016fc1 100644 --- a/docs/sql-ref-syntax-aux-cache-cache-table.md +++ b/docs/sql-ref-syntax-aux-cache-cache-table.md @@ -80,3 +80,4 @@ CACHE TABLE testCache OPTIONS ('storageLevel' 'DISK_ONLY') SELECT * FROM testDat * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) +* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html) \ No newline at end of file diff --git a/docs/sql-ref-syntax-aux-cache-clear-cache.md b/docs/sql-ref-syntax-aux-cache-clear-cache.md index a27cd83c14..aae4e39600 100644 --- a/docs/sql-ref-syntax-aux-cache-clear-cache.md +++ 
b/docs/sql-ref-syntax-aux-cache-clear-cache.md @@ -41,3 +41,4 @@ CLEAR CACHE; * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) +* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html) \ No newline at end of file diff --git a/docs/sql-ref-syntax-aux-cache-refresh-function.md b/docs/sql-ref-syntax-aux-cache-refresh-function.md new file mode 100644 index 0000000000..d91fc062eb --- /dev/null +++ b/docs/sql-ref-syntax-aux-cache-refresh-function.md @@ -0,0 +1,60 @@ +--- +layout: global +title: REFRESH FUNCTION +displayTitle: REFRESH FUNCTION +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +### Description + +`REFRESH FUNCTION` statement invalidates the cached function entry, which includes a class name +and resource location of the given function. The invalidated cache is repopulated right away. +Note that `REFRESH FUNCTION` only works for permanent functions. Refreshing native functions or temporary functions will cause an exception. + +### Syntax + +```sql +REFRESH FUNCTION function_identifier +``` + +### Parameters + +* **function_identifier** + + Specifies a function name, which is either a qualified or unqualified name. If no database identifier is provided, the function is resolved from the current database. + + **Syntax:** `[ database_name. ] function_name` + +### Examples + +```sql +-- The cached entry of the function will be refreshed +-- The function is resolved from the current database as the function name is unqualified. +REFRESH FUNCTION func1; + +-- The cached entry of the function will be refreshed +-- The function is resolved from the db1 database as the function name is qualified. 
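+-- Both the cached class name and the cached resource location of the function are reloaded.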
+REFRESH FUNCTION db1.func1; +``` + +### Related Statements + +* [CACHE TABLE](sql-ref-syntax-aux-cache-cache-table.html) +* [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html) +* [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) +* [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html) +* [REFRESH](sql-ref-syntax-aux-cache-refresh.html) diff --git a/docs/sql-ref-syntax-aux-cache-refresh-table.md b/docs/sql-ref-syntax-aux-cache-refresh-table.md index 8d4a804f88..cc35c0451d 100644 --- a/docs/sql-ref-syntax-aux-cache-refresh-table.md +++ b/docs/sql-ref-syntax-aux-cache-refresh-table.md @@ -57,3 +57,4 @@ REFRESH TABLE tempDB.view1; * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html) * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) +* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html) \ No newline at end of file diff --git a/docs/sql-ref-syntax-aux-cache-refresh.md b/docs/sql-ref-syntax-aux-cache-refresh.md index b10e6fb47a..715bdcac3b 100644 --- a/docs/sql-ref-syntax-aux-cache-refresh.md +++ b/docs/sql-ref-syntax-aux-cache-refresh.md @@ -54,3 +54,4 @@ REFRESH "hdfs://path/to/table"; * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html) * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html) +* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html) diff --git a/docs/sql-ref-syntax-aux-cache-uncache-table.md b/docs/sql-ref-syntax-aux-cache-uncache-table.md index 96a691e4c3..4456378cde 100644 --- a/docs/sql-ref-syntax-aux-cache-uncache-table.md +++ b/docs/sql-ref-syntax-aux-cache-uncache-table.md @@ -50,3 +50,4 @@ UNCACHE TABLE t1; * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html) * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) +* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html) \ No newline at end of file diff --git a/docs/sql-ref-syntax-aux-cache.md b/docs/sql-ref-syntax-aux-cache.md index 0ccb1c61a0..17a13e67e5 100644 --- a/docs/sql-ref-syntax-aux-cache.md +++ b/docs/sql-ref-syntax-aux-cache.md @@ -23,4 +23,5 @@ license: | * [UNCACHE TABLE statement](sql-ref-syntax-aux-cache-uncache-table.html) * [CLEAR CACHE statement](sql-ref-syntax-aux-cache-clear-cache.html) * [REFRESH TABLE statement](sql-ref-syntax-aux-cache-refresh-table.html) - * [REFRESH statement](sql-ref-syntax-aux-cache-refresh.html) \ No newline at end of file + * [REFRESH statement](sql-ref-syntax-aux-cache-refresh.html) + * [REFRESH FUNCTION statement](sql-ref-syntax-aux-cache-refresh-function.html) diff --git a/docs/sql-ref-syntax.md b/docs/sql-ref-syntax.md index 4bf1858428..290523a2b1 100644 --- a/docs/sql-ref-syntax.md +++ b/docs/sql-ref-syntax.md @@ -83,6 +83,7 @@ Spark SQL is Apache Spark's module for working with structured data. 
The SQL Syn * [LIST JAR](sql-ref-syntax-aux-resource-mgmt-list-jar.html) * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html) + * [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html) * [RESET](sql-ref-syntax-aux-conf-mgmt-reset.html) * [SET](sql-ref-syntax-aux-conf-mgmt-set.html) * [SHOW COLUMNS](sql-ref-syntax-aux-show-columns.html) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index bc7e982830..73dfdcc263 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -229,6 +229,7 @@ statement comment=(STRING | NULL) #commentNamespace | COMMENT ON TABLE multipartIdentifier IS comment=(STRING | NULL) #commentTable | REFRESH TABLE multipartIdentifier #refreshTable + | REFRESH FUNCTION multipartIdentifier #refreshFunction | REFRESH (STRING | .*?) #refreshResource | CACHE LAZY? TABLE multipartIdentifier (OPTIONS options=tablePropertyList)? (AS? query)? #cacheTable diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a77d4f1c62..cfc31858d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1886,11 +1886,17 @@ class Analyzer( } /** + * Replaces [[UnresolvedFunc]]s with concrete [[LogicalPlan]]s. * Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s. */ object ResolveFunctions extends Rule[LogicalPlan] { val trimWarningEnabled = new AtomicBoolean(true) def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + // Resolve the function identifier through the session catalog into a ResolvedFunc. + case UnresolvedFunc(multipartIdent) => + val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, "function lookup") + ResolvedFunc(Identifier.of(funcIdent.database.toArray, funcIdent.funcName)) + case q: LogicalPlan => q transformExpressions { case u if !u.childrenResolved => u // Skip until children are resolved. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index f3d40c6d36..a16763f2cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.catalyst.catalog.CatalogFunction import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, SupportsNamespaces, Table, TableCatalog} @@ -50,6 +51,15 @@ case class UnresolvedTableOrView(multipartIdentifier: Seq[String]) extends LeafN override def output: Seq[Attribute] = Nil } +/** + * Holds the name of a function that has yet to be looked up in a catalog. It will be resolved to + * [[ResolvedFunc]] during analysis. 
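+ * + * For example, `REFRESH FUNCTION db1.func1` is parsed into `RefreshFunction(UnresolvedFunc(Seq("db1", "func1")))`, whose child is then replaced with a [[ResolvedFunc]] by the analyzer.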
+ */ +case class UnresolvedFunc(multipartIdentifier: Seq[String]) extends LeafNode { + override lazy val resolved: Boolean = false + override def output: Seq[Attribute] = Nil +} + /** * A plan containing resolved namespace. */ @@ -74,3 +84,13 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T case class ResolvedView(identifier: Identifier) extends LeafNode { override def output: Seq[Attribute] = Nil } + +/** + * A plan containing a resolved function. + */ +// TODO: create a generic representation for v1 and v2 functions, after we add function +// support to the v2 catalog. For now we only need the identifier to fall back to the v1 command. +case class ResolvedFunc(identifier: Identifier) + extends LeafNode { + override def output: Seq[Attribute] = Nil +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 2b3f05f61b..ec0c34d4c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1351,6 +1351,14 @@ class SessionCatalog( functionRegistry.registerFunction(func, info, builder) } + /** + * Unregister a temporary or permanent function from the session-specific [[FunctionRegistry]]. + * Return true if the function exists. + */ + def unregisterFunction(name: FunctionIdentifier): Boolean = { + functionRegistry.dropFunction(name) + } + /** * Drop a temporary function. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 13e528e796..5663741bae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3660,6 +3660,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging ctx.REPLACE != null) } + override def visitRefreshFunction(ctx: RefreshFunctionContext): LogicalPlan = withOrigin(ctx) { + val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier) + RefreshFunction(UnresolvedFunc(functionIdentifier)) + } + override def visitCommentNamespace(ctx: CommentNamespaceContext): LogicalPlan = withOrigin(ctx) { val comment = ctx.comment.getType match { case SqlBaseParser.NULL => "" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index b4120d9f64..137fc70397 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -516,3 +516,10 @@ case class CommentOnNamespace(child: LogicalPlan, comment: String) extends Comma case class CommentOnTable(child: LogicalPlan, comment: String) extends Command { override def children: Seq[LogicalPlan] = child :: Nil } + +/** + * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs. 
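+ * + * The analyzer resolves the child [[UnresolvedFunc]] to a [[ResolvedFunc]]; since v2 catalogs do not support functions yet, the command then falls back to the v1 `RefreshFunctionCommand` in `ResolveSessionCatalog`.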
+ */ +case class RefreshFunction(child: LogicalPlan) extends Command { + override def children: Seq[LogicalPlan] = child :: Nil +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala index d90804f4b6..2ee760d4f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.connector.catalog import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.connector.expressions.{BucketTransform, IdentityTransform, LogicalExpressions, Transform} @@ -107,6 +107,14 @@ private[sql] object CatalogV2Implicits { throw new AnalysisException( s"$quoted is not a valid TableIdentifier as it has more than 2 name parts.") } + + def asFunctionIdentifier: FunctionIdentifier = ident.namespace() match { + case ns if ns.isEmpty => FunctionIdentifier(ident.name()) + case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName)) + case _ => + throw new AnalysisException( + s"$quoted is not a valid FunctionIdentifier as it has more than 2 name parts.") + } } implicit class MultipartIdentifierHelper(parts: Seq[String]) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index 10c15747ec..b84bf3e278 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.catalog import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} /** @@ -155,4 +155,31 @@ private[sql] trait LookupCatalog extends Logging { None } } + + // TODO: move function related v2 statements to the new framework. + def parseSessionCatalogFunctionIdentifier( + nameParts: Seq[String], + sql: String): FunctionIdentifier = { + if (nameParts.length == 1 && catalogManager.v1SessionCatalog.isTempFunction(nameParts.head)) { + return FunctionIdentifier(nameParts.head) + } + + nameParts match { + case SessionCatalogAndIdentifier(_, ident) => + if (nameParts.length == 1) { + // If there is only one name part, it means the current catalog is the session catalog. + // Here we don't fill the default database, to keep the error message unchanged for + // v1 commands. 
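+          // For example, Seq("fn") maps to FunctionIdentifier("fn", None), not to an identifier with the current database filled in.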
+ FunctionIdentifier(nameParts.head, None) + } else { + ident.namespace match { + case Array(db) => FunctionIdentifier(ident.name, Some(db)) + case _ => + throw new AnalysisException(s"Unsupported function name '$ident'") + } + } + + case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog") + } + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index e802449a69..47f21a0a19 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser import java.util.Locale import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView} import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, FunctionResourceType, JarResource} import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal} import org.apache.spark.sql.catalyst.plans.logical._ @@ -2109,6 +2109,15 @@ class DDLParserSuite extends AnalysisTest { "Operation not allowed: CREATE FUNCTION with resource type 'other'") } + test("REFRESH FUNCTION") { + parseCompare("REFRESH FUNCTION c", + RefreshFunction(UnresolvedFunc(Seq("c")))) + parseCompare("REFRESH FUNCTION b.c", + RefreshFunction(UnresolvedFunc(Seq("b", "c")))) + parseCompare("REFRESH FUNCTION a.b.c", + RefreshFunction(UnresolvedFunc(Seq("a", "b", "c")))) + } + private case class TableSpec( name: Seq[String], schema: Option[StructType], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index bc3f38a358..5717013b2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -622,33 +622,11 @@ class ResolveSessionCatalog( CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists, replace) } - } - // TODO: move function related v2 statements to the new framework. - private def parseSessionCatalogFunctionIdentifier( - nameParts: Seq[String], - sql: String): FunctionIdentifier = { - if (nameParts.length == 1 && isTempFunction(nameParts.head)) { - return FunctionIdentifier(nameParts.head) - } - - nameParts match { - case SessionCatalogAndIdentifier(_, ident) => - if (nameParts.length == 1) { - // If there is only one name part, it means the current catalog is the session catalog. - // Here we don't fill the default database, to keep the error message unchanged for - // v1 commands. 
- FunctionIdentifier(nameParts.head, None) - } else { - ident.namespace match { - case Array(db) => FunctionIdentifier(ident.name, Some(db)) - case _ => - throw new AnalysisException(s"Unsupported function name '$ident'") - } - } - - case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog") - } + case RefreshFunction(ResolvedFunc(identifier)) => + // Fall back to the v1 command + val funcIdentifier = identifier.asFunctionIdentifier + RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName) } private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 6fdc7f4a58..252d188ff8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -236,6 +236,45 @@ case class ShowFunctionsCommand( } } + +/** + * A command for users to refresh a persistent function. + * The syntax of using this command in SQL is: + * {{{ + * REFRESH FUNCTION functionName + * }}} + */ +case class RefreshFunctionCommand( + databaseName: Option[String], + functionName: String) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { + throw new AnalysisException(s"Cannot refresh builtin function $functionName") + } + if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { + throw new AnalysisException(s"Cannot refresh temporary function $functionName") + } + + val identifier = FunctionIdentifier( + functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) + // We only refresh permanent functions. + if (catalog.isPersistentFunction(identifier)) { + // Re-register the function to overwrite the cached entry. + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, overrideIfExists = true) + } else { + // Clear the cached function and throw an exception. + catalog.unregisterFunction(identifier) + throw new NoSuchFunctionException(identifier.database.get, identifier.funcName) + } + + Seq.empty[Row] + } +} + object FunctionsCommand { // operators that do not have corresponding functions. 
// They should be handled `DescribeFunctionCommand`, `ShowFunctionsCommand` diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 85aea3ce41..d6c24e47e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2229,6 +2229,19 @@ class DataSourceV2SQLSuite "The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun")) } + test("REFRESH FUNCTION: only support session catalog") { + val e = intercept[AnalysisException] { + sql("REFRESH FUNCTION testcat.ns1.ns2.fun") + } + assert(e.message.contains("function lookup is only supported in v1 catalog")) + + val e1 = intercept[AnalysisException] { + sql("REFRESH FUNCTION default.ns1.ns2.fun") + } + assert(e1.message.contains( + "The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun")) + } + test("global temp view should not be masked by v2 catalog") { val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index e4709e469d..faafcb7210 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -28,8 +28,8 @@ import org.apache.spark.{SparkException, SparkFiles} import org.apache.spark.internal.config import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} -import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER @@ -3030,6 +3030,49 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } } + + test("REFRESH FUNCTION") { + val msg = intercept[AnalysisException] { + sql("REFRESH FUNCTION md5") + }.getMessage + assert(msg.contains("Cannot refresh builtin function")) + + withUserDefinedFunction("func1" -> true) { + sql("CREATE TEMPORARY FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'") + val msg = intercept[AnalysisException] { + sql("REFRESH FUNCTION func1") + }.getMessage + assert(msg.contains("Cannot refresh temporary function")) + } + + withUserDefinedFunction("func1" -> false) { + intercept[NoSuchFunctionException] { + sql("REFRESH FUNCTION func1") + } + + val func = FunctionIdentifier("func1", Some("default")) + sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'") + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) + 
sql("REFRESH FUNCTION func1") + assert(spark.sessionState.catalog.isRegisteredFunction(func)) + + spark.sessionState.catalog.externalCatalog.dropFunction("default", "func1") + assert(spark.sessionState.catalog.isRegisteredFunction(func)) + intercept[NoSuchFunctionException] { + sql("REFRESH FUNCTION func1") + } + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) + + val function = CatalogFunction(func, "test.non.exists.udf", Seq.empty) + spark.sessionState.catalog.createFunction(function, false) + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) + val err = intercept[AnalysisException] { + sql("REFRESH FUNCTION func1") + }.getMessage + assert(err.contains("Can not load class")) + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) + } + } } object FakeLocalFsFileSystem {