[SPARK-22494][SQL] Fix 64KB limit exception with Coalesce and AtLeastNNonNulls
## What changes were proposed in this pull request? Both `Coalesce` and `AtLeastNNonNulls` can cause the 64KB limit exception in their generated code when used with a lot of arguments and/or complex expressions. This PR splits the code generated for their child expressions into separate functions (via `ctx.splitExpressions`) in order to avoid the issue. ## How was this patch tested? Added unit tests that evaluate `Coalesce` and `AtLeastNNonNulls` with thousands of literal inputs. Author: Marco Gaido <marcogaido91@gmail.com> Author: Marco Gaido <mgaido@hortonworks.com> Closes #19720 from mgaido91/SPARK-22494.
This commit is contained in:
parent
ed885e7a65
commit
4e7f07e255
|
@ -72,14 +72,10 @@ case class Coalesce(children: Seq[Expression]) extends Expression {
|
|||
}
|
||||
|
||||
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
|
||||
val first = children(0)
|
||||
val rest = children.drop(1)
|
||||
val firstEval = first.genCode(ctx)
|
||||
ev.copy(code = s"""
|
||||
${firstEval.code}
|
||||
boolean ${ev.isNull} = ${firstEval.isNull};
|
||||
${ctx.javaType(dataType)} ${ev.value} = ${firstEval.value};""" +
|
||||
rest.map { e =>
|
||||
ctx.addMutableState("boolean", ev.isNull, "")
|
||||
ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
|
||||
|
||||
val evals = children.map { e =>
|
||||
val eval = e.genCode(ctx)
|
||||
s"""
|
||||
if (${ev.isNull}) {
|
||||
|
@ -90,7 +86,12 @@ case class Coalesce(children: Seq[Expression]) extends Expression {
|
|||
}
|
||||
}
|
||||
"""
|
||||
}.mkString("\n"))
|
||||
}
|
||||
|
||||
ev.copy(code = s"""
|
||||
${ev.isNull} = true;
|
||||
${ev.value} = ${ctx.defaultValue(dataType)};
|
||||
${ctx.splitExpressions(ctx.INPUT_ROW, evals)}""")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -357,7 +358,7 @@ case class AtLeastNNonNulls(n: Int, children: Seq[Expression]) extends Predicate
|
|||
|
||||
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
|
||||
val nonnull = ctx.freshName("nonnull")
|
||||
val code = children.map { e =>
|
||||
val evals = children.map { e =>
|
||||
val eval = e.genCode(ctx)
|
||||
e.dataType match {
|
||||
case DoubleType | FloatType =>
|
||||
|
@ -379,7 +380,26 @@ case class AtLeastNNonNulls(n: Int, children: Seq[Expression]) extends Predicate
|
|||
}
|
||||
"""
|
||||
}
|
||||
}.mkString("\n")
|
||||
}
|
||||
|
||||
val code = if (ctx.INPUT_ROW == null || ctx.currentVars != null) {
|
||||
evals.mkString("\n")
|
||||
} else {
|
||||
ctx.splitExpressions(evals, "atLeastNNonNulls",
|
||||
("InternalRow", ctx.INPUT_ROW) :: ("int", nonnull) :: Nil,
|
||||
returnType = "int",
|
||||
makeSplitFunction = { body =>
|
||||
s"""
|
||||
$body
|
||||
return $nonnull;
|
||||
"""
|
||||
},
|
||||
foldFunctions = { funcCalls =>
|
||||
funcCalls.map(funcCall => s"$nonnull = $funcCall;").mkString("\n")
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
ev.copy(code = s"""
|
||||
int $nonnull = 0;
|
||||
$code
|
||||
|
|
|
@ -149,4 +149,14 @@ class NullExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
|
|||
checkEvaluation(AtLeastNNonNulls(3, nullOnly), true, EmptyRow)
|
||||
checkEvaluation(AtLeastNNonNulls(4, nullOnly), false, EmptyRow)
|
||||
}
|
||||
|
||||
test("Coalesce should not throw 64kb exception") {
|
||||
val inputs = (1 to 2500).map(x => Literal(s"x_$x"))
|
||||
checkEvaluation(Coalesce(inputs), "x_1")
|
||||
}
|
||||
|
||||
test("AtLeastNNonNulls should not throw 64kb exception") {
|
||||
val inputs = (1 to 4000).map(x => Literal(s"x_$x"))
|
||||
checkEvaluation(AtLeastNNonNulls(1, inputs), true)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue