[SPARK-28916][SQL] Split subexpression elimination functions code for Generate[Mutable|Unsafe]Projection

### What changes were proposed in this pull request?

This PR splits the generated code that invokes the subexpression elimination functions, instead of inlining all of those function calls directly in the `apply` method of `Generate[Mutable|Unsafe]Projection`.

### Why are the changes needed?

Before this PR, code generation could fail due to the 64KB code size limit when a large number of subexpression elimination functions were generated. The added unit test reproduces the issue (thanks to the JIRA reporter and HyukjinKwon for it).

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added a unit test to `CodeGenerationSuite`.

Closes #25642 from mgaido91/SPARK-28916.

Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Marco Gaido 2019-09-09 13:30:56 +08:00 committed by Wenchen Fan
parent 0ed9fae457
commit c411579355
4 changed files with 27 additions and 5 deletions

View file

@ -403,13 +403,14 @@ class CodegenContext {
* equivalentExpressions will match the tree containing `col1 + col2` and it will only * equivalentExpressions will match the tree containing `col1 + col2` and it will only
* be evaluated once. * be evaluated once.
*/ */
val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions private val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions
// Foreach expression that is participating in subexpression elimination, the state to use. // Foreach expression that is participating in subexpression elimination, the state to use.
var subExprEliminationExprs = Map.empty[Expression, SubExprEliminationState] // Visible for testing.
private[expressions] var subExprEliminationExprs = Map.empty[Expression, SubExprEliminationState]
// The collection of sub-expression result resetting methods that need to be called on each row. // The collection of sub-expression result resetting methods that need to be called on each row.
val subexprFunctions = mutable.ArrayBuffer.empty[String] private val subexprFunctions = mutable.ArrayBuffer.empty[String]
val outerClassName = "OuterClass" val outerClassName = "OuterClass"
@ -993,6 +994,15 @@ class CodegenContext {
} }
} }
/**
 * Produces the code that runs every collected subexpression elimination function.
 *
 * The entries of `subexprFunctions` are handed to `splitExpressions` (with `INPUT_ROW` passed
 * as an `InternalRow` argument) rather than being concatenated directly, so the resulting code
 * can be split when necessary.
 *
 * @return the code evaluating the subexpressions, after splitting it if necessary
 */
def subexprFunctionsCode: String = {
  // Whole-stage codegen's subexpression elimination is handled in another code path, so when
  // `currentVars` is set there must be no pending subexpression functions here.
  val outsideWholeStage = currentVars == null
  assert(outsideWholeStage || subexprFunctions.isEmpty)
  val rowArgument = Seq("InternalRow" -> INPUT_ROW)
  splitExpressions(subexprFunctions, "subexprFunc_split", rowArgument)
}
/** /**
* Perform a function which generates a sequence of ExprCodes with a given mapping between * Perform a function which generates a sequence of ExprCodes with a given mapping between
* expressions and common expressions, instead of using the mapping in current context. * expressions and common expressions, instead of using the mapping in current context.

View file

@ -92,7 +92,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
} }
// Evaluate all the subexpressions. // Evaluate all the subexpressions.
val evalSubexpr = ctx.subexprFunctions.mkString("\n") val evalSubexpr = ctx.subexprFunctionsCode
val allProjections = ctx.splitExpressionsWithCurrentInputs(projectionCodes.map(_._1)) val allProjections = ctx.splitExpressionsWithCurrentInputs(projectionCodes.map(_._1))
val allUpdates = ctx.splitExpressionsWithCurrentInputs(projectionCodes.map(_._2)) val allUpdates = ctx.splitExpressionsWithCurrentInputs(projectionCodes.map(_._2))

View file

@ -299,7 +299,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
v => s"$v = new $rowWriterClass(${expressions.length}, ${numVarLenFields * 32});") v => s"$v = new $rowWriterClass(${expressions.length}, ${numVarLenFields * 32});")
// Evaluate all the subexpression. // Evaluate all the subexpression.
val evalSubexpr = ctx.subexprFunctions.mkString("\n") val evalSubexpr = ctx.subexprFunctionsCode
val writeExpressions = writeExpressionsToBuffer( val writeExpressions = writeExpressionsToBuffer(
ctx, ctx.INPUT_ROW, exprEvals, exprSchemas, rowWriter, isTopLevel = true) ctx, ctx.INPUT_ROW, exprEvals, exprSchemas, rowWriter, isTopLevel = true)

View file

@ -545,6 +545,18 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
} }
assert(appender.seenMessage) assert(appender.seenMessage)
} }
// Regression test for SPARK-28916: generating projections over a very large number of
// repeated expressions must not fail to compile because of the 64KB code size limit.
test("SPARK-28916: subexpression elimination can cause 64kb code limit on UnsafeProjection") {
  val numOfExprs = 10000
  // `0 to numOfExprs` is inclusive, so numOfExprs + 1 column pairs are produced. Every pair
  // contributes the same `Add` twice so that each sum is a repeated (common) subexpression.
  val exprs = (0 to numOfExprs).flatMap(colIndex =>
    Seq(Add(BoundReference(colIndex, DoubleType, true),
        BoundReference(numOfExprs + colIndex, DoubleType, true)),
      Add(BoundReference(colIndex, DoubleType, true),
        BoundReference(numOfExprs + colIndex, DoubleType, true))))
  // these should not fail to compile due to 64K limit
  // NOTE(review): the second argument is presumably the flag that enables subexpression
  // elimination -- confirm against the `generate` signature.
  GenerateUnsafeProjection.generate(exprs, true)
  GenerateMutableProjection.generate(exprs, true)
}
} }
case class HugeCodeIntExpression(value: Int) extends Expression { case class HugeCodeIntExpression(value: Int) extends Expression {