[SPARK-31897][SQL] Enable codegen for GenerateExec

### What changes were proposed in this pull request?
Enabling codegen for GenerateExec

### Why are the changes needed?
To leverage code generation for Generators

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- UT tests added

### Benchmark
```
case class Data(value1: Float, value2: Map[String, String], value3: String)
val path = "<path>"

val numRecords = Seq(10000000, 100000000)
numRecords.map {
  recordCount =>
    import java.util.concurrent.TimeUnit.NANOSECONDS

    val srcDF = spark.range(recordCount).map {
      x => Data(x.toFloat, Map(x.toString -> x.toString ), s"value3$x")
    }.select($"value1", explode($"value2"), $"value3")
    val start = System.nanoTime()
    srcDF
      .write
      .mode("overwrite")
      .parquet(s"$path/$recordCount")
    val end = System.nanoTime()
    val diff = end - start
    (recordCount, NANOSECONDS.toMillis(diff))
}
```
**With codegen**:
```
res0: Seq[(Int, Long)] = List((10000000,13989), (100000000,129625))
```
**Without codegen**:
```
res0: Seq[(Int, Long)] = List((10000000,15736), (100000000,150399))
```

Closes #28715 from karuppayya/SPARK-31897.

Lead-authored-by: Karuppayya Rajendran <karuppayya1990@gmail.com>
Co-authored-by: Karuppayya Rajendran <karuppayya.rajendran@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
This commit is contained in:
Karuppayya Rajendran 2021-03-18 20:50:28 -07:00 committed by Liang-Chi Hsieh
parent 07ee73234f
commit 0a58029d52
5 changed files with 161 additions and 13 deletions

View file

@ -0,0 +1,12 @@
================================================================================================
GenerateExec benchmark
================================================================================================
OpenJDK 64-Bit Server VM 11.0.9+11-LTS on Mac OS X 10.16
Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz
GenerateExec Benchmark: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
GenerateExec Benchmark wholestage off 72694 72900 292 1.4 726.9 1.0X
GenerateExec Benchmark wholestage on 29207 30182 562 3.4 292.1 2.5X

View file

@ -0,0 +1,12 @@
================================================================================================
GenerateExec benchmark
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_271-b09 on Mac OS X 10.16
Intel(R) Core(TM) i9-9880H CPU @ 2.30GHz
GenerateExec Benchmark: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
GenerateExec Benchmark wholestage off 88194 88334 199 1.1 881.9 1.0X
GenerateExec Benchmark wholestage on 43805 44271 326 2.3 438.0 2.0X

View file

@ -124,7 +124,7 @@ case class GenerateExec(
}
}
override def supportCodegen: Boolean = false
override def supportCodegen: Boolean = generator.supportCodegen
override def inputRDDs(): Seq[RDD[InternalRow]] = {
child.asInstanceOf[CodegenSupport].inputRDDs()
@ -137,16 +137,13 @@ case class GenerateExec(
override def needCopyResult: Boolean = true
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
// Add input rows to the values when we are joining
val values = if (requiredChildOutput.nonEmpty) {
input
} else {
Seq.empty
}
val requiredAttrSet = AttributeSet(requiredChildOutput)
val requiredInput = child.output.zip(input).filter {
case (attr, _) => requiredAttrSet.contains(attr)
}.map(_._2)
boundGenerator match {
case e: CollectionGenerator => codeGenCollection(ctx, e, values, row)
case g => codeGenTraversableOnce(ctx, g, values, row)
case e: CollectionGenerator => codeGenCollection(ctx, e, requiredInput, row)
case g => codeGenTraversableOnce(ctx, g, requiredInput, row)
}
}
@ -244,7 +241,7 @@ case class GenerateExec(
private def codeGenTraversableOnce(
ctx: CodegenContext,
e: Expression,
input: Seq[ExprCode],
requiredInput: Seq[ExprCode],
row: ExprCode): String = {
// Generate the code for the generator
@ -280,7 +277,7 @@ case class GenerateExec(
| boolean $hasNext = $iterator.hasNext();
| InternalRow $current = (InternalRow)($hasNext? $iterator.next() : null);
| $outerVal = false;
| ${consume(ctx, input ++ values)}
| ${consume(ctx, requiredInput ++ values)}
|}
""".stripMargin
} else {
@ -290,7 +287,7 @@ case class GenerateExec(
|while ($iterator.hasNext()) {
| $numOutput.add(1);
| InternalRow $current = (InternalRow)($iterator.next());
| ${consume(ctx, input ++ values)}
| ${consume(ctx, requiredInput ++ values)}
|}
""".stripMargin
}

View file

@ -50,6 +50,85 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
assert(df.collect() === Array(Row(9, 4.5)))
}
testWithWholeStageCodegenOnAndOff("GenerateExec should be" +
" included in WholeStageCodegen") { codegenEnabled =>
import testImplicits._
val arrayData = Seq(("James", Seq("Java", "Scala"), Map("hair" -> "black", "eye" -> "brown")))
val df = arrayData.toDF("name", "knownLanguages", "properties")
// Array - explode
var expDF = df.select($"name", explode($"knownLanguages"), $"properties")
var plan = expDF.queryExecution.executedPlan
assert(plan.find {
case stage: WholeStageCodegenExec =>
stage.find(_.isInstanceOf[GenerateExec]).isDefined
case _ => !codegenEnabled.toBoolean
}.isDefined)
checkAnswer(expDF, Array(Row("James", "Java", Map("hair" -> "black", "eye" -> "brown")),
Row("James", "Scala", Map("hair" -> "black", "eye" -> "brown"))))
// Map - explode
expDF = df.select($"name", $"knownLanguages", explode($"properties"))
plan = expDF.queryExecution.executedPlan
assert(plan.find {
case stage: WholeStageCodegenExec =>
stage.find(_.isInstanceOf[GenerateExec]).isDefined
case _ => !codegenEnabled.toBoolean
}.isDefined)
checkAnswer(expDF,
Array(Row("James", List("Java", "Scala"), "hair", "black"),
Row("James", List("Java", "Scala"), "eye", "brown")))
// Array - posexplode
expDF = df.select($"name", posexplode($"knownLanguages"))
plan = expDF.queryExecution.executedPlan
assert(plan.find {
case stage: WholeStageCodegenExec =>
stage.find(_.isInstanceOf[GenerateExec]).isDefined
case _ => !codegenEnabled.toBoolean
}.isDefined)
checkAnswer(expDF,
Array(Row("James", 0, "Java"), Row("James", 1, "Scala")))
// Map - posexplode
expDF = df.select($"name", posexplode($"properties"))
plan = expDF.queryExecution.executedPlan
assert(plan.find {
case stage: WholeStageCodegenExec =>
stage.find(_.isInstanceOf[GenerateExec]).isDefined
case _ => !codegenEnabled.toBoolean
}.isDefined)
checkAnswer(expDF,
Array(Row("James", 0, "hair", "black"), Row("James", 1, "eye", "brown")))
// Array - explode , selecting all columns
expDF = df.select($"*", explode($"knownLanguages"))
plan = expDF.queryExecution.executedPlan
assert(plan.find {
case stage: WholeStageCodegenExec =>
stage.find(_.isInstanceOf[GenerateExec]).isDefined
case _ => !codegenEnabled.toBoolean
}.isDefined)
checkAnswer(expDF,
Array(Row("James", Seq("Java", "Scala"), Map("hair" -> "black", "eye" -> "brown"), "Java"),
Row("James", Seq("Java", "Scala"), Map("hair" -> "black", "eye" -> "brown"), "Scala")))
// Map - explode, selecting all columns
expDF = df.select($"*", explode($"properties"))
plan = expDF.queryExecution.executedPlan
assert(plan.find {
case stage: WholeStageCodegenExec =>
stage.find(_.isInstanceOf[GenerateExec]).isDefined
case _ => !codegenEnabled.toBoolean
}.isDefined)
checkAnswer(expDF,
Array(
Row("James", List("Java", "Scala"),
Map("hair" -> "black", "eye" -> "brown"), "hair", "black"),
Row("James", List("Java", "Scala"),
Map("hair" -> "black", "eye" -> "brown"), "eye", "brown")))
}
test("Aggregate with grouping keys should be included in WholeStageCodegen") {
val df = spark.range(3).groupBy(col("id") * 2).count().orderBy(col("id") * 2)
val plan = df.queryExecution.executedPlan

View file

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.benchmark
import org.apache.spark.sql.functions.explode
/**
* Benchmark to measure performance for generate exec operator.
* To run this benchmark:
* {{{
* 1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
* 2. build/sbt "sql/test:runMain <this class>"
* 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
* Results will be written to "benchmarks/GenerateExecBenchmark-results.txt".
* }}}
*/
case class Data(value1: Float, value2: Map[String, String], value3: String)
object GenerateExecBenchmark extends SqlBasedBenchmark {
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
runBenchmark("GenerateExec benchmark") {
import spark.implicits._
val numRecords = 100000000
codegenBenchmark("GenerateExec Benchmark", numRecords) {
val srcDF = spark.range(numRecords).map {
x => Data(x.toFloat, Map(x.toString -> x.toString), s"value3$x")
}.select($"value1", explode($"value2"), $"value3")
srcDF.noop()
}
}
}
}