diff --git a/docs/_data/menu-sql.yaml b/docs/_data/menu-sql.yaml
index 2d26326a00..ef5a53a69e 100644
--- a/docs/_data/menu-sql.yaml
+++ b/docs/_data/menu-sql.yaml
@@ -208,6 +208,8 @@
       url: sql-ref-syntax-aux-cache-clear-cache.html
     - text: REFRESH TABLE
       url: sql-ref-syntax-aux-cache-refresh-table.html
+    - text: REFRESH FUNCTION
+      url: sql-ref-syntax-aux-cache-refresh-function.html
     - text: REFRESH
       url: sql-ref-syntax-aux-cache-refresh.html
     - text: DESCRIBE
diff --git a/docs/sql-ref-syntax-aux-cache-cache-table.md b/docs/sql-ref-syntax-aux-cache-cache-table.md
index fdef3d657d..8829016fc1 100644
--- a/docs/sql-ref-syntax-aux-cache-cache-table.md
+++ b/docs/sql-ref-syntax-aux-cache-cache-table.md
@@ -80,3 +80,4 @@ CACHE TABLE testCache OPTIONS ('storageLevel' 'DISK_ONLY') SELECT * FROM testDat
 * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html)
 * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html)
 * [REFRESH](sql-ref-syntax-aux-cache-refresh.html)
+* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html)
\ No newline at end of file
diff --git a/docs/sql-ref-syntax-aux-cache-clear-cache.md b/docs/sql-ref-syntax-aux-cache-clear-cache.md
index a27cd83c14..aae4e39600 100644
--- a/docs/sql-ref-syntax-aux-cache-clear-cache.md
+++ b/docs/sql-ref-syntax-aux-cache-clear-cache.md
@@ -41,3 +41,4 @@ CLEAR CACHE;
 * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html)
 * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html)
 * [REFRESH](sql-ref-syntax-aux-cache-refresh.html)
+* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html)
\ No newline at end of file
diff --git a/docs/sql-ref-syntax-aux-cache-refresh-function.md b/docs/sql-ref-syntax-aux-cache-refresh-function.md
new file mode 100644
index 0000000000..d91fc062eb
--- /dev/null
+++ b/docs/sql-ref-syntax-aux-cache-refresh-function.md
@@ -0,0 +1,62 @@
+---
+layout: global
+title: REFRESH FUNCTION
+displayTitle: REFRESH FUNCTION
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+### Description
+
+The `REFRESH FUNCTION` statement invalidates the cached function entry, which includes the class
+name and resource location of the given function, and reloads it right away. Note that
+`REFRESH FUNCTION` only works for permanent functions; refreshing a built-in or temporary
+function causes an exception.
+
+### Syntax
+
+```sql
+REFRESH FUNCTION function_identifier
+```
+
+### Parameters
+
+* **function_identifier**
+
+    Specifies the name of the function to refresh, either qualified or unqualified. If no
+    database identifier is provided, the function is resolved from the current database.
+
+    **Syntax:** `[ database_name. ] function_name`
+
+### Examples
+
+```sql
+-- The cached entry of the function will be refreshed.
+-- The function is resolved from the current database as the function name is unqualified.
+REFRESH FUNCTION func1;
+
+-- The cached entry of the function will be refreshed.
+-- The function is resolved from the db1 database as the function name is qualified.
+REFRESH FUNCTION db1.func1;
+```
+
+### Related Statements
+
+* [CACHE TABLE](sql-ref-syntax-aux-cache-cache-table.html)
+* [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html)
+* [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html)
+* [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html)
+* [REFRESH](sql-ref-syntax-aux-cache-refresh.html)
diff --git a/docs/sql-ref-syntax-aux-cache-refresh-table.md b/docs/sql-ref-syntax-aux-cache-refresh-table.md
index 8d4a804f88..cc35c0451d 100644
--- a/docs/sql-ref-syntax-aux-cache-refresh-table.md
+++ b/docs/sql-ref-syntax-aux-cache-refresh-table.md
@@ -57,3 +57,4 @@ REFRESH TABLE tempDB.view1;
 * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html)
 * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html)
 * [REFRESH](sql-ref-syntax-aux-cache-refresh.html)
+* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html)
\ No newline at end of file
diff --git a/docs/sql-ref-syntax-aux-cache-refresh.md b/docs/sql-ref-syntax-aux-cache-refresh.md
index b10e6fb47a..715bdcac3b 100644
--- a/docs/sql-ref-syntax-aux-cache-refresh.md
+++ b/docs/sql-ref-syntax-aux-cache-refresh.md
@@ -54,3 +54,4 @@ REFRESH "hdfs://path/to/table";
 * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html)
 * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html)
 * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html)
+* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html)
diff --git a/docs/sql-ref-syntax-aux-cache-uncache-table.md b/docs/sql-ref-syntax-aux-cache-uncache-table.md
index 96a691e4c3..4456378cde 100644
--- a/docs/sql-ref-syntax-aux-cache-uncache-table.md
+++ b/docs/sql-ref-syntax-aux-cache-uncache-table.md
@@ -50,3 +50,4 @@ UNCACHE TABLE t1;
 * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html)
 * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html)
 * [REFRESH](sql-ref-syntax-aux-cache-refresh.html)
+* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html)
\ No newline at end of file
diff --git a/docs/sql-ref-syntax-aux-cache.md b/docs/sql-ref-syntax-aux-cache.md
index 0ccb1c61a0..17a13e67e5 100644
--- a/docs/sql-ref-syntax-aux-cache.md
+++ b/docs/sql-ref-syntax-aux-cache.md
@@ -23,4 +23,5 @@ license: |
  * [UNCACHE TABLE statement](sql-ref-syntax-aux-cache-uncache-table.html)
  * [CLEAR CACHE statement](sql-ref-syntax-aux-cache-clear-cache.html)
  * [REFRESH TABLE statement](sql-ref-syntax-aux-cache-refresh-table.html)
- * [REFRESH statement](sql-ref-syntax-aux-cache-refresh.html)
\ No newline at end of file
+ * [REFRESH statement](sql-ref-syntax-aux-cache-refresh.html)
+ * [REFRESH FUNCTION statement](sql-ref-syntax-aux-cache-refresh-function.html)
diff --git a/docs/sql-ref-syntax.md b/docs/sql-ref-syntax.md
index 4bf1858428..290523a2b1 100644
--- a/docs/sql-ref-syntax.md
+++ b/docs/sql-ref-syntax.md
@@ -83,6 +83,7 @@ Spark SQL is Apache Spark's module for working with structured data. The SQL Syn
    * [LIST JAR](sql-ref-syntax-aux-resource-mgmt-list-jar.html)
    * [REFRESH](sql-ref-syntax-aux-cache-refresh.html)
    * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html)
+   * [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html)
    * [RESET](sql-ref-syntax-aux-conf-mgmt-reset.html)
    * [SET](sql-ref-syntax-aux-conf-mgmt-set.html)
    * [SHOW COLUMNS](sql-ref-syntax-aux-show-columns.html)
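Before the implementation changes below, it is worth seeing how the documented behavior surfaces through the public API. This is a minimal sketch, assuming an active `SparkSession` named `spark` and a hypothetical UDF class `com.example.MyUDF` on the classpath (neither is part of this patch):

```scala
// Minimal sketch of the documented REFRESH FUNCTION behavior.
// Assumes an active SparkSession `spark` with Hive support and a
// hypothetical UDF class com.example.MyUDF on the classpath.
spark.sql("CREATE FUNCTION func1 AS 'com.example.MyUDF'")

// Unqualified name: resolved against the current database.
spark.sql("REFRESH FUNCTION func1")

// Qualified name: resolved against the named database.
spark.sql("REFRESH FUNCTION default.func1")

// Built-in and temporary functions are rejected with an AnalysisException,
// e.g. "Cannot refresh builtin function md5":
// spark.sql("REFRESH FUNCTION md5")
```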
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index bc7e982830..73dfdcc263 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -229,6 +229,7 @@ statement
         comment=(STRING | NULL) #commentNamespace
     | COMMENT ON TABLE multipartIdentifier IS comment=(STRING | NULL) #commentTable
     | REFRESH TABLE multipartIdentifier #refreshTable
+    | REFRESH FUNCTION multipartIdentifier #refreshFunction
     | REFRESH (STRING | .*?) #refreshResource
     | CACHE LAZY? TABLE multipartIdentifier
         (OPTIONS options=tablePropertyList)? (AS? query)? #cacheTable
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a77d4f1c62..cfc31858d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1886,11 +1886,17 @@ class Analyzer(
   }
 
   /**
+   * Replaces [[UnresolvedFunc]]s with concrete [[LogicalPlan]]s.
    * Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s.
    */
   object ResolveFunctions extends Rule[LogicalPlan] {
     val trimWarningEnabled = new AtomicBoolean(true)
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      // Resolve a function identifier against the session catalog into a ResolvedFunc.
+      case UnresolvedFunc(multipartIdent) =>
+        val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, "function lookup")
+        ResolvedFunc(Identifier.of(funcIdent.database.toArray, funcIdent.funcName))
+
       case q: LogicalPlan => q transformExpressions {
         case u if !u.childrenResolved => u // Skip until children are resolved.
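The new `ResolveFunctions` case defers the actual name handling to `parseSessionCatalogFunctionIdentifier`, which this patch adds to `LookupCatalog` further down. Ignoring the optional catalog name part, the mapping it implements can be sketched stand-alone; the `FunctionIdentifier` here is a simplified stand-in for the catalyst class:

```scala
// Stand-alone sketch of the multi-part-name to v1-identifier mapping;
// simplified stand-in types, catalog name part ignored.
final case class FunctionIdentifier(funcName: String, database: Option[String] = None)

def toV1FunctionIdentifier(
    nameParts: Seq[String],
    isTempFunction: String => Boolean): FunctionIdentifier = nameParts match {
  // A single-part name naming a temp function resolves to it directly.
  case Seq(name) if isTempFunction(name) => FunctionIdentifier(name)
  // Other single-part names are resolved later against the current database.
  case Seq(name) => FunctionIdentifier(name, None)
  // db.func resolves inside the session catalog.
  case Seq(db, name) => FunctionIdentifier(name, Some(db))
  // Deeper namespaces have no v1 equivalent.
  case _ => throw new IllegalArgumentException(
    s"Unsupported function name '${nameParts.mkString(".")}'")
}
```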
+ */ +case class UnresolvedFunc(multipartIdentifier: Seq[String]) extends LeafNode { + override lazy val resolved: Boolean = false + override def output: Seq[Attribute] = Nil +} + /** * A plan containing resolved namespace. */ @@ -74,3 +84,13 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T case class ResolvedView(identifier: Identifier) extends LeafNode { override def output: Seq[Attribute] = Nil } + +/** + * A plan containing resolved function. + */ +// TODO: create a generic representation for v1, v2 function, after we add function +// support to v2 catalog. For now we only need the identifier to fallback to v1 command. +case class ResolvedFunc(identifier: Identifier) + extends LeafNode { + override def output: Seq[Attribute] = Nil +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 2b3f05f61b..ec0c34d4c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1351,6 +1351,14 @@ class SessionCatalog( functionRegistry.registerFunction(func, info, builder) } + /** + * Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]] + * Return true if function exists. + */ + def unregisterFunction(name: FunctionIdentifier): Boolean = { + functionRegistry.dropFunction(name) + } + /** * Drop a temporary function. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 13e528e796..5663741bae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3660,6 +3660,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging ctx.REPLACE != null) } + override def visitRefreshFunction(ctx: RefreshFunctionContext): LogicalPlan = withOrigin(ctx) { + val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier) + RefreshFunction(UnresolvedFunc(functionIdentifier)) + } + override def visitCommentNamespace(ctx: CommentNamespaceContext): LogicalPlan = withOrigin(ctx) { val comment = ctx.comment.getType match { case SqlBaseParser.NULL => "" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index b4120d9f64..137fc70397 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -516,3 +516,10 @@ case class CommentOnNamespace(child: LogicalPlan, comment: String) extends Comma case class CommentOnTable(child: LogicalPlan, comment: String) extends Command { override def children: Seq[LogicalPlan] = child :: Nil } + +/** + * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs. 
+ */ +case class RefreshFunction(child: LogicalPlan) extends Command { + override def children: Seq[LogicalPlan] = child :: Nil +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala index d90804f4b6..2ee760d4f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.connector.catalog import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.connector.expressions.{BucketTransform, IdentityTransform, LogicalExpressions, Transform} @@ -107,6 +107,14 @@ private[sql] object CatalogV2Implicits { throw new AnalysisException( s"$quoted is not a valid TableIdentifier as it has more than 2 name parts.") } + + def asFunctionIdentifier: FunctionIdentifier = ident.namespace() match { + case ns if ns.isEmpty => FunctionIdentifier(ident.name()) + case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName)) + case _ => + throw new AnalysisException( + s"$quoted is not a valid FunctionIdentifier as it has more than 2 name parts.") + } } implicit class MultipartIdentifierHelper(parts: Seq[String]) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index 10c15747ec..b84bf3e278 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.catalog import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} /** @@ -155,4 +155,31 @@ private[sql] trait LookupCatalog extends Logging { None } } + + // TODO: move function related v2 statements to the new framework. + def parseSessionCatalogFunctionIdentifier( + nameParts: Seq[String], + sql: String): FunctionIdentifier = { + if (nameParts.length == 1 && catalogManager.v1SessionCatalog.isTempFunction(nameParts.head)) { + return FunctionIdentifier(nameParts.head) + } + + nameParts match { + case SessionCatalogAndIdentifier(_, ident) => + if (nameParts.length == 1) { + // If there is only one name part, it means the current catalog is the session catalog. + // Here we don't fill the default database, to keep the error message unchanged for + // v1 commands. 
+ FunctionIdentifier(nameParts.head, None) + } else { + ident.namespace match { + case Array(db) => FunctionIdentifier(ident.name, Some(db)) + case _ => + throw new AnalysisException(s"Unsupported function name '$ident'") + } + } + + case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog") + } + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index e802449a69..47f21a0a19 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser import java.util.Locale import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView} import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, FunctionResourceType, JarResource} import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal} import org.apache.spark.sql.catalyst.plans.logical._ @@ -2109,6 +2109,15 @@ class DDLParserSuite extends AnalysisTest { "Operation not allowed: CREATE FUNCTION with resource type 'other'") } + test("REFRESH FUNCTION") { + parseCompare("REFRESH FUNCTION c", + RefreshFunction(UnresolvedFunc(Seq("c")))) + parseCompare("REFRESH FUNCTION b.c", + RefreshFunction(UnresolvedFunc(Seq("b", "c")))) + parseCompare("REFRESH FUNCTION a.b.c", + RefreshFunction(UnresolvedFunc(Seq("a", "b", "c")))) + } + private case class TableSpec( name: Seq[String], schema: Option[StructType], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index bc3f38a358..5717013b2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -622,33 +622,11 @@ class ResolveSessionCatalog( CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists, replace) } - } - // TODO: move function related v2 statements to the new framework. - private def parseSessionCatalogFunctionIdentifier( - nameParts: Seq[String], - sql: String): FunctionIdentifier = { - if (nameParts.length == 1 && isTempFunction(nameParts.head)) { - return FunctionIdentifier(nameParts.head) - } - - nameParts match { - case SessionCatalogAndIdentifier(_, ident) => - if (nameParts.length == 1) { - // If there is only one name part, it means the current catalog is the session catalog. - // Here we don't fill the default database, to keep the error message unchanged for - // v1 commands. 
-          FunctionIdentifier(nameParts.head, None)
-        } else {
-          ident.namespace match {
-            case Array(db) => FunctionIdentifier(ident.name, Some(db))
-            case _ =>
-              throw new AnalysisException(s"Unsupported function name '$ident'")
-          }
-        }
-
-      case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog")
-    }
+    case RefreshFunction(ResolvedFunc(identifier)) =>
+      // Fall back to the v1 command.
+      val funcIdentifier = identifier.asFunctionIdentifier
+      RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
   }
 
   private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 6fdc7f4a58..252d188ff8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -236,6 +236,45 @@ case class ShowFunctionsCommand(
   }
 }
 
+
+/**
+ * A command for users to refresh a persistent function.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *    REFRESH FUNCTION functionName
+ * }}}
+ */
+case class RefreshFunctionCommand(
+    databaseName: Option[String],
+    functionName: String)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
+      throw new AnalysisException(s"Cannot refresh builtin function $functionName")
+    }
+    if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) {
+      throw new AnalysisException(s"Cannot refresh temporary function $functionName")
+    }
+
+    val identifier = FunctionIdentifier(
+      functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase)))
+    // We only refresh permanent functions.
+    if (catalog.isPersistentFunction(identifier)) {
+      // Re-register the function, overwriting the stale cached entry.
+      val func = catalog.getFunctionMetadata(identifier)
+      catalog.registerFunction(func, true)
+    } else {
+      // Clear the stale cached entry and report that the function no longer exists.
+      catalog.unregisterFunction(identifier)
+      throw new NoSuchFunctionException(identifier.database.get, identifier.funcName)
+    }
+
+    Seq.empty[Row]
+  }
+}
+
 object FunctionsCommand {
   // operators that do not have corresponding functions.
   // They should be handled `DescribeFunctionCommand`, `ShowFunctionsCommand`
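`RefreshFunctionCommand.run` boils down to four cases. The following stand-alone sketch restates that control flow with a hypothetical `Catalog` trait in place of `SessionCatalog` (names are illustrative, not Spark API):

```scala
// Stand-alone restatement of RefreshFunctionCommand's control flow;
// the Catalog trait is a hypothetical stand-in for SessionCatalog.
trait Catalog {
  def isBuiltin(name: String): Boolean
  def isTemporary(name: String): Boolean
  def isPersistent(name: String): Boolean
  def reRegister(name: String): Unit  // reload metadata, overwrite the cached entry
  def unregister(name: String): Unit  // drop any stale cached entry
}

def refreshFunction(catalog: Catalog, name: String): Unit =
  if (catalog.isBuiltin(name)) {
    sys.error(s"Cannot refresh builtin function $name")
  } else if (catalog.isTemporary(name)) {
    sys.error(s"Cannot refresh temporary function $name")
  } else if (catalog.isPersistent(name)) {
    catalog.reRegister(name)  // the only case that actually refreshes
  } else {
    // The function vanished from the external catalog: purge the cache, then fail.
    catalog.unregister(name)
    sys.error(s"Undefined function: $name")
  }
```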
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 85aea3ce41..d6c24e47e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2229,6 +2229,19 @@ class DataSourceV2SQLSuite
       "The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun"))
   }
 
+  test("REFRESH FUNCTION: only support session catalog") {
+    val e = intercept[AnalysisException] {
+      sql("REFRESH FUNCTION testcat.ns1.ns2.fun")
+    }
+    assert(e.message.contains("function lookup is only supported in v1 catalog"))
+
+    val e1 = intercept[AnalysisException] {
+      sql("REFRESH FUNCTION default.ns1.ns2.fun")
+    }
+    assert(e1.message.contains(
+      "The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun"))
+  }
+
   test("global temp view should not be masked by v2 catalog") {
     val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
     spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e4709e469d..faafcb7210 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -28,8 +28,8 @@ import org.apache.spark.{SparkException, SparkFiles}
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
@@ -3030,6 +3030,49 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("REFRESH FUNCTION") {
+    val msg = intercept[AnalysisException] {
+      sql("REFRESH FUNCTION md5")
+    }.getMessage
+    assert(msg.contains("Cannot refresh builtin function"))
+
+    withUserDefinedFunction("func1" -> true) {
+      sql("CREATE TEMPORARY FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
+      val msg = intercept[AnalysisException] {
+        sql("REFRESH FUNCTION func1")
+      }.getMessage
+      assert(msg.contains("Cannot refresh temporary function"))
+    }
+
+    withUserDefinedFunction("func1" -> false) {
+      intercept[NoSuchFunctionException] {
+        sql("REFRESH FUNCTION func1")
+      }
+
+      val func = FunctionIdentifier("func1", Some("default"))
+      sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
+      assert(!spark.sessionState.catalog.isRegisteredFunction(func))
sql("REFRESH FUNCTION func1") + assert(spark.sessionState.catalog.isRegisteredFunction(func)) + + spark.sessionState.catalog.externalCatalog.dropFunction("default", "func1") + assert(spark.sessionState.catalog.isRegisteredFunction(func)) + intercept[NoSuchFunctionException] { + sql("REFRESH FUNCTION func1") + } + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) + + val function = CatalogFunction(func, "test.non.exists.udf", Seq.empty) + spark.sessionState.catalog.createFunction(function, false) + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) + val err = intercept[AnalysisException] { + sql("REFRESH FUNCTION func1") + }.getMessage + assert(err.contains("Can not load class")) + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) + } + } } object FakeLocalFsFileSystem {