[SPARK-33455][SQL][TEST] Add SubExprEliminationBenchmark for benchmarking subexpression elimination
### What changes were proposed in this pull request?
This patch adds a benchmark `SubExprEliminationBenchmark` for benchmarking the subexpression elimination feature.

### Why are the changes needed?
We need a benchmark for the subexpression elimination feature for changes such as #30341.

### Does this PR introduce _any_ user-facing change?
No, dev only.

### How was this patch tested?
Unit test.

Closes #30379 from viirya/SPARK-33455.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
parent
156704ba0d
commit
eea846b895
|
@ -0,0 +1,15 @@
|
|||
================================================================================================
|
||||
Benchmark for performance of subexpression elimination
|
||||
================================================================================================
|
||||
|
||||
Preparing data for benchmarking ...
|
||||
OpenJDK 64-Bit Server VM 11.0.9+11 on Mac OS X 10.15.6
|
||||
Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
|
||||
from_json as subExpr: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
-------------------------------------------------------------------------------------------------------------------------
|
||||
subexpressionElimination off, codegen on 26809 27731 898 0.0 268094225.4 1.0X
|
||||
subexpressionElimination off, codegen off 25117 26612 1357 0.0 251166638.4 1.1X
|
||||
subexpressionElimination on, codegen on 2582 2906 282 0.0 25819408.7 10.4X
|
||||
subexpressionElimination on, codegen off 25635 26131 804 0.0 256346873.1 1.0X
|
||||
|
||||
|
15
sql/core/benchmarks/SubExprEliminationBenchmark-results.txt
Normal file
15
sql/core/benchmarks/SubExprEliminationBenchmark-results.txt
Normal file
|
@ -0,0 +1,15 @@
|
|||
================================================================================================
|
||||
Benchmark for performance of subexpression elimination
|
||||
================================================================================================
|
||||
|
||||
Preparing data for benchmarking ...
|
||||
OpenJDK 64-Bit Server VM 1.8.0_265-b01 on Mac OS X 10.15.6
|
||||
Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
|
||||
from_json as subExpr: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
|
||||
-------------------------------------------------------------------------------------------------------------------------
|
||||
subexpressionElimination off, codegen on 24841 25365 803 0.0 248412787.5 1.0X
|
||||
subexpressionElimination off, codegen off 25344 26205 941 0.0 253442656.5 1.0X
|
||||
subexpressionElimination on, codegen on 2883 3019 119 0.0 28833086.8 8.6X
|
||||
subexpressionElimination on, codegen off 24707 25688 903 0.0 247068775.9 1.0X
|
||||
|
||||
|
|
@ -0,0 +1,118 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.spark.sql.execution
|
||||
|
||||
import org.apache.spark.benchmark.Benchmark
|
||||
import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
||||
/**
|
||||
* The benchmarks aims to measure performance of the queries where there are subexpression
|
||||
* elimination or not.
|
||||
* To run this benchmark:
|
||||
* {{{
|
||||
* 1. without sbt:
|
||||
* bin/spark-submit --class <this class> --jars <spark core test jar>,
|
||||
* <spark catalyst test jar> <spark sql test jar>
|
||||
* 2. build/sbt "sql/test:runMain <this class>"
|
||||
* 3. generate result:
|
||||
* SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
|
||||
* Results will be written to "benchmarks/SubExprEliminationBenchmark-results.txt".
|
||||
* }}}
|
||||
*/
|
||||
object SubExprEliminationBenchmark extends SqlBasedBenchmark {
  import spark.implicits._

  /**
   * Benchmarks `from_json` evaluated as a repeated subexpression: every projected column
   * extracts a different field from the same `from_json('value, schema)` call, so that call
   * is a common subexpression shared by all columns. Each case toggles subexpression
   * elimination and whole-stage codegen independently.
   *
   * @param rowsNum  number of rows in the generated input table
   * @param numIters number of measured iterations per benchmark case
   */
  def withFromJson(rowsNum: Int, numIters: Int): Unit = {
    val benchmark = new Benchmark("from_json as subExpr", rowsNum, output = output)

    withTempPath { path =>
      prepareDataInfo(benchmark)
      val numCols = 1000
      val schema = writeWideRow(path.getAbsolutePath, rowsNum, numCols)

      // One projection per column; all of them share the same from_json subexpression.
      val cols = (0 until numCols).map { idx =>
        from_json('value, schema).getField(s"col$idx")
      }

      // The four cases are identical except for the elimination/codegen switches, so
      // share one helper instead of four copy-pasted case bodies.
      def addCase(name: String, subExprEnabled: Boolean, codegenEnabled: Boolean): Unit = {
        // Pin the codegen factory mode so the whole-stage setting cannot silently fall
        // back to the other path.
        val factoryMode = if (codegenEnabled) "CODEGEN_ONLY" else "NO_CODEGEN"
        benchmark.addCase(name, numIters) { _ =>
          // We only benchmark subexpression performance under codegen/non-codegen, so
          // disabling json optimization.
          withSQLConf(
            SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key -> subExprEnabled.toString,
            SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled.toString,
            SQLConf.CODEGEN_FACTORY_MODE.key -> factoryMode,
            SQLConf.JSON_EXPRESSION_OPTIMIZATION.key -> "false") {
            val df = spark.read
              .text(path.getAbsolutePath)
              .select(cols: _*)
            df.collect()
          }
        }
      }

      addCase("subexpressionElimination off, codegen on",
        subExprEnabled = false, codegenEnabled = true)
      addCase("subexpressionElimination off, codegen off",
        subExprEnabled = false, codegenEnabled = false)
      addCase("subexpressionElimination on, codegen on",
        subExprEnabled = true, codegenEnabled = true)
      addCase("subexpressionElimination on, codegen off",
        subExprEnabled = true, codegenEnabled = false)

      benchmark.run()
    }
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    val numIters = 3
    runBenchmark("Benchmark for performance of subexpression elimination") {
      withFromJson(100, numIters)
    }
  }
}
|
|
@ -22,7 +22,9 @@ import org.apache.spark.internal.config.UI.UI_ENABLED
|
|||
import org.apache.spark.sql.{Dataset, SparkSession}
|
||||
import org.apache.spark.sql.SaveMode.Overwrite
|
||||
import org.apache.spark.sql.catalyst.plans.SQLHelper
|
||||
import org.apache.spark.sql.functions.lit
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
/**
|
||||
* Common base trait to run benchmark with the Dataset and DataFrame API.
|
||||
|
@ -66,4 +68,25 @@ trait SqlBasedBenchmark extends BenchmarkBase with SQLHelper {
|
|||
ds.write.format("noop").mode(Overwrite).save()
|
||||
}
|
||||
}
|
||||
|
||||
  /**
   * Prints a notice to the benchmark's output stream before data generation, so the
   * preparation phase is visible in the recorded results file.
   */
  protected def prepareDataInfo(benchmark: Benchmark): Unit = {
    // scalastyle:off println
    benchmark.out.println("Preparing data for benchmarking ...")
    // scalastyle:on println
  }
|
||||
|
||||
/**
|
||||
* Prepares a table with wide row for benchmarking. The table will be written into
|
||||
* the given path.
|
||||
*/
|
||||
protected def writeWideRow(path: String, rowsNum: Int, numCols: Int): StructType = {
|
||||
val fields = Seq.tabulate(numCols)(i => StructField(s"col$i", IntegerType))
|
||||
val schema = StructType(fields)
|
||||
|
||||
spark.range(rowsNum)
|
||||
.select(Seq.tabulate(numCols)(i => lit(i).as(s"col$i")): _*)
|
||||
.write.json(path)
|
||||
|
||||
schema
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,16 +39,9 @@ import org.apache.spark.sql.types._
|
|||
* Results will be written to "benchmarks/JSONBenchmark-results.txt".
|
||||
* }}}
|
||||
*/
|
||||
|
||||
object JsonBenchmark extends SqlBasedBenchmark {
|
||||
import spark.implicits._
|
||||
|
||||
  // NOTE(review): this private copy is shown being removed by this diff; an identical
  // `protected` helper now lives in SqlBasedBenchmark and is shared by all benchmarks.
  private def prepareDataInfo(benchmark: Benchmark): Unit = {
    // scalastyle:off println
    benchmark.out.println("Preparing data for benchmarking ...")
    // scalastyle:on println
  }
|
||||
|
||||
def schemaInferring(rowsNum: Int, numIters: Int): Unit = {
|
||||
val benchmark = new Benchmark("JSON schema inferring", rowsNum, output = output)
|
||||
|
||||
|
@ -128,18 +121,6 @@ object JsonBenchmark extends SqlBasedBenchmark {
|
|||
.add("z", StringType)
|
||||
}
|
||||
|
||||
  /**
   * Writes a 1000-column wide table of int columns (`col0..col999`) as JSON into `path`
   * and returns its schema.
   *
   * NOTE(review): shown being removed by this diff — superseded by
   * `SqlBasedBenchmark.writeWideRow(path, rowsNum, numCols)`, which parameterizes the
   * hard-coded column count below.
   */
  def writeWideRow(path: String, rowsNum: Int): StructType = {
    val colsNum = 1000
    val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
    val schema = StructType(fields)

    spark.range(rowsNum)
      .select(Seq.tabulate(colsNum)(i => lit(i).as(s"col$i")): _*)
      .write.json(path)

    schema
  }
|
||||
|
||||
def countWideColumn(rowsNum: Int, numIters: Int): Unit = {
|
||||
val benchmark = new Benchmark("count a wide column", rowsNum, output = output)
|
||||
|
||||
|
@ -171,7 +152,7 @@ object JsonBenchmark extends SqlBasedBenchmark {
|
|||
|
||||
withTempPath { path =>
|
||||
prepareDataInfo(benchmark)
|
||||
val schema = writeWideRow(path.getAbsolutePath, rowsNum)
|
||||
val schema = writeWideRow(path.getAbsolutePath, rowsNum, 1000)
|
||||
|
||||
benchmark.addCase("No encoding", numIters) { _ =>
|
||||
spark.read
|
||||
|
|
Loading…
Reference in a new issue