From 1077f2e1def6266aee6ad6f0640a8f46cd273e21 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 3 Feb 2015 20:07:46 -0800 Subject: [PATCH] [SPARK-5578][SQL][DataFrame] Provide a convenient way for Scala users to use UDFs A more convenient way to define user-defined functions. Author: Reynold Xin Closes #4345 from rxin/defineUDF and squashes the following commits: 639c0f8 [Reynold Xin] udf tests. 0a0b339 [Reynold Xin] defineUDF -> udf. b452b8d [Reynold Xin] Fix UDF registration. d2e42c3 [Reynold Xin] SQLContext.udf.register() returns a UserDefinedFunction also. 4333605 [Reynold Xin] [SQL][DataFrame] defineUDF. --- .../classification/LogisticRegression.scala | 10 +- .../spark/ml/feature/StandardScaler.scala | 6 +- .../apache/spark/ml/recommendation/ALS.scala | 14 +- .../org/apache/spark/sql/DataFrame.scala | 4 +- .../org/apache/spark/sql/DataFrameImpl.scala | 2 +- .../main/scala/org/apache/spark/sql/Dsl.scala | 194 +++++---- ...gistration.scala => UDFRegistration.scala} | 405 +++++------------- .../spark/sql/UserDefinedFunction.scala | 39 ++ .../org/apache/spark/sql/DataFrameSuite.scala | 6 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 6 +- .../spark/sql/UserDefinedTypeSuite.scala | 5 +- 11 files changed, 261 insertions(+), 430 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/{UdfRegistration.scala => UDFRegistration.scala} (56%) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 18be35ad59..df90078de1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -132,14 +132,14 @@ class LogisticRegressionModel private[ml] ( override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { transformSchema(dataset.schema, paramMap, logging = true) val map = this.paramMap ++ paramMap - val scoreFunction: Vector => Double = (v) => { + val scoreFunction = udf((v: Vector) => { val margin = BLAS.dot(v, weights) 1.0 / (1.0 + math.exp(-margin)) - } + } : Double) val t = map(threshold) - val predictFunction: Double => Double = (score) => { if (score > t) 1.0 else 0.0 } + val predictFunction = udf((score: Double) => { if (score > t) 1.0 else 0.0 } : Double) dataset - .select($"*", callUDF(scoreFunction, col(map(featuresCol))).as(map(scoreCol))) - .select($"*", callUDF(predictFunction, col(map(scoreCol))).as(map(predictionCol))) + .select($"*", scoreFunction(col(map(featuresCol))).as(map(scoreCol))) + .select($"*", predictFunction(col(map(scoreCol))).as(map(predictionCol))) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala index 01a4f5eb20..4745a7ae95 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala @@ -81,10 +81,8 @@ class StandardScalerModel private[ml] ( override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { transformSchema(dataset.schema, paramMap, logging = true) val map = this.paramMap ++ paramMap - val scale: (Vector) => Vector = (v) => { - scaler.transform(v) - } - dataset.select($"*", callUDF(scale, col(map(inputCol))).as(map(outputCol))) + val scale = udf((v: Vector) => { scaler.transform(v) } : Vector) + dataset.select($"*", scale(col(map(inputCol))).as(map(outputCol))) } private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 511cb2fe40..c7bec7a845 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -126,22 +126,20 @@ class ALSModel private[ml] ( val map = this.paramMap ++ paramMap val users = userFactors.toDataFrame("id", "features") val items = itemFactors.toDataFrame("id", "features") - val predict: (Seq[Float], Seq[Float]) => Float = (userFeatures, itemFeatures) => { + + // Register a UDF for DataFrame, and then + // create a new column named map(predictionCol) by running the predict UDF. + val predict = udf((userFeatures: Seq[Float], itemFeatures: Seq[Float]) => { if (userFeatures != null && itemFeatures != null) { blas.sdot(k, userFeatures.toArray, 1, itemFeatures.toArray, 1) } else { Float.NaN } - } - val inputColumns = dataset.schema.fieldNames - val prediction = callUDF(predict, users("features"), items("features")).as(map(predictionCol)) - val outputColumns = inputColumns.map(f => dataset(f)) :+ prediction + } : Float) dataset .join(users, dataset(map(userCol)) === users("id"), "left") .join(items, dataset(map(itemCol)) === items("id"), "left") - .select(outputColumns: _*) - // TODO: Just use a dataset("*") - // .select(dataset("*"), prediction) + .select(dataset("*"), predict(users("features"), items("features")).as(map(predictionCol))) } override private[ml] def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index f3bc07ae52..732b685558 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -54,10 +54,10 @@ private[sql] object DataFrame { * }}} * * Note that the [[Column]] type can also be manipulated through its various functions. - * {{ + * {{{ * // The following creates a new column that increases everybody's age by 10. * people("age") + 10 // in Scala - * }} + * }}} * * A more concrete example: * {{{ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index 0b0623dc1f..a52bfa59a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -329,7 +329,7 @@ private[sql] class DataFrameImpl protected[sql]( override def save(path: String): Unit = { val dataSourceName = sqlContext.conf.defaultDataSourceName - save(dataSourceName, ("path" -> path)) + save(dataSourceName, "path" -> path) } override def save( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala index 71365c776d..8cf59f0a1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala @@ -186,15 +186,13 @@ object Dsl { (0 to 22).map { x => val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"}) val typeTags = (1 to x).map(i => s"A$i: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _) - val args = (1 to x).map(i => s"arg$i: Column").mkString(", ") - val argsInUdf = (1 to x).map(i => s"arg$i.expr").mkString(", ") println(s""" /** - * Call a Scala function of ${x} arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of ${x} arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[$typeTags](f: Function$x[$types]${if (args.length > 0) ", " + args else ""}): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq($argsInUdf)) + def udf[$typeTags](f: Function$x[$types]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) }""") } @@ -214,187 +212,187 @@ object Dsl { } */ /** - * Call a Scala function of 0 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 0 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag](f: Function0[RT]): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq()) + def udf[RT: TypeTag](f: Function0[RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 1 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 1 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT], arg1: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr)) + def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 2 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 2 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT], arg1: Column, arg2: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 3 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 3 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](f: Function3[A1, A2, A3, RT], arg1: Column, arg2: Column, arg3: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](f: Function3[A1, A2, A3, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 4 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 4 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](f: Function4[A1, A2, A3, A4, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](f: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 5 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 5 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](f: Function5[A1, A2, A3, A4, A5, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](f: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 6 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 6 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](f: Function6[A1, A2, A3, A4, A5, A6, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](f: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 7 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 7 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](f: Function7[A1, A2, A3, A4, A5, A6, A7, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](f: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 8 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 8 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](f: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](f: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 9 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 9 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](f: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](f: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 10 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 10 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 11 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 11 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag](f: Function11[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag](f: Function11[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 12 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 12 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag](f: Function12[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag](f: Function12[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 13 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 13 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag](f: Function13[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag](f: Function13[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 14 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 14 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag](f: Function14[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag](f: Function14[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 15 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 15 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag](f: Function15[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag](f: Function15[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 16 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 16 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag](f: Function16[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag](f: Function16[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 17 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 17 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag](f: Function17[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag](f: Function17[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 18 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 18 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag](f: Function18[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag](f: Function18[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 19 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 19 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag](f: Function19[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column, arg19: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr, arg19.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag](f: Function19[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 20 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 20 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag](f: Function20[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column, arg19: Column, arg20: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr, arg19.expr, arg20.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag](f: Function20[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 21 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 21 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag](f: Function21[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column, arg19: Column, arg20: Column, arg21: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr, arg19.expr, arg20.expr, arg21.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag](f: Function21[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } /** - * Call a Scala function of 22 arguments as user-defined function (UDF), and automatically - * infer the data types based on the function's signature. + * Defines a user-defined function of 22 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. */ - def callUDF[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag, A22: TypeTag](f: Function22[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, A22, RT], arg1: Column, arg2: Column, arg3: Column, arg4: Column, arg5: Column, arg6: Column, arg7: Column, arg8: Column, arg9: Column, arg10: Column, arg11: Column, arg12: Column, arg13: Column, arg14: Column, arg15: Column, arg16: Column, arg17: Column, arg18: Column, arg19: Column, arg20: Column, arg21: Column, arg22: Column): Column = { - ScalaUdf(f, ScalaReflection.schemaFor(typeTag[RT]).dataType, Seq(arg1.expr, arg2.expr, arg3.expr, arg4.expr, arg5.expr, arg6.expr, arg7.expr, arg8.expr, arg9.expr, arg10.expr, arg11.expr, arg12.expr, arg13.expr, arg14.expr, arg15.expr, arg16.expr, arg17.expr, arg18.expr, arg19.expr, arg20.expr, arg21.expr, arg22.expr)) + def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag, A22: TypeTag](f: Function22[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, A22, RT]): UserDefinedFunction = { + UserDefinedFunction(f, ScalaReflection.schemaFor(typeTag[RT]).dataType) } ////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala similarity index 56% rename from sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala rename to sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index 1beb19437a..d8b0a3b26d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UdfRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -78,20 +78,21 @@ class UDFRegistration(sqlContext: SQLContext) extends Logging { // scalastyle:off - /* registerFunction 0-22 were generated by this script + /* register 0-22 were generated by this script (0 to 22).map { x => val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"}) val typeTags = (1 to x).map(i => s"A${i}: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _) - val argDocs = (1 to x).map(i => s" * @tparam A$i type of the UDF argument at position $i.").foldLeft("")(_ + "\n" + _) println(s""" /** * Register a Scala closure of ${x} arguments as user-defined function (UDF). - * @tparam RT return type of UDF.$argDocs + * @tparam RT return type of UDF. */ - def register[$typeTags](name: String, func: Function$x[$types]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[$typeTags](name: String, func: Function$x[$types]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) }""") } @@ -116,462 +117,258 @@ class UDFRegistration(sqlContext: SQLContext) extends Logging { * Register a Scala closure of 0 arguments as user-defined function (UDF). * @tparam RT return type of UDF. */ - def register[RT: TypeTag](name: String, func: Function0[RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag](name: String, func: Function0[RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 1 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. */ - def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 2 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag](name: String, func: Function2[A1, A2, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag](name: String, func: Function2[A1, A2, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 3 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](name: String, func: Function3[A1, A2, A3, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](name: String, func: Function3[A1, A2, A3, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 4 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](name: String, func: Function4[A1, A2, A3, A4, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](name: String, func: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 5 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](name: String, func: Function5[A1, A2, A3, A4, A5, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](name: String, func: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 6 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](name: String, func: Function6[A1, A2, A3, A4, A5, A6, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag](name: String, func: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 7 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](name: String, func: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag](name: String, func: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 8 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](name: String, func: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag](name: String, func: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 9 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](name: String, func: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](name: String, func: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 10 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. - * @tparam A10 type of the UDF argument at position 10. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](name: String, func: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](name: String, func: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 11 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. - * @tparam A10 type of the UDF argument at position 10. - * @tparam A11 type of the UDF argument at position 11. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag](name: String, func: Function11[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag](name: String, func: Function11[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 12 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. - * @tparam A10 type of the UDF argument at position 10. - * @tparam A11 type of the UDF argument at position 11. - * @tparam A12 type of the UDF argument at position 12. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag](name: String, func: Function12[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag](name: String, func: Function12[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 13 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. - * @tparam A10 type of the UDF argument at position 10. - * @tparam A11 type of the UDF argument at position 11. - * @tparam A12 type of the UDF argument at position 12. - * @tparam A13 type of the UDF argument at position 13. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag](name: String, func: Function13[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag](name: String, func: Function13[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 14 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. - * @tparam A10 type of the UDF argument at position 10. - * @tparam A11 type of the UDF argument at position 11. - * @tparam A12 type of the UDF argument at position 12. - * @tparam A13 type of the UDF argument at position 13. - * @tparam A14 type of the UDF argument at position 14. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag](name: String, func: Function14[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag](name: String, func: Function14[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 15 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. - * @tparam A10 type of the UDF argument at position 10. - * @tparam A11 type of the UDF argument at position 11. - * @tparam A12 type of the UDF argument at position 12. - * @tparam A13 type of the UDF argument at position 13. - * @tparam A14 type of the UDF argument at position 14. - * @tparam A15 type of the UDF argument at position 15. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag](name: String, func: Function15[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag](name: String, func: Function15[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 16 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. - * @tparam A10 type of the UDF argument at position 10. - * @tparam A11 type of the UDF argument at position 11. - * @tparam A12 type of the UDF argument at position 12. - * @tparam A13 type of the UDF argument at position 13. - * @tparam A14 type of the UDF argument at position 14. - * @tparam A15 type of the UDF argument at position 15. - * @tparam A16 type of the UDF argument at position 16. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag](name: String, func: Function16[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag](name: String, func: Function16[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 17 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. - * @tparam A10 type of the UDF argument at position 10. - * @tparam A11 type of the UDF argument at position 11. - * @tparam A12 type of the UDF argument at position 12. - * @tparam A13 type of the UDF argument at position 13. - * @tparam A14 type of the UDF argument at position 14. - * @tparam A15 type of the UDF argument at position 15. - * @tparam A16 type of the UDF argument at position 16. - * @tparam A17 type of the UDF argument at position 17. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag](name: String, func: Function17[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag](name: String, func: Function17[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 18 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. - * @tparam A10 type of the UDF argument at position 10. - * @tparam A11 type of the UDF argument at position 11. - * @tparam A12 type of the UDF argument at position 12. - * @tparam A13 type of the UDF argument at position 13. - * @tparam A14 type of the UDF argument at position 14. - * @tparam A15 type of the UDF argument at position 15. - * @tparam A16 type of the UDF argument at position 16. - * @tparam A17 type of the UDF argument at position 17. - * @tparam A18 type of the UDF argument at position 18. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag](name: String, func: Function18[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag](name: String, func: Function18[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 19 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. - * @tparam A10 type of the UDF argument at position 10. - * @tparam A11 type of the UDF argument at position 11. - * @tparam A12 type of the UDF argument at position 12. - * @tparam A13 type of the UDF argument at position 13. - * @tparam A14 type of the UDF argument at position 14. - * @tparam A15 type of the UDF argument at position 15. - * @tparam A16 type of the UDF argument at position 16. - * @tparam A17 type of the UDF argument at position 17. - * @tparam A18 type of the UDF argument at position 18. - * @tparam A19 type of the UDF argument at position 19. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag](name: String, func: Function19[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag](name: String, func: Function19[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 20 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. - * @tparam A10 type of the UDF argument at position 10. - * @tparam A11 type of the UDF argument at position 11. - * @tparam A12 type of the UDF argument at position 12. - * @tparam A13 type of the UDF argument at position 13. - * @tparam A14 type of the UDF argument at position 14. - * @tparam A15 type of the UDF argument at position 15. - * @tparam A16 type of the UDF argument at position 16. - * @tparam A17 type of the UDF argument at position 17. - * @tparam A18 type of the UDF argument at position 18. - * @tparam A19 type of the UDF argument at position 19. - * @tparam A20 type of the UDF argument at position 20. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag](name: String, func: Function20[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag](name: String, func: Function20[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 21 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. - * @tparam A10 type of the UDF argument at position 10. - * @tparam A11 type of the UDF argument at position 11. - * @tparam A12 type of the UDF argument at position 12. - * @tparam A13 type of the UDF argument at position 13. - * @tparam A14 type of the UDF argument at position 14. - * @tparam A15 type of the UDF argument at position 15. - * @tparam A16 type of the UDF argument at position 16. - * @tparam A17 type of the UDF argument at position 17. - * @tparam A18 type of the UDF argument at position 18. - * @tparam A19 type of the UDF argument at position 19. - * @tparam A20 type of the UDF argument at position 20. - * @tparam A21 type of the UDF argument at position 21. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag](name: String, func: Function21[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag](name: String, func: Function21[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } /** * Register a Scala closure of 22 arguments as user-defined function (UDF). * @tparam RT return type of UDF. - * @tparam A1 type of the UDF argument at position 1. - * @tparam A2 type of the UDF argument at position 2. - * @tparam A3 type of the UDF argument at position 3. - * @tparam A4 type of the UDF argument at position 4. - * @tparam A5 type of the UDF argument at position 5. - * @tparam A6 type of the UDF argument at position 6. - * @tparam A7 type of the UDF argument at position 7. - * @tparam A8 type of the UDF argument at position 8. - * @tparam A9 type of the UDF argument at position 9. - * @tparam A10 type of the UDF argument at position 10. - * @tparam A11 type of the UDF argument at position 11. - * @tparam A12 type of the UDF argument at position 12. - * @tparam A13 type of the UDF argument at position 13. - * @tparam A14 type of the UDF argument at position 14. - * @tparam A15 type of the UDF argument at position 15. - * @tparam A16 type of the UDF argument at position 16. - * @tparam A17 type of the UDF argument at position 17. - * @tparam A18 type of the UDF argument at position 18. - * @tparam A19 type of the UDF argument at position 19. - * @tparam A20 type of the UDF argument at position 20. - * @tparam A21 type of the UDF argument at position 21. - * @tparam A22 type of the UDF argument at position 22. */ - def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag, A22: TypeTag](name: String, func: Function22[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, A22, RT]): Unit = { - def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor[RT].dataType, e) + def register[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag, A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag, A11: TypeTag, A12: TypeTag, A13: TypeTag, A14: TypeTag, A15: TypeTag, A16: TypeTag, A17: TypeTag, A18: TypeTag, A19: TypeTag, A20: TypeTag, A21: TypeTag, A22: TypeTag](name: String, func: Function22[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16, A17, A18, A19, A20, A21, A22, RT]): UserDefinedFunction = { + val dataType = ScalaReflection.schemaFor[RT].dataType + def builder(e: Seq[Expression]) = ScalaUdf(func, dataType, e) functionRegistry.registerFunction(name, builder) + UserDefinedFunction(func, dataType) } + ////////////////////////////////////////////////////////////////////////////////////////////// + ////////////////////////////////////////////////////////////////////////////////////////////// + /** * Register a user-defined function with 1 arguments. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala new file mode 100644 index 0000000000..8d7c2a1b83 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/UserDefinedFunction.scala @@ -0,0 +1,39 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.expressions.ScalaUdf +import org.apache.spark.sql.types.DataType + +/** + * A user-defined function. To create one, use the `udf` functions in [[Dsl]]. + * As an example: + * {{{ + * // Defined a UDF that returns true or false based on some numeric score. + * val predict = udf((score: Double) => if (score > 0.5) true else false) + * + * // Projects a column that adds a prediction column based on the score column. + * df.select( predict(df("score")) ) + * }}} + */ +case class UserDefinedFunction(f: AnyRef, dataType: DataType) { + + def apply(exprs: Column*): Column = { + Column(ScalaUdf(f, dataType, exprs.map(_.expr))) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index f6b65a81ce..19d4f34e56 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.Dsl._ import org.apache.spark.sql.types._ /* Implicits */ -import org.apache.spark.sql.test.TestSQLContext._ +import org.apache.spark.sql.test.TestSQLContext.{createDataFrame, logicalPlanToSparkQuery} import scala.language.postfixOps @@ -280,11 +280,11 @@ class DataFrameSuite extends QueryTest { } test("udf") { - val foo = (a: Int, b: String) => a.toString + b + val foo = udf((a: Int, b: String) => a.toString + b) checkAnswer( // SELECT *, foo(key, value) FROM testData - testData.select($"*", callUDF(foo, 'key, 'value)).limit(3), + testData.select($"*", foo('key, 'value)).limit(3), Row(1, "1", "11") :: Row(2, "2", "22") :: Row(3, "3", "33") :: Nil ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 0501b47f08..8f3d4265a2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import java.util.TimeZone +import org.apache.spark.sql.test.TestSQLContext import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.Dsl._ @@ -26,9 +27,8 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types._ -/* Implicits */ import org.apache.spark.sql.TestData._ -import org.apache.spark.sql.test.TestSQLContext._ +import org.apache.spark.sql.test.TestSQLContext.{udf => _, _} class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { @@ -794,7 +794,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { } test("SPARK-3371 Renaming a function expression with group by gives error") { - udf.register("len", (s: String) => s.length) + TestSQLContext.udf.register("len", (s: String) => s.length) checkAnswer( sql("SELECT len(value) as temp FROM testData WHERE key = 1 group by len(value)"), Row(1)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala index 0696a2335e..117a511734 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala @@ -21,7 +21,8 @@ import scala.beans.{BeanInfo, BeanProperty} import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dsl._ -import org.apache.spark.sql.test.TestSQLContext._ +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.test.TestSQLContext.{udf => _, _} import org.apache.spark.sql.types._ @@ -83,7 +84,7 @@ class UserDefinedTypeSuite extends QueryTest { } test("UDTs and UDFs") { - udf.register("testType", (d: MyDenseVector) => d.isInstanceOf[MyDenseVector]) + TestSQLContext.udf.register("testType", (d: MyDenseVector) => d.isInstanceOf[MyDenseVector]) pointsRDD.registerTempTable("points") checkAnswer( sql("SELECT testType(features) from points"),