From 4e7f07e2550fa995cc37406173a937033135cf3b Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 16 Nov 2017 18:19:13 +0100 Subject: [PATCH] [SPARK-22494][SQL] Fix 64KB limit exception with Coalesce and AtleastNNonNulls ## What changes were proposed in this pull request? Both `Coalesce` and `AtLeastNNonNulls` can cause the 64KB limit exception when used with a lot of arguments and/or complex expressions. This PR splits their expressions in order to avoid the issue. ## How was this patch tested? Added UTs Author: Marco Gaido Author: Marco Gaido Closes #19720 from mgaido91/SPARK-22494. --- .../expressions/nullExpressions.scala | 42 ++++++++++++++----- .../expressions/NullExpressionsSuite.scala | 10 +++++ 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala index 62786e13bd..4aeab2c3ad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala @@ -72,14 +72,10 @@ case class Coalesce(children: Seq[Expression]) extends Expression { } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val first = children(0) - val rest = children.drop(1) - val firstEval = first.genCode(ctx) - ev.copy(code = s""" - ${firstEval.code} - boolean ${ev.isNull} = ${firstEval.isNull}; - ${ctx.javaType(dataType)} ${ev.value} = ${firstEval.value};""" + - rest.map { e => + ctx.addMutableState("boolean", ev.isNull, "") + ctx.addMutableState(ctx.javaType(dataType), ev.value, "") + + val evals = children.map { e => val eval = e.genCode(ctx) s""" if (${ev.isNull}) { @@ -90,7 +86,12 @@ case class Coalesce(children: Seq[Expression]) extends Expression { } } """ - }.mkString("\n")) + } + + ev.copy(code = s""" + ${ev.isNull} = true; + ${ev.value} = ${ctx.defaultValue(dataType)}; + ${ctx.splitExpressions(ctx.INPUT_ROW, evals)}""") } } @@ -357,7 +358,7 @@ case class AtLeastNNonNulls(n: Int, children: Seq[Expression]) extends Predicate override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val nonnull = ctx.freshName("nonnull") - val code = children.map { e => + val evals = children.map { e => val eval = e.genCode(ctx) e.dataType match { case DoubleType | FloatType => @@ -379,7 +380,26 @@ case class AtLeastNNonNulls(n: Int, children: Seq[Expression]) extends Predicate } """ } - }.mkString("\n") + } + + val code = if (ctx.INPUT_ROW == null || ctx.currentVars != null) { + evals.mkString("\n") + } else { + ctx.splitExpressions(evals, "atLeastNNonNulls", + ("InternalRow", ctx.INPUT_ROW) :: ("int", nonnull) :: Nil, + returnType = "int", + makeSplitFunction = { body => + s""" + $body + return $nonnull; + """ + }, + foldFunctions = { funcCalls => + funcCalls.map(funcCall => s"$nonnull = $funcCall;").mkString("\n") + } + ) + } + ev.copy(code = s""" int $nonnull = 0; $code diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala index 394c0a091e..40ef7770da 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NullExpressionsSuite.scala @@ -149,4 +149,14 @@ class NullExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(AtLeastNNonNulls(3, nullOnly), true, EmptyRow) checkEvaluation(AtLeastNNonNulls(4, nullOnly), false, EmptyRow) } + + test("Coalesce should not throw 64kb exception") { + val inputs = (1 to 2500).map(x => Literal(s"x_$x")) + checkEvaluation(Coalesce(inputs), "x_1") + } + + test("AtLeastNNonNulls should not throw 64kb exception") { + val inputs = (1 to 4000).map(x => Literal(s"x_$x")) + checkEvaluation(AtLeastNNonNulls(1, inputs), true) + } }