[SPARK-31999][SQL] Add REFRESH FUNCTION command

### What changes were proposed in this pull request?

In Hive mode, permanent functions are shared through the Hive metastore, so a function may be modified by other Hive clients at any time. In a long-lived Spark application it is hard to pick up such changes, for two reasons (see the sketch after this list):

* Spark caches the function in memory in the `FunctionRegistry`.
* Users may not know the location or class name of the UDF, which `replace function` would require.
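
A minimal sketch of the staleness, assuming a permanent UDF `my_udf` already exists in the Hive metastore (the function name is hypothetical):

```scala
spark.sql("SELECT my_udf(1)")          // first use caches my_udf in FunctionRegistry
// Meanwhile another Hive client replaces the function's class or resources in
// the metastore; this session keeps serving the stale cached definition.
spark.sql("SELECT my_udf(1)")          // still the old implementation
// With this PR the cached entry can be re-registered from the metastore
// without restarting the application:
spark.sql("REFRESH FUNCTION my_udf")
spark.sql("SELECT my_udf(1)")          // now the updated implementation
```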

Note that the new command is added through the v2 command code path.

### Why are the changes needed?

It gives an easy way to keep the Spark function registry in sync with the Hive metastore. Then we can call:
```sql
REFRESH FUNCTION functionName
```

### Does this PR introduce _any_ user-facing change?

Yes. It adds the new `REFRESH FUNCTION` command.

### How was this patch tested?

New unit tests.

Closes #28840 from ulysses-you/SPARK-31999.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ulysses 2020-07-22 19:05:50 +00:00 committed by Wenchen Fan
parent cd16a10475
commit 184074de22
22 changed files with 265 additions and 32 deletions

View file

@ -208,6 +208,8 @@
url: sql-ref-syntax-aux-cache-clear-cache.html
- text: REFRESH TABLE
url: sql-ref-syntax-aux-cache-refresh-table.html
- text: REFRESH FUNCTION
url: sql-ref-syntax-aux-cache-refresh-function.html
- text: REFRESH
url: sql-ref-syntax-aux-cache-refresh.html
- text: DESCRIBE

View file

@ -80,3 +80,4 @@ CACHE TABLE testCache OPTIONS ('storageLevel' 'DISK_ONLY') SELECT * FROM testDat
* [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html)
* [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html)
* [REFRESH](sql-ref-syntax-aux-cache-refresh.html)
* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html)

View file

@ -41,3 +41,4 @@ CLEAR CACHE;
* [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html)
* [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html)
* [REFRESH](sql-ref-syntax-aux-cache-refresh.html)
* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html)

View file

@ -0,0 +1,60 @@
---
layout: global
title: REFRESH FUNCTION
displayTitle: REFRESH FUNCTION
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
### Description
The `REFRESH FUNCTION` statement invalidates the cached function entry, which includes the class name
and resource location of the given function. The invalidated cache is populated right away.
Note that `REFRESH FUNCTION` only works for permanent functions. Refreshing native (built-in) functions or temporary functions will cause an exception.
### Syntax
```sql
REFRESH FUNCTION function_identifier
```
### Parameters
* **function_identifier**
Specifies a function name, which is either a qualified or unqualified name. If no database identifier is provided, the current database is used.
**Syntax:** `[ database_name. ] function_name`
### Examples
```sql
-- The cached entry of the function will be refreshed
-- The function is resolved from the current database as the function name is unqualified.
REFRESH FUNCTION func1;
-- The cached entry of the function will be refreshed
-- The function is resolved from the db1 database as the function name is qualified.
REFRESH FUNCTION db1.func1;
```
### Related Statements
* [CACHE TABLE](sql-ref-syntax-aux-cache-cache-table.html)
* [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html)
* [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html)
* [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html)
* [REFRESH](sql-ref-syntax-aux-cache-refresh.html)
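
The restriction called out in the Description above can be seen directly; a hedged sketch of the expected failures, using the same test UDF class as the `DDLSuite` test at the end of this commit:

```scala
// Built-in functions cannot be refreshed:
spark.sql("REFRESH FUNCTION md5")
// => AnalysisException: Cannot refresh builtin function md5

// Temporary functions cannot be refreshed either:
spark.sql("CREATE TEMPORARY FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
spark.sql("REFRESH FUNCTION func1")
// => AnalysisException: Cannot refresh temporary function func1
```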

View file

@ -57,3 +57,4 @@ REFRESH TABLE tempDB.view1;
* [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html)
* [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html)
* [REFRESH](sql-ref-syntax-aux-cache-refresh.html)
* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html)

View file

@ -54,3 +54,4 @@ REFRESH "hdfs://path/to/table";
* [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html)
* [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html)
* [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html)
* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html)

View file

@ -50,3 +50,4 @@ UNCACHE TABLE t1;
* [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html)
* [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html)
* [REFRESH](sql-ref-syntax-aux-cache-refresh.html)
* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html)

View file

@ -23,4 +23,5 @@ license: |
* [UNCACHE TABLE statement](sql-ref-syntax-aux-cache-uncache-table.html)
* [CLEAR CACHE statement](sql-ref-syntax-aux-cache-clear-cache.html)
* [REFRESH TABLE statement](sql-ref-syntax-aux-cache-refresh-table.html)
* [REFRESH statement](sql-ref-syntax-aux-cache-refresh.html)
* [REFRESH FUNCTION statement](sql-ref-syntax-aux-cache-refresh-function.html)

View file

@ -83,6 +83,7 @@ Spark SQL is Apache Spark's module for working with structured data. The SQL Syn
* [LIST JAR](sql-ref-syntax-aux-resource-mgmt-list-jar.html)
* [REFRESH](sql-ref-syntax-aux-cache-refresh.html)
* [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html)
* [REFRESH FUNCTION](sql-ref-syntax-aux-cache-refresh-function.html)
* [RESET](sql-ref-syntax-aux-conf-mgmt-reset.html)
* [SET](sql-ref-syntax-aux-conf-mgmt-set.html)
* [SHOW COLUMNS](sql-ref-syntax-aux-show-columns.html)

View file

@ -229,6 +229,7 @@ statement
comment=(STRING | NULL) #commentNamespace
| COMMENT ON TABLE multipartIdentifier IS comment=(STRING | NULL) #commentTable
| REFRESH TABLE multipartIdentifier #refreshTable
| REFRESH FUNCTION multipartIdentifier #refreshFunction
| REFRESH (STRING | .*?) #refreshResource
| CACHE LAZY? TABLE multipartIdentifier
(OPTIONS options=tablePropertyList)? (AS? query)? #cacheTable

View file

@ -1886,11 +1886,17 @@ class Analyzer(
}
/**
* Replaces [[UnresolvedFunc]]s with concrete [[LogicalPlan]]s.
* Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s.
*/
object ResolveFunctions extends Rule[LogicalPlan] {
val trimWarningEnabled = new AtomicBoolean(true)
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
// Resolve functions with concrete relations from v2 catalog.
case UnresolvedFunc(multipartIdent) =>
val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, "function lookup")
ResolvedFunc(Identifier.of(funcIdent.database.toArray, funcIdent.funcName))
case q: LogicalPlan =>
q transformExpressions {
case u if !u.childrenResolved => u // Skip until children are resolved.

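For orientation, the end-to-end flow this commit wires up, sketched as comments (each stage appears in a hunk of this commit):

```scala
// REFRESH FUNCTION db1.func1 passes through three stages:
// 1. Parser (AstBuilder.visitRefreshFunction, below):
//      RefreshFunction(UnresolvedFunc(Seq("db1", "func1")))
// 2. Analyzer (ResolveFunctions, above):
//      RefreshFunction(ResolvedFunc(Identifier.of(Array("db1"), "func1")))
// 3. ResolveSessionCatalog (v1 fallback, below):
//      RefreshFunctionCommand(Some("db1"), "func1")
```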
View file

@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.catalog.CatalogFunction
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, SupportsNamespaces, Table, TableCatalog}
@ -50,6 +51,15 @@ case class UnresolvedTableOrView(multipartIdentifier: Seq[String]) extends LeafN
override def output: Seq[Attribute] = Nil
}
/**
* Holds the name of a function that has yet to be looked up in a catalog. It will be resolved to
* [[ResolvedFunc]] during analysis.
*/
case class UnresolvedFunc(multipartIdentifier: Seq[String]) extends LeafNode {
override lazy val resolved: Boolean = false
override def output: Seq[Attribute] = Nil
}
/**
* A plan containing resolved namespace.
*/
@ -74,3 +84,13 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T
case class ResolvedView(identifier: Identifier) extends LeafNode {
override def output: Seq[Attribute] = Nil
}
/**
* A plan containing resolved function.
*/
// TODO: create a generic representation for v1 and v2 functions, after we add function
// support to the v2 catalog. For now we only need the identifier to fall back to the v1 command.
case class ResolvedFunc(identifier: Identifier)
extends LeafNode {
override def output: Seq[Attribute] = Nil
}

View file

@ -1351,6 +1351,14 @@ class SessionCatalog(
functionRegistry.registerFunction(func, info, builder)
}
/**
* Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]].
* Return true if the function exists.
*/
def unregisterFunction(name: FunctionIdentifier): Boolean = {
functionRegistry.dropFunction(name)
}
/**
* Drop a temporary function.
*/

View file

@ -3660,6 +3660,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
ctx.REPLACE != null)
}
override def visitRefreshFunction(ctx: RefreshFunctionContext): LogicalPlan = withOrigin(ctx) {
val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
RefreshFunction(UnresolvedFunc(functionIdentifier))
}
override def visitCommentNamespace(ctx: CommentNamespaceContext): LogicalPlan = withOrigin(ctx) {
val comment = ctx.comment.getType match {
case SqlBaseParser.NULL => ""

View file

@ -516,3 +516,10 @@ case class CommentOnNamespace(child: LogicalPlan, comment: String) extends Comma
case class CommentOnTable(child: LogicalPlan, comment: String) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}
/**
* The logical plan of the REFRESH FUNCTION command that works for v2 catalogs.
*/
case class RefreshFunction(child: LogicalPlan) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}

View file

@ -18,7 +18,7 @@
package org.apache.spark.sql.connector.catalog
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.expressions.{BucketTransform, IdentityTransform, LogicalExpressions, Transform}
@ -107,6 +107,14 @@ private[sql] object CatalogV2Implicits {
throw new AnalysisException(
s"$quoted is not a valid TableIdentifier as it has more than 2 name parts.")
}
def asFunctionIdentifier: FunctionIdentifier = ident.namespace() match {
case ns if ns.isEmpty => FunctionIdentifier(ident.name())
case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName))
case _ =>
throw new AnalysisException(
s"$quoted is not a valid FunctionIdentifier as it has more than 2 name parts.")
}
}
implicit class MultipartIdentifierHelper(parts: Seq[String]) {

View file

@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.catalog
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
/**
@ -155,4 +155,31 @@ private[sql] trait LookupCatalog extends Logging {
None
}
}
// TODO: move function related v2 statements to the new framework.
def parseSessionCatalogFunctionIdentifier(
nameParts: Seq[String],
sql: String): FunctionIdentifier = {
if (nameParts.length == 1 && catalogManager.v1SessionCatalog.isTempFunction(nameParts.head)) {
return FunctionIdentifier(nameParts.head)
}
nameParts match {
case SessionCatalogAndIdentifier(_, ident) =>
if (nameParts.length == 1) {
// If there is only one name part, it means the current catalog is the session catalog.
// Here we don't fill the default database, to keep the error message unchanged for
// v1 commands.
FunctionIdentifier(nameParts.head, None)
} else {
ident.namespace match {
case Array(db) => FunctionIdentifier(ident.name, Some(db))
case _ =>
throw new AnalysisException(s"Unsupported function name '$ident'")
}
}
case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog")
}
}
}
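
In effect, `parseSessionCatalogFunctionIdentifier` maps multi-part names as follows (a sketch; `testcat` stands for any registered non-session v2 catalog):

```scala
// Seq("f")       -> FunctionIdentifier("f")              // "f" is a registered temp function
// Seq("f")       -> FunctionIdentifier("f", None)        // session catalog; default db left unfilled
// Seq("db", "f") -> FunctionIdentifier("f", Some("db"))
// three or more name parts in the session catalog -> AnalysisException (unsupported function name)
// any other catalog, e.g. Seq("testcat", "ns", "f") -> AnalysisException ("<sql> is only supported in v1 catalog")
```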

View file

@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, FunctionResourceType, JarResource}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
@ -2109,6 +2109,15 @@ class DDLParserSuite extends AnalysisTest {
"Operation not allowed: CREATE FUNCTION with resource type 'other'")
}
test("REFRESH FUNCTION") {
parseCompare("REFRESH FUNCTION c",
RefreshFunction(UnresolvedFunc(Seq("c"))))
parseCompare("REFRESH FUNCTION b.c",
RefreshFunction(UnresolvedFunc(Seq("b", "c"))))
parseCompare("REFRESH FUNCTION a.b.c",
RefreshFunction(UnresolvedFunc(Seq("a", "b", "c"))))
}
private case class TableSpec(
name: Seq[String],
schema: Option[StructType],

View file

@ -622,33 +622,11 @@ class ResolveSessionCatalog(
CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists,
replace)
}
}
// TODO: move function related v2 statements to the new framework.
private def parseSessionCatalogFunctionIdentifier(
nameParts: Seq[String],
sql: String): FunctionIdentifier = {
if (nameParts.length == 1 && isTempFunction(nameParts.head)) {
return FunctionIdentifier(nameParts.head)
}
nameParts match {
case SessionCatalogAndIdentifier(_, ident) =>
if (nameParts.length == 1) {
// If there is only one name part, it means the current catalog is the session catalog.
// Here we don't fill the default database, to keep the error message unchanged for
// v1 commands.
FunctionIdentifier(nameParts.head, None)
} else {
ident.namespace match {
case Array(db) => FunctionIdentifier(ident.name, Some(db))
case _ =>
throw new AnalysisException(s"Unsupported function name '$ident'")
}
}
case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog")
}
case RefreshFunction(ResolvedFunc(identifier)) =>
// Fallback to v1 command
val funcIdentifier = identifier.asFunctionIdentifier
RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
}
private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match {

View file

@ -236,6 +236,45 @@ case class ShowFunctionsCommand(
}
}
/**
* A command for users to refresh a persistent function.
* The syntax of using this command in SQL is:
* {{{
* REFRESH FUNCTION functionName
* }}}
*/
case class RefreshFunctionCommand(
databaseName: Option[String],
functionName: String)
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
throw new AnalysisException(s"Cannot refresh builtin function $functionName")
}
if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) {
throw new AnalysisException(s"Cannot refresh temporary function $functionName")
}
val identifier = FunctionIdentifier(
functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase)))
// we only refresh permanent functions.
if (catalog.isPersistentFunction(identifier)) {
// re-register the function, overwriting the cached entry.
val func = catalog.getFunctionMetadata(identifier)
catalog.registerFunction(func, true)
} else {
// clear the stale cached entry before reporting that the function is gone.
catalog.unregisterFunction(identifier)
throw new NoSuchFunctionException(identifier.database.get, identifier.funcName)
}
Seq.empty[Row]
}
}
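
Summarizing `run` above as a behavior matrix; note the drop-before-throw in the last branch, which lets a session forget a function another client has dropped (exercised by the `DDLSuite` test below):

```scala
// REFRESH FUNCTION f, where f is:
//   a builtin function          -> AnalysisException("Cannot refresh builtin function f")
//   a temporary function        -> AnalysisException("Cannot refresh temporary function f")
//   persistent in the metastore -> metadata reloaded and re-registered (overwrite = true)
//   missing from the metastore  -> cached entry dropped, then NoSuchFunctionException
```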
object FunctionsCommand {
// operators that do not have corresponding functions.
// They should be handled in `DescribeFunctionCommand` and `ShowFunctionsCommand`.

View file

@ -2229,6 +2229,19 @@ class DataSourceV2SQLSuite
"The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun"))
}
test("REFRESH FUNCTION: only support session catalog") {
val e = intercept[AnalysisException] {
sql("REFRESH FUNCTION testcat.ns1.ns2.fun")
}
assert(e.message.contains("function lookup is only supported in v1 catalog"))
val e1 = intercept[AnalysisException] {
sql("REFRESH FUNCTION default.ns1.ns2.fun")
}
assert(e1.message.contains(
"The namespace in session catalog must have exactly one name part: default.ns1.ns2.fun"))
}
test("global temp view should not be masked by v2 catalog") {
val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)

View file

@ -28,8 +28,8 @@ import org.apache.spark.{SparkException, SparkFiles}
import org.apache.spark.internal.config
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
@ -3030,6 +3030,49 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
}
test("REFRESH FUNCTION") {
val msg = intercept[AnalysisException] {
sql("REFRESH FUNCTION md5")
}.getMessage
assert(msg.contains("Cannot refresh builtin function"))
withUserDefinedFunction("func1" -> true) {
sql("CREATE TEMPORARY FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
val msg = intercept[AnalysisException] {
sql("REFRESH FUNCTION func1")
}.getMessage
assert(msg.contains("Cannot refresh temporary function"))
}
withUserDefinedFunction("func1" -> false) {
intercept[NoSuchFunctionException] {
sql("REFRESH FUNCTION func1")
}
val func = FunctionIdentifier("func1", Some("default"))
sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
assert(!spark.sessionState.catalog.isRegisteredFunction(func))
sql("REFRESH FUNCTION func1")
assert(spark.sessionState.catalog.isRegisteredFunction(func))
spark.sessionState.catalog.externalCatalog.dropFunction("default", "func1")
assert(spark.sessionState.catalog.isRegisteredFunction(func))
intercept[NoSuchFunctionException] {
sql("REFRESH FUNCTION func1")
}
assert(!spark.sessionState.catalog.isRegisteredFunction(func))
val function = CatalogFunction(func, "test.non.exists.udf", Seq.empty)
spark.sessionState.catalog.createFunction(function, false)
assert(!spark.sessionState.catalog.isRegisteredFunction(func))
val err = intercept[AnalysisException] {
sql("REFRESH FUNCTION func1")
}.getMessage
assert(err.contains("Can not load class"))
assert(!spark.sessionState.catalog.isRegisteredFunction(func))
}
}
}
object FakeLocalFsFileSystem {