[SPARK-15108][SQL] Describe Permanent UDTF

#### What changes were proposed in this pull request?
When describing a UDTF, the `DESCRIBE FUNCTION` command returns a wrong result: it is unable to find a function that has been created and stored in the catalog but has not been loaded into the `FunctionRegistry`.
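The failure can be reproduced along these lines (a sketch, assuming a Hive-enabled `SparkSession` named `spark`; the jar path is illustrative, and the UDTF class is the one used by the new tests):

```scala
// Register a permanent UDTF: it lands in the external catalog,
// but is not loaded into the FunctionRegistry until first invoked.
spark.sql(
  """CREATE FUNCTION udtf_count
    |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
    |USING JAR '/path/to/TestUDTF.jar'""".stripMargin)

// Before this patch, DESCRIBE only consulted the FunctionRegistry and
// wrongly reported: "Function: udtf_count not found."
spark.sql("DESCRIBE FUNCTION udtf_count").show(truncate = false)
```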

This PR corrects that: if the function is not found in the `FunctionRegistry`, we fall back to the catalog to collect the information about the UDTF.
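For intuition, here is a self-contained sketch of that registry-then-catalog fallback; every name below (`TinySessionCatalog`, the stub `ExpressionInfo`, and so on) is a simplified stand-in for illustration, not the actual `SessionCatalog` API:

```scala
import scala.collection.mutable

// Simplified stand-ins for catalyst's ExpressionInfo and CatalogFunction.
case class ExpressionInfo(className: String, name: String)
case class CatalogFunction(className: String)

class TinySessionCatalog(currentDb: String) {
  // Temporary and built-in functions live in the registry...
  val functionRegistry = mutable.Map.empty[String, ExpressionInfo]
  // ...while permanent functions live in the external catalog, keyed by (db, name).
  val externalCatalog = mutable.Map.empty[(String, String), CatalogFunction]

  def lookupFunctionInfo(name: String): ExpressionInfo =
    functionRegistry.getOrElse(name, {
      // Not registered: fall back to the catalog before giving up.
      val func = externalCatalog.getOrElse(
        (currentDb, name),
        throw new NoSuchElementException(s"Undefined function: '$name'"))
      ExpressionInfo(func.className, s"$currentDb.$name")
    })
}

// A permanent function that was never registered is still describable:
val catalog = new TinySessionCatalog("default")
catalog.externalCatalog(("default", "udtf_count")) =
  CatalogFunction("org.apache.spark.sql.hive.execution.GenericUDTFCount2")
println(catalog.lookupFunctionInfo("udtf_count"))
// ExpressionInfo(org.apache.spark.sql.hive.execution.GenericUDTFCount2,default.udtf_count)
```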

#### How was this patch tested?
Added test cases to verify the results.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #12885 from gatorsmile/showFunction.
gatorsmile 2016-05-06 11:43:07 -07:00 committed by Yin Huai
parent 76ad04d9a0
commit 5c8fad7b9b
11 changed files with 91 additions and 31 deletions


@@ -37,5 +37,10 @@ class NoSuchPartitionException(
   extends AnalysisException(
     s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
 
-class NoSuchFunctionException(db: String, func: String)
+class NoSuchPermanentFunctionException(db: String, func: String)
   extends AnalysisException(s"Function '$func' not found in database '$db'")
+
+class NoSuchFunctionException(db: String, func: String)
+  extends AnalysisException(
+    s"Undefined function: '$func'. This function is neither a registered temporary function nor " +
+    s"a permanent function registered in the database '$db'.")


@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException, SimpleFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException, NoSuchPermanentFunctionException, SimpleFunctionRegistry}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -644,9 +644,7 @@ class SessionCatalog(
   }
 
   protected def failFunctionLookup(name: String): Nothing = {
-    throw new AnalysisException(s"Undefined function: '$name'. This function is " +
-      s"neither a registered temporary function nor " +
-      s"a permanent function registered in the database '$currentDb'.")
+    throw new NoSuchFunctionException(db = currentDb, func = name)
   }
 
   /**
@@ -709,7 +707,7 @@ class SessionCatalog(
         externalCatalog.getFunction(currentDb, name.funcName)
       } catch {
         case e: AnalysisException => failFunctionLookup(name.funcName)
-        case e: NoSuchFunctionException => failFunctionLookup(name.funcName)
+        case e: NoSuchPermanentFunctionException => failFunctionLookup(name.funcName)
       }
     loadFunctionResources(catalogFunction.resources)
     // Please note that qualifiedName is provided by the user. However,


@@ -103,11 +103,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     import ctx._
     val functionName =
       if (describeFuncName.STRING() != null) {
-        string(describeFuncName.STRING())
+        FunctionIdentifier(string(describeFuncName.STRING()), database = None)
       } else if (describeFuncName.qualifiedName() != null) {
-        visitFunctionName(describeFuncName.qualifiedName).unquotedString
+        visitFunctionName(describeFuncName.qualifiedName)
       } else {
-        describeFuncName.getText
+        FunctionIdentifier(describeFuncName.getText, database = None)
      }
     DescribeFunction(functionName, EXTENDED != null)
   }


@@ -17,6 +17,7 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.types.StringType
@@ -29,11 +30,12 @@ trait Command
 
 /**
  * Returned for the "DESCRIBE FUNCTION [EXTENDED] functionName" command.
  *
+ * @param functionName The function to be described.
  * @param isExtended True if "DESCRIBE FUNCTION EXTENDED" is used. Otherwise, false.
  */
 private[sql] case class DescribeFunction(
-    functionName: String,
+    functionName: FunctionIdentifier,
     isExtended: Boolean) extends LogicalPlan with Command {
 
   override def children: Seq[LogicalPlan] = Seq.empty


@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -68,7 +69,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
   // Commands
   assertNotSupportedInStreamingPlan(
     "commmands",
-    DescribeFunction("func", true),
+    DescribeFunction(FunctionIdentifier("func", database = None), true),
     outputMode = Append,
     expectedMsgs = "commands" :: Nil)


@@ -57,10 +57,14 @@ class PlanParserSuite extends PlanTest {
   }
 
   test("describe function") {
-    assertEqual("describe function bar", DescribeFunction("bar", isExtended = false))
-    assertEqual("describe function extended bar", DescribeFunction("bar", isExtended = true))
-    assertEqual("describe function foo.bar", DescribeFunction("foo.bar", isExtended = false))
-    assertEqual("describe function extended f.bar", DescribeFunction("f.bar", isExtended = true))
+    assertEqual("describe function bar",
+      DescribeFunction(FunctionIdentifier("bar", database = None), isExtended = false))
+    assertEqual("describe function extended bar",
+      DescribeFunction(FunctionIdentifier("bar", database = None), isExtended = true))
+    assertEqual("describe function foo.bar",
+      DescribeFunction(FunctionIdentifier("bar", database = Option("foo")), isExtended = false))
+    assertEqual("describe function extended f.bar",
+      DescribeFunction(FunctionIdentifier("bar", database = Option("f")), isExtended = true))
   }
 
   test("set operations") {


@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
 import org.apache.spark.sql.catalyst.catalog.CatalogFunction
 import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -82,7 +83,7 @@ case class CreateFunction(
  * }}}
  */
 case class DescribeFunction(
-    functionName: String,
+    functionName: FunctionIdentifier,
     isExtended: Boolean) extends RunnableCommand {
 
   override val output: Seq[Attribute] = {
@@ -92,7 +93,7 @@ case class DescribeFunction(
 
   private def replaceFunctionName(usage: String, functionName: String): String = {
     if (usage == null) {
-      "To be added."
+      "N/A."
     } else {
       usage.replaceAll("_FUNC_", functionName)
     }
@@ -100,7 +101,7 @@ case class DescribeFunction(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // Hard code "<>", "!=", "between", and "case" for now as there is no corresponding functions.
-    functionName.toLowerCase match {
+    functionName.funcName.toLowerCase match {
       case "<>" =>
         Row(s"Function: $functionName") ::
           Row(s"Usage: a <> b - Returns TRUE if a is not equal to b") :: Nil
@@ -115,12 +116,13 @@ case class DescribeFunction(
         Row(s"Function: case") ::
           Row(s"Usage: CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END - " +
            s"When a = b, returns c; when a = d, return e; else return f") :: Nil
-      case _ => sparkSession.sessionState.functionRegistry.lookupFunction(functionName) match {
-        case Some(info) =>
+      case _ =>
+        try {
+          val info = sparkSession.sessionState.catalog.lookupFunctionInfo(functionName)
           val result =
             Row(s"Function: ${info.getName}") ::
               Row(s"Class: ${info.getClassName}") ::
-              Row(s"Usage: ${replaceFunctionName(info.getUsage(), info.getName)}") :: Nil
+              Row(s"Usage: ${replaceFunctionName(info.getUsage, info.getName)}") :: Nil
 
           if (isExtended) {
             result :+
@@ -128,8 +130,8 @@ case class DescribeFunction(
           } else {
             result
           }
-        case None => Seq(Row(s"Function: $functionName not found."))
-      }
+        } catch {
+          case _: NoSuchFunctionException => Seq(Row(s"Function: $functionName not found."))
+        }
     }
   }


@@ -90,7 +90,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   test("SPARK-14415: All functions should have own descriptions") {
     for (f <- sqlContext.sessionState.functionRegistry.listFunction()) {
       if (!Seq("cube", "grouping", "grouping_id", "rollup", "window").contains(f)) {
-        checkKeywordsNotExist(sql(s"describe function `$f`"), "To be added.")
+        checkKeywordsNotExist(sql(s"describe function `$f`"), "N/A.")
       }
     }
   }


@@ -534,7 +534,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
       }
 
       assert(rs1.next())
-      assert(rs1.getString(1) === "Usage: To be added.")
+      assert(rs1.getString(1) === "Usage: N/A.")
 
       val dataPath = "../hive/src/test/resources/data/files/kv1.txt"
@@ -608,7 +608,7 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
       }
 
       assert(rs2.next())
-      assert(rs2.getString(1) === "Usage: To be added.")
+      assert(rs2.getString(1) === "Usage: N/A.")
     } finally {
       statement.executeQuery("DROP TEMPORARY FUNCTION udtf_count2")
     }


@@ -229,7 +229,7 @@ private[hive] trait HiveClient {
   /** Return an existing function in the database, assuming it exists. */
   final def getFunction(db: String, name: String): CatalogFunction = {
-    getFunctionOption(db, name).getOrElse(throw new NoSuchFunctionException(db, name))
+    getFunctionOption(db, name).getOrElse(throw new NoSuchPermanentFunctionException(db, name))
   }
 
   /** Return an existing function in the database, or None if it doesn't exist. */


@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -208,8 +207,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       Row("sha") :: Row("sha1") :: Row("sha2") :: Row("weekofyear") :: Nil)
   }
 
-  test("describe functions") {
-    // The Spark SQL built-in functions
+  test("describe functions - built-in functions") {
     checkKeywordsExist(sql("describe function extended upper"),
       "Function: upper",
       "Class: org.apache.spark.sql.catalyst.expressions.Upper",
@@ -253,6 +251,56 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       "When a = b, returns c; when a = d, return e; else return f")
   }
 
+  test("describe functions - user defined functions") {
+    withUserDefinedFunction("udtf_count" -> false) {
+      sql(
+        s"""
+           |CREATE FUNCTION udtf_count
+           |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+           |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}'
+        """.stripMargin)
+
+      checkKeywordsExist(sql("describe function udtf_count"),
+        "Function: default.udtf_count",
+        "Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2",
+        "Usage: N/A")
+
+      checkAnswer(
+        sql("SELECT udtf_count(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
+        Row(3) :: Row(3) :: Nil)
+
+      checkKeywordsExist(sql("describe function udtf_count"),
+        "Function: default.udtf_count",
+        "Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2",
+        "Usage: N/A")
+    }
+  }
+
+  test("describe functions - temporary user defined functions") {
+    withUserDefinedFunction("udtf_count_temp" -> true) {
+      sql(
+        s"""
+           |CREATE TEMPORARY FUNCTION udtf_count_temp
+           |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+           |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}'
+        """.stripMargin)
+
+      checkKeywordsExist(sql("describe function udtf_count_temp"),
+        "Function: udtf_count_temp",
+        "Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2",
+        "Usage: N/A")
+
+      checkAnswer(
+        sql("SELECT udtf_count_temp(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
+        Row(3) :: Row(3) :: Nil)
+
+      checkKeywordsExist(sql("describe function udtf_count_temp"),
+        "Function: udtf_count_temp",
+        "Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2",
+        "Usage: N/A")
+    }
+  }
+
   test("SPARK-5371: union with null and sum") {
     val df = Seq((1, 1)).toDF("c1", "c2")
     df.registerTempTable("table1")