[SPARK-30039][SQL] CREATE FUNCTION should do multi-catalog resolution

### What changes were proposed in this pull request?

Add `CreateFunctionStatement` and make CREATE FUNCTION go through the same catalog/name resolution framework as the other v2 commands.

### Why are the changes needed?

It's important for all commands to have the same name resolution behavior, to avoid confusing end users. For example, `CREATE FUNCTION namespace.function` should resolve `namespace.function` through the same catalog framework that the other commands use.

### Does this PR introduce any user-facing change?

Yes. When running `CREATE FUNCTION namespace.function`, Spark now fails the command if the current catalog is set to a v2 catalog.
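
A minimal sketch of the new behavior (illustrative only; it assumes an active `spark` session with a v2 catalog registered under the name `testcat`, as in the `DataSourceV2SQLSuite` test further down):

```scala
import org.apache.spark.sql.AnalysisException

// With the identifier resolving to a v2 catalog ("testcat" here), the command
// is rejected at analysis time instead of silently using the session catalog.
try {
  spark.sql("CREATE FUNCTION testcat.ns1.ns2.fun as 'f'")
} catch {
  case e: AnalysisException =>
    assert(e.message.contains("CREATE FUNCTION is only supported in v1 catalog"))
}
```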

### How was this patch tested?

Unit tests in `DDLParserSuite` and `DataSourceV2SQLSuite`.

Closes #26890 from planga82/feature/SPARK-30039_CreateFunctionV2Command.

Authored-by: Pablo Langa <soypab@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
parent f399d655c4
commit 9479887ba1
7 changed files with 127 additions and 153 deletions


@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -3456,6 +3456,36 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
ctx.TEMPORARY != null)
}
+
+  /**
+   * Create a CREATE FUNCTION statement.
+   *
+   * For example:
+   * {{{
+   *   CREATE [OR REPLACE] [TEMPORARY] FUNCTION [IF NOT EXISTS] [db_name.]function_name
+   *   AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
+   * }}}
+   */
+  override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) {
+    val resources = ctx.resource.asScala.map { resource =>
+      val resourceType = resource.identifier.getText.toLowerCase(Locale.ROOT)
+      resourceType match {
+        case "jar" | "file" | "archive" =>
+          FunctionResource(FunctionResourceType.fromString(resourceType), string(resource.STRING))
+        case other =>
+          operationNotAllowed(s"CREATE FUNCTION with resource type '$resourceType'", ctx)
+      }
+    }
+    val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
+    CreateFunctionStatement(
+      functionIdentifier,
+      string(ctx.className),
+      resources,
+      ctx.TEMPORARY != null,
+      ctx.EXISTS != null,
+      ctx.REPLACE != null)
+  }
+
override def visitCommentNamespace(ctx: CommentNamespaceContext): LogicalPlan = withOrigin(ctx) {
val comment = ctx.comment.getType match {
case SqlBaseParser.NULL => ""


@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.analysis.ViewType
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -529,3 +529,14 @@ case class DropFunctionStatement(
functionName: Seq[String],
ifExists: Boolean,
isTemp: Boolean) extends ParsedStatement
+
+/**
+ * CREATE FUNCTION statement, as parsed from SQL
+ */
+case class CreateFunctionStatement(
+    functionName: Seq[String],
+    className: String,
+    resources: Seq[FunctionResource],
+    isTemp: Boolean,
+    ignoreIfExists: Boolean,
+    replace: Boolean) extends ParsedStatement


@@ -21,7 +21,7 @@ import java.util.Locale
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable}
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, FunctionResourceType, JarResource}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
@@ -1925,7 +1925,43 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(
parsePlan("DROP TEMPORARY FUNCTION IF EXISTS a.b.c"),
DropFunctionStatement(Seq("a", "b", "c"), true, true))
}
test("CREATE FUNCTION") {
parseCompare("CREATE FUNCTION a as 'fun'",
CreateFunctionStatement(Seq("a"), "fun", Seq(), false, false, false))
parseCompare("CREATE FUNCTION a.b.c as 'fun'",
CreateFunctionStatement(Seq("a", "b", "c"), "fun", Seq(), false, false, false))
parseCompare("CREATE OR REPLACE FUNCTION a.b.c as 'fun'",
CreateFunctionStatement(Seq("a", "b", "c"), "fun", Seq(), false, false, true))
parseCompare("CREATE TEMPORARY FUNCTION a.b.c as 'fun'",
CreateFunctionStatement(Seq("a", "b", "c"), "fun", Seq(), true, false, false))
parseCompare("CREATE FUNCTION IF NOT EXISTS a.b.c as 'fun'",
CreateFunctionStatement(Seq("a", "b", "c"), "fun", Seq(), false, true, false))
parseCompare("CREATE FUNCTION a as 'fun' USING JAR 'j'",
CreateFunctionStatement(Seq("a"), "fun", Seq(FunctionResource(JarResource, "j")),
false, false, false))
parseCompare("CREATE FUNCTION a as 'fun' USING ARCHIVE 'a'",
CreateFunctionStatement(Seq("a"), "fun", Seq(FunctionResource(ArchiveResource, "a")),
false, false, false))
parseCompare("CREATE FUNCTION a as 'fun' USING FILE 'f'",
CreateFunctionStatement(Seq("a"), "fun", Seq(FunctionResource(FileResource, "f")),
false, false, false))
parseCompare("CREATE FUNCTION a as 'fun' USING JAR 'j', ARCHIVE 'a', FILE 'f'",
CreateFunctionStatement(Seq("a"), "fun", Seq(FunctionResource(JarResource, "j"),
FunctionResource(ArchiveResource, "a"), FunctionResource(FileResource, "f")),
false, false, false))
intercept("CREATE FUNCTION a as 'fun' USING OTHER 'o'",
"Operation not allowed: CREATE FUNCTION with resource type 'other'")
}
private case class TableSpec(


@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
@@ -491,48 +491,48 @@ class ResolveSessionCatalog(
tbl.asTableIdentifier,
propertyKey)
-    case DescribeFunctionStatement(CatalogAndIdentifier(catalog, functionIdent), extended) =>
-      val functionIdentifier = if (isSessionCatalog(catalog)) {
-        functionIdent.asMultipartIdentifier match {
-          case Seq(db, fn) => FunctionIdentifier(fn, Some(db))
-          case Seq(fn) => FunctionIdentifier(fn, None)
-          case _ =>
-            throw new AnalysisException(s"Unsupported function name '${functionIdent.quoted}'")
-        }
-      } else {
-        throw new AnalysisException ("DESCRIBE FUNCTION is only supported in v1 catalog")
-      }
-      DescribeFunctionCommand(functionIdentifier, extended)
+    case DescribeFunctionStatement(CatalogAndIdentifier(catalog, ident), extended) =>
+      val functionIdent =
+        parseSessionCatalogFunctionIdentifier("DESCRIBE FUNCTION", catalog, ident)
+      DescribeFunctionCommand(functionIdent, extended)
case ShowFunctionsStatement(userScope, systemScope, pattern, fun) =>
val (database, function) = fun match {
-        case Some(CatalogAndIdentifier(catalog, functionIdent)) =>
-          if (isSessionCatalog(catalog)) {
-            functionIdent.asMultipartIdentifier match {
-              case Seq(db, fn) => (Some(db), Some(fn))
-              case Seq(fn) => (None, Some(fn))
-              case _ =>
-                throw new AnalysisException(s"Unsupported function name '${functionIdent.quoted}'")
-            }
-          } else {
-            throw new AnalysisException ("SHOW FUNCTIONS is only supported in v1 catalog")
-          }
+        case Some(CatalogAndIdentifier(catalog, ident)) =>
+          val FunctionIdentifier(fn, db) =
+            parseSessionCatalogFunctionIdentifier("SHOW FUNCTIONS", catalog, ident)
+          (db, Some(fn))
case None => (None, pattern)
}
ShowFunctionsCommand(database, function, userScope, systemScope)
-    case DropFunctionStatement(CatalogAndIdentifier(catalog, functionIdent), ifExists, isTemp) =>
-      if (isSessionCatalog(catalog)) {
-        val (database, function) = functionIdent.asMultipartIdentifier match {
-          case Seq(db, fn) => (Some(db), fn)
-          case Seq(fn) => (None, fn)
-          case _ =>
-            throw new AnalysisException(s"Unsupported function name '${functionIdent.quoted}'")
-        }
-        DropFunctionCommand(database, function, ifExists, isTemp)
-      } else {
-        throw new AnalysisException("DROP FUNCTION is only supported in v1 catalog")
-      }
+    case DropFunctionStatement(CatalogAndIdentifier(catalog, ident), ifExists, isTemp) =>
+      val FunctionIdentifier(function, database) =
+        parseSessionCatalogFunctionIdentifier("DROP FUNCTION", catalog, ident)
+      DropFunctionCommand(database, function, ifExists, isTemp)
+    case CreateFunctionStatement(CatalogAndIdentifier(catalog, ident),
+        className, resources, isTemp, ignoreIfExists, replace) =>
+      val FunctionIdentifier(function, database) =
+        parseSessionCatalogFunctionIdentifier("CREATE FUNCTION", catalog, ident)
+      CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists,
+        replace)
}
+
+  private def parseSessionCatalogFunctionIdentifier(
+      sql: String,
+      catalog: CatalogPlugin,
+      functionIdent: Identifier): FunctionIdentifier = {
+    if (isSessionCatalog(catalog)) {
+      functionIdent.asMultipartIdentifier match {
+        case Seq(db, fn) => FunctionIdentifier(fn, Some(db))
+        case Seq(fn) => FunctionIdentifier(fn, None)
+        case _ =>
+          throw new AnalysisException(s"Unsupported function name '${functionIdent.quoted}'")
+      }
+    } else {
+      throw new AnalysisException(s"$sql is only supported in v1 catalog")
+    }
+  }
private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
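
A condensed, hypothetical sketch of the name-mapping rule the new helper applies once the session-catalog check passes (illustrative only; `toV1FunctionIdentifier` is not a real method, and the real helper above also rejects non-session catalogs):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.FunctionIdentifier

// Hypothetical standalone version: at most one qualifier (the database) is
// allowed when the identifier targets the v1 session catalog.
def toV1FunctionIdentifier(nameParts: Seq[String]): FunctionIdentifier = nameParts match {
  case Seq(db, fn) => FunctionIdentifier(fn, Some(db))
  case Seq(fn) => FunctionIdentifier(fn, None)
  case _ => throw new AnalysisException(
    s"Unsupported function name '${nameParts.mkString(".")}'")
}

assert(toV1FunctionIdentifier(Seq("db", "fun")) == FunctionIdentifier("fun", Some("db")))
```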


@@ -223,38 +223,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
}
-  /**
-   * Create a [[CreateFunctionCommand]] command.
-   *
-   * For example:
-   * {{{
-   *   CREATE [OR REPLACE] [TEMPORARY] FUNCTION [IF NOT EXISTS] [db_name.]function_name
-   *   AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri']];
-   * }}}
-   */
-  override def visitCreateFunction(ctx: CreateFunctionContext): LogicalPlan = withOrigin(ctx) {
-    val resources = ctx.resource.asScala.map { resource =>
-      val resourceType = resource.identifier.getText.toLowerCase(Locale.ROOT)
-      resourceType match {
-        case "jar" | "file" | "archive" =>
-          FunctionResource(FunctionResourceType.fromString(resourceType), string(resource.STRING))
-        case other =>
-          operationNotAllowed(s"CREATE FUNCTION with resource type '$resourceType'", ctx)
-      }
-    }
-    // Extract database, name & alias.
-    val functionIdentifier = visitFunctionName(ctx.multipartIdentifier)
-    CreateFunctionCommand(
-      functionIdentifier.database,
-      functionIdentifier.funcName,
-      string(ctx.className),
-      resources,
-      ctx.TEMPORARY != null,
-      ctx.EXISTS != null,
-      ctx.REPLACE != null)
-  }
/**
* Convert a nested constants list into a sequence of string sequences.
*/


@@ -1840,6 +1840,18 @@ class DataSourceV2SQLSuite
assert(e1.message.contains("Unsupported function name 'default.ns1.ns2.fun'"))
}
test("CREATE FUNCTION: only support session catalog") {
val e = intercept[AnalysisException] {
sql("CREATE FUNCTION testcat.ns1.ns2.fun as 'f'")
}
assert(e.message.contains("CREATE FUNCTION is only supported in v1 catalog"))
val e1 = intercept[AnalysisException] {
sql("CREATE FUNCTION default.ns1.ns2.fun as 'f'")
}
assert(e1.message.contains("Unsupported function name 'default.ns1.ns2.fun'"))
}
test("global temp view should not be masked by v2 catalog") {
val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)


@@ -87,89 +87,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
containsThesePhrases = Seq("key_without_value"))
}
test("create function") {
val sql1 =
"""
|CREATE TEMPORARY FUNCTION helloworld as
|'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1',
|JAR '/path/to/jar2'
""".stripMargin
val sql2 =
"""
|CREATE FUNCTION hello.world as
|'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
|FILE '/path/to/file'
""".stripMargin
val sql3 =
"""
|CREATE OR REPLACE TEMPORARY FUNCTION helloworld3 as
|'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar1',
|JAR '/path/to/jar2'
""".stripMargin
val sql4 =
"""
|CREATE OR REPLACE FUNCTION hello.world1 as
|'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
|FILE '/path/to/file'
""".stripMargin
val sql5 =
"""
|CREATE FUNCTION IF NOT EXISTS hello.world2 as
|'com.matthewrathbone.example.SimpleUDFExample' USING ARCHIVE '/path/to/archive',
|FILE '/path/to/file'
""".stripMargin
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val parsed3 = parser.parsePlan(sql3)
val parsed4 = parser.parsePlan(sql4)
val parsed5 = parser.parsePlan(sql5)
val expected1 = CreateFunctionCommand(
None,
"helloworld",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(
FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"),
FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")),
isTemp = true, ignoreIfExists = false, replace = false)
val expected2 = CreateFunctionCommand(
Some("hello"),
"world",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(
FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
isTemp = false, ignoreIfExists = false, replace = false)
val expected3 = CreateFunctionCommand(
None,
"helloworld3",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(
FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"),
FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")),
isTemp = true, ignoreIfExists = false, replace = true)
val expected4 = CreateFunctionCommand(
Some("hello"),
"world1",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(
FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
isTemp = false, ignoreIfExists = false, replace = true)
val expected5 = CreateFunctionCommand(
Some("hello"),
"world2",
"com.matthewrathbone.example.SimpleUDFExample",
Seq(
FunctionResource(FunctionResourceType.fromString("archive"), "/path/to/archive"),
FunctionResource(FunctionResourceType.fromString("file"), "/path/to/file")),
isTemp = false, ignoreIfExists = true, replace = false)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
comparePlans(parsed4, expected4)
comparePlans(parsed5, expected5)
}
test("create hive table - table file format") {
val allSources = Seq("parquet", "parquetfile", "orc", "orcfile", "avro", "avrofile",
"sequencefile", "rcfile", "textfile")