[SPARK-26580][SQL] remove Scala 2.11 hack for Scala UDF

## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/22732, we tried our best to keep the behavior of Scala UDFs unchanged in Spark 2.4.

However, Scala 2.12 is the default since Spark 3.0, and the trick used to keep the behavior unchanged doesn't work with Scala 2.12.

This PR proposes to remove the Scala 2.11 hack, as it is no longer useful.

## How was this patch tested?

existing tests.

Closes #23498 from cloud-fan/udf.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit 1f1d98c6fa (parent 98e831d321), Wenchen Fan, 2019-01-11 14:52:13 +08:00
6 changed files with 23 additions and 40 deletions

@@ -43,6 +43,8 @@ displayTitle: Spark SQL Upgrading Guide
- Since Spark 3.0, the JSON datasource and the JSON function `schema_of_json` infer TimestampType from string values if they match the pattern defined by the JSON option `timestampFormat`. Set the JSON option `inferTimestamp` to `false` to disable such type inference.
- In Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(Any, DataType)` gets a Scala closure with a primitive-type argument, the returned UDF returns null if the input value is null. Since Spark 3.0, the UDF returns the default value of the Java type if the input value is null. For example, given `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` returns null in Spark 2.4 and earlier if column `x` is null, and returns 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default.
## Upgrading From Spark SQL 2.3 to 2.4
- In Spark version 2.3 and earlier, the second parameter to the `array_contains` function is implicitly promoted to the element type of the first, array-type parameter. This type promotion can be lossy and may cause `array_contains` to return a wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some changes in behavior, which are illustrated in the table below.
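
For illustration, here is a minimal, self-contained sketch of the `udf(Any, DataType)` behavior change described in the migration note above. The object name, app name, and `local[*]` master are arbitrary choices for the sketch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.IntegerType

object UntypedUdfNullExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("untyped-udf-null").getOrCreate()
    import spark.implicits._

    // Only the return type is given, so Spark has no information about the
    // closure's primitive Int argument and cannot add an input null check.
    val f = udf((x: Int) => x, IntegerType)

    Seq(new Integer(1), null).toDF("x").select(f($"x")).show()
    // Spark 2.4 and earlier: 1, null
    // Spark 3.0 (per the migration note above): 1, 0

    spark.stop()
  }
}
```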

@@ -957,23 +957,6 @@ trait ScalaReflection extends Logging {
tpe.dealias.erasure.typeSymbol.asClass.fullName
}
/**
* Returns the nullability of the input parameter types of the scala function object.
*
* Note that this only works with Scala 2.11, and the information returned may be inaccurate if
* used with a different Scala version.
*/
def getParameterTypeNullability(func: AnyRef): Seq[Boolean] = {
if (!Properties.versionString.contains("2.11")) {
logWarning(s"Scala ${Properties.versionString} cannot get type nullability correctly via " +
"reflection, thus Spark cannot add proper input null check for UDF. To avoid this " +
"problem, use the typed UDF interfaces instead.")
}
val methods = func.getClass.getMethods.filter(m => m.getName == "apply" && !m.isBridge)
assert(methods.length == 1)
methods.head.getParameterTypes.map(!_.isPrimitive)
}
/**
* Returns the parameter names and types for the primary constructor of this type.
*
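
For context, the removed helper works by reflecting over the non-bridge `apply` method of the closure's runtime class and treating primitive parameters as non-nullable. Below is a minimal standalone probe that mirrors that logic outside Spark (the object and method names are illustrative); the comments sketch why the result is unreliable on Scala 2.12, which is exactly what the warning in the removed method says:

```scala
object ParameterNullabilityProbe {
  // Mirrors the logic of the removed ScalaReflection.getParameterTypeNullability.
  def parameterTypeNullability(func: AnyRef): Seq[Boolean] = {
    val methods = func.getClass.getMethods.filter(m => m.getName == "apply" && !m.isBridge)
    methods.head.getParameterTypes.map(!_.isPrimitive).toSeq
  }

  def main(args: Array[String]): Unit = {
    val f: Int => Int = (x: Int) => x
    // Scala 2.11: the closure compiles to an anonymous class with a specialized
    // apply(int), so the result is Seq(false), i.e. a primitive, non-nullable input.
    // Scala 2.12: function literals are compiled via LambdaMetafactory, and the
    // reflective view typically exposes only a generic apply(Object), so the
    // primitive type information is lost and the result is unreliable.
    println(parameterTypeNullability(f))
  }
}
```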

@@ -54,18 +54,6 @@ case class ScalaUDF(
udfDeterministic: Boolean = true)
extends Expression with NonSQLExpression with UserDefinedExpression {
// The constructor for SPARK 2.1 and 2.2
def this(
function: AnyRef,
dataType: DataType,
children: Seq[Expression],
inputTypes: Seq[DataType],
udfName: Option[String]) = {
this(
function, dataType, children, ScalaReflection.getParameterTypeNullability(function),
inputTypes, udfName, nullable = true, udfDeterministic = true)
}
override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic)
override def toString: String =

@@ -102,17 +102,7 @@ private[sql] case class SparkUserDefinedFunction(
// It's possible that some of the inputs don't have a specific type (e.g. `Any`); skip the type
// check and null check for them.
val inputTypes = inputSchemas.map(_.map(_.dataType).getOrElse(AnyDataType))
val inputsNullSafe = if (inputSchemas.isEmpty) {
// This is for backward compatibility of `functions.udf(AnyRef, DataType)`. We need to
// do reflection of the lambda function object and see if its arguments are nullable or not.
// This doesn't work for Scala 2.12 and we should consider removing this workaround, as Spark
// uses Scala 2.12 by default since 3.0.
ScalaReflection.getParameterTypeNullability(f)
} else {
inputSchemas.map(_.map(_.nullable).getOrElse(true))
}
val inputsNullSafe = inputSchemas.map(_.map(_.nullable).getOrElse(true))
ScalaUDF(
f,
dataType,
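
To make the replacement line concrete, here is a small standalone sketch of the new nullability derivation. The `Schema` case class below is only a stand-in for the per-argument schema information carried by `inputSchemas`, not Spark's actual type:

```scala
object InputsNullSafeSketch {
  // Stand-in for the per-argument schema information in `inputSchemas`.
  final case class Schema(dataTypeName: String, nullable: Boolean)

  def main(args: Array[String]): Unit = {
    val inputSchemas: Seq[Option[Schema]] = Seq(
      Some(Schema("IntegerType", nullable = false)), // e.g. a primitive Int argument
      Some(Schema("StringType", nullable = true)),   // e.g. a String argument
      None                                           // no type information (e.g. Any)
    )

    // Same shape as the replacement line in the diff: inputs with unknown types are
    // conservatively treated as null-safe (true), so an input null check is added
    // only where the argument is known not to accept null (false).
    val inputsNullSafe: Seq[Boolean] = inputSchemas.map(_.map(_.nullable).getOrElse(true))

    println(inputsNullSafe) // List(false, true, true)
  }
}
```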

@@ -4250,6 +4250,13 @@ object functions {
* By default the returned UDF is deterministic. To change it to nondeterministic, call the
* API `UserDefinedFunction.asNondeterministic()`.
*
* Note that, although the Scala closure can have primitive-type arguments, it doesn't work
* well with null values. Because the Scala closure is passed in as `Any`, there is no type
* information for the function arguments. Without the type information, Spark may blindly
* pass null to a Scala closure with a primitive-type argument, and the closure will see the
* default value of the Java type for the null argument. For example, with
* `udf((x: Int) => x, IntegerType)` the result is 0 for a null input.
*
* @param f A closure in Scala
* @param dataType The output data type of the UDF
*
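
For comparison, here is a brief sketch contrasting this untyped overload with the typed `udf` API. The object name, app name, and `local[*]` master are arbitrary, and the expected outputs follow the note above and the migration guide:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.IntegerType

object TypedVsUntypedUdf {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("typed-vs-untyped").getOrCreate()
    import spark.implicits._

    // Untyped: only the return DataType is known, so a null input reaches the
    // closure as the Java default for Int (0), as described in the note above.
    val untyped = udf((x: Int) => x, IntegerType)

    // Typed: Spark derives the argument type from the closure's TypeTag, knows a
    // primitive Int cannot hold null, and adds an input null check, so the result
    // is null rather than 0 for a null input.
    val typed = udf((x: Int) => x)

    Seq(new Integer(1), null).toDF("x")
      .select(untyped($"x").as("untyped"), typed($"x").as("typed"))
      .show()

    spark.stop()
  }
}
```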

@@ -423,6 +423,19 @@ class UDFSuite extends QueryTest with SharedSQLContext {
}
}
test("SPARK-25044 Verify null input handling for primitive types - with udf(Any, DataType)") {
val f = udf((x: Int) => x, IntegerType)
checkAnswer(
Seq(new Integer(1), null).toDF("x").select(f($"x")),
Row(1) :: Row(0) :: Nil)
val f2 = udf((x: Double) => x, DoubleType)
checkAnswer(
Seq(new java.lang.Double(1.1), null).toDF("x").select(f2($"x")),
Row(1.1) :: Row(0.0) :: Nil)
}
test("SPARK-26308: udf with decimal") {
val df1 = spark.createDataFrame(
sparkContext.parallelize(Seq(Row(new BigDecimal("2011000000000002456556")))),