[SPARK-25476][SPARK-25510][TEST] Refactor AggregateBenchmark and add a new trait to better support Dataset and DataFrame API

## What changes were proposed in this pull request?

This PR does 2 things:
1. Adds a new trait (`SqlBasedBenchmark`) to better support the Dataset and DataFrame APIs.
2. Refactors `AggregateBenchmark` to use a main method. Generate the benchmark results with:
```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.AggregateBenchmark"
```
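
With the trait in place, a refactored benchmark reduces to roughly the following sketch (simplified from the patch below; the object name is a placeholder):

```scala
// Sketch only: mirrors the shape of the refactored AggregateBenchmark in
// this patch. `benchmark()`, `runBenchmark`, `codegenBenchmark`, `spark`
// come from SqlBasedBenchmark / BenchmarkBase; `MyBenchmark` is hypothetical.
object MyBenchmark extends SqlBasedBenchmark {
  override def benchmark(): Unit = {
    runBenchmark("aggregate without grouping") {
      val N = 500L << 22
      // Runs the body with whole-stage codegen off, then on, and writes
      // results to a file when SPARK_GENERATE_BENCHMARK_FILES=1 is set.
      codegenBenchmark("agg w/o group", N) {
        spark.range(N).selectExpr("sum(id)").collect()
      }
    }
  }
}
```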

## How was this patch tested?

Manual tests.

Closes #22484 from wangyum/SPARK-25476.

Lead-authored-by: Yuming Wang <yumwang@ebay.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Yuming Wang 2018-10-01 07:32:40 -07:00 committed by Dongjoon Hyun
parent 30f5d0f2dd
commit b96fd44f0e
3 changed files with 687 additions and 567 deletions

@@ -0,0 +1,143 @@
================================================================================================
aggregate without grouping
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
agg w/o group: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
agg w/o group wholestage off 65374 / 70665 32.1 31.2 1.0X
agg w/o group wholestage on 1178 / 1209 1779.8 0.6 55.5X
================================================================================================
stat functions
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
stddev: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
stddev wholestage off 8667 / 8851 12.1 82.7 1.0X
stddev wholestage on 1266 / 1273 82.8 12.1 6.8X
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
kurtosis: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
kurtosis wholestage off 41218 / 41231 2.5 393.1 1.0X
kurtosis wholestage on 1347 / 1357 77.8 12.8 30.6X
================================================================================================
aggregate with linear keys
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
codegen = F 9309 / 9389 9.0 111.0 1.0X
codegen = T hashmap = F 4417 / 4435 19.0 52.7 2.1X
codegen = T hashmap = T 1289 / 1298 65.1 15.4 7.2X
================================================================================================
aggregate with randomized keys
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
codegen = F 11424 / 11426 7.3 136.2 1.0X
codegen = T hashmap = F 6441 / 6496 13.0 76.8 1.8X
codegen = T hashmap = T 2333 / 2344 36.0 27.8 4.9X
================================================================================================
aggregate with string key
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Aggregate w string key: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
codegen = F 4751 / 4890 4.4 226.5 1.0X
codegen = T hashmap = F 3146 / 3182 6.7 150.0 1.5X
codegen = T hashmap = T 2211 / 2261 9.5 105.4 2.1X
================================================================================================
aggregate with decimal key
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Aggregate w decimal key: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
codegen = F 3029 / 3062 6.9 144.4 1.0X
codegen = T hashmap = F 1534 / 1569 13.7 73.2 2.0X
codegen = T hashmap = T 575 / 578 36.5 27.4 5.3X
================================================================================================
aggregate with multiple key types
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Aggregate w multiple keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
codegen = F 7506 / 7521 2.8 357.9 1.0X
codegen = T hashmap = F 4791 / 4808 4.4 228.5 1.6X
codegen = T hashmap = T 3553 / 3585 5.9 169.4 2.1X
================================================================================================
max function bytecode size of wholestagecodegen
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
max function bytecode size: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
codegen = F 608 / 656 1.1 927.1 1.0X
codegen = T hugeMethodLimit = 10000 402 / 419 1.6 613.5 1.5X
codegen = T hugeMethodLimit = 1500 616 / 619 1.1 939.9 1.0X
================================================================================================
cube
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
cube: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
cube wholestage off 3229 / 3237 1.6 615.9 1.0X
cube wholestage on 1285 / 1306 4.1 245.2 2.5X
================================================================================================
hash and BytesToBytesMap
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
BytesToBytesMap: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
UnsafeRowhash 328 / 330 64.0 15.6 1.0X
murmur3 hash 167 / 167 125.4 8.0 2.0X
fast hash 84 / 85 249.0 4.0 3.9X
arrayEqual 192 / 192 109.3 9.1 1.7X
Java HashMap (Long) 144 / 147 145.9 6.9 2.3X
Java HashMap (two ints) 147 / 153 142.3 7.0 2.2X
Java HashMap (UnsafeRow) 785 / 788 26.7 37.4 0.4X
LongToUnsafeRowMap (opt=false) 456 / 457 46.0 21.8 0.7X
LongToUnsafeRowMap (opt=true) 125 / 125 168.3 5.9 2.6X
BytesToBytesMap (off Heap) 885 / 885 23.7 42.2 0.4X
BytesToBytesMap (on Heap) 860 / 864 24.4 41.0 0.4X
Aggregate HashMap 56 / 56 373.9 2.7 5.8X

@@ -34,231 +34,184 @@ import org.apache.spark.unsafe.map.BytesToBytesMap
/**
* Benchmark to measure performance for aggregate primitives.
* To run this:
* build/sbt "sql/test-only *benchmark.AggregateBenchmark"
*
* Benchmarks in this file are skipped in normal builds.
* To run this benchmark:
* {{{
* 1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
* 2. build/sbt "sql/test:runMain <this class>"
* 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
* Results will be written to "benchmarks/AggregateBenchmark-results.txt".
* }}}
*/
class AggregateBenchmark extends BenchmarkWithCodegen {
object AggregateBenchmark extends SqlBasedBenchmark {
ignore("aggregate without grouping") {
override def benchmark(): Unit = {
runBenchmark("aggregate without grouping") {
val N = 500L << 22
val benchmark = new Benchmark("agg without grouping", N)
runBenchmark("agg w/o group", N) {
sparkSession.range(N).selectExpr("sum(id)").collect()
codegenBenchmark("agg w/o group", N) {
spark.range(N).selectExpr("sum(id)").collect()
}
/*
agg w/o group: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
agg w/o group wholestage off 30136 / 31885 69.6 14.4 1.0X
agg w/o group wholestage on 1851 / 1860 1132.9 0.9 16.3X
*/
}
ignore("stat functions") {
runBenchmark("stat functions") {
val N = 100L << 20
runBenchmark("stddev", N) {
sparkSession.range(N).groupBy().agg("id" -> "stddev").collect()
codegenBenchmark("stddev", N) {
spark.range(N).groupBy().agg("id" -> "stddev").collect()
}
runBenchmark("kurtosis", N) {
sparkSession.range(N).groupBy().agg("id" -> "kurtosis").collect()
codegenBenchmark("kurtosis", N) {
spark.range(N).groupBy().agg("id" -> "kurtosis").collect()
}
}
/*
Using ImperativeAggregate (as implemented in Spark 1.6):
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
stddev: Avg Time(ms) Avg Rate(M/s) Relative Rate
-------------------------------------------------------------------------------
stddev w/o codegen 2019.04 10.39 1.00 X
stddev w codegen 2097.29 10.00 0.96 X
kurtosis w/o codegen 2108.99 9.94 0.96 X
kurtosis w codegen 2090.69 10.03 0.97 X
Using DeclarativeAggregate:
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
stddev: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
stddev codegen=false 5630 / 5776 18.0 55.6 1.0X
stddev codegen=true 1259 / 1314 83.0 12.0 4.5X
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
kurtosis: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
kurtosis codegen=false 14847 / 15084 7.0 142.9 1.0X
kurtosis codegen=true 1652 / 2124 63.0 15.9 9.0X
*/
}
ignore("aggregate with linear keys") {
runBenchmark("aggregate with linear keys") {
val N = 20 << 22
val benchmark = new Benchmark("Aggregate w keys", N)
val benchmark = new Benchmark("Aggregate w keys", N, output = output)
def f(): Unit = {
sparkSession.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
spark.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
}
benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
benchmark.addCase("codegen = F", numIters = 2) { _ =>
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
f()
}
benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
f()
}
benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
benchmark.addCase("codegen = T hashmap = F", numIters = 3) { _ =>
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
"spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
f()
}
}
benchmark.addCase("codegen = T hashmap = T", numIters = 5) { _ =>
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
"spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
f()
}
}
benchmark.run()
/*
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
codegen = F 6619 / 6780 12.7 78.9 1.0X
codegen = T hashmap = F 3935 / 4059 21.3 46.9 1.7X
codegen = T hashmap = T 897 / 971 93.5 10.7 7.4X
*/
}
ignore("aggregate with randomized keys") {
runBenchmark("aggregate with randomized keys") {
val N = 20 << 22
val benchmark = new Benchmark("Aggregate w keys", N)
sparkSession.range(N).selectExpr("id", "floor(rand() * 10000) as k")
val benchmark = new Benchmark("Aggregate w keys", N, output = output)
spark.range(N).selectExpr("id", "floor(rand() * 10000) as k")
.createOrReplaceTempView("test")
def f(): Unit = sparkSession.sql("select k, k, sum(id) from test group by k, k").collect()
def f(): Unit = spark.sql("select k, k, sum(id) from test group by k, k").collect()
benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", value = false)
benchmark.addCase("codegen = F", numIters = 2) { _ =>
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
f()
}
benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
f()
}
benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
benchmark.addCase("codegen = T hashmap = F", numIters = 3) { _ =>
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
"spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
f()
}
}
benchmark.addCase("codegen = T hashmap = T", numIters = 5) { _ =>
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
"spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
f()
}
}
benchmark.run()
/*
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
codegen = F 7445 / 7517 11.3 88.7 1.0X
codegen = T hashmap = F 4672 / 4703 18.0 55.7 1.6X
codegen = T hashmap = T 1764 / 1958 47.6 21.0 4.2X
*/
}
ignore("aggregate with string key") {
runBenchmark("aggregate with string key") {
val N = 20 << 20
val benchmark = new Benchmark("Aggregate w string key", N)
def f(): Unit = sparkSession.range(N).selectExpr("id", "cast(id & 1023 as string) as k")
val benchmark = new Benchmark("Aggregate w string key", N, output = output)
def f(): Unit = spark.range(N).selectExpr("id", "cast(id & 1023 as string) as k")
.groupBy("k").count().collect()
benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
benchmark.addCase("codegen = F", numIters = 2) { _ =>
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
f()
}
benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
f()
}
benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
benchmark.addCase("codegen = T hashmap = F", numIters = 3) { _ =>
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
"spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
f()
}
}
benchmark.addCase("codegen = T hashmap = T", numIters = 5) { _ =>
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
"spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
f()
}
}
benchmark.run()
/*
Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
Aggregate w string key: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
codegen = F 3307 / 3376 6.3 157.7 1.0X
codegen = T hashmap = F 2364 / 2471 8.9 112.7 1.4X
codegen = T hashmap = T 1740 / 1841 12.0 83.0 1.9X
*/
}
ignore("aggregate with decimal key") {
runBenchmark("aggregate with decimal key") {
val N = 20 << 20
val benchmark = new Benchmark("Aggregate w decimal key", N)
def f(): Unit = sparkSession.range(N).selectExpr("id", "cast(id & 65535 as decimal) as k")
val benchmark = new Benchmark("Aggregate w decimal key", N, output = output)
def f(): Unit = spark.range(N).selectExpr("id", "cast(id & 65535 as decimal) as k")
.groupBy("k").count().collect()
benchmark.addCase(s"codegen = F") { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
benchmark.addCase("codegen = F") { _ =>
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
f()
}
benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
f()
}
benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
benchmark.addCase("codegen = T hashmap = F") { _ =>
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
"spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
f()
}
}
benchmark.addCase("codegen = T hashmap = T") { _ =>
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
"spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
f()
}
}
benchmark.run()
/*
Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
Aggregate w decimal key: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
codegen = F 2756 / 2817 7.6 131.4 1.0X
codegen = T hashmap = F 1580 / 1647 13.3 75.4 1.7X
codegen = T hashmap = T 641 / 662 32.7 30.6 4.3X
*/
}
ignore("aggregate with multiple key types") {
runBenchmark("aggregate with multiple key types") {
val N = 20 << 20
val benchmark = new Benchmark("Aggregate w multiple keys", N)
def f(): Unit = sparkSession.range(N)
val benchmark = new Benchmark("Aggregate w multiple keys", N, output = output)
def f(): Unit = spark.range(N)
.selectExpr(
"id",
"(id & 1023) as k1",
@@ -271,43 +224,39 @@ class AggregateBenchmark extends BenchmarkWithCodegen {
.sum()
.collect()
benchmark.addCase(s"codegen = F") { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
benchmark.addCase("codegen = F") { _ =>
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
f()
}
benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
f()
}
benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
benchmark.addCase("codegen = T hashmap = F") { _ =>
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "false",
"spark.sql.codegen.aggregate.map.vectorized.enable" -> "false") {
f()
}
}
benchmark.addCase("codegen = T hashmap = T") { _ =>
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true",
"spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
f()
}
}
benchmark.run()
/*
Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
Aggregate w decimal key: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
codegen = F 5885 / 6091 3.6 280.6 1.0X
codegen = T hashmap = F 3625 / 4009 5.8 172.8 1.6X
codegen = T hashmap = T 3204 / 3271 6.5 152.8 1.8X
*/
}
ignore("max function bytecode size of wholestagecodegen") {
runBenchmark("max function bytecode size of wholestagecodegen") {
val N = 20 << 15
val benchmark = new Benchmark("max function bytecode size", N)
def f(): Unit = sparkSession.range(N)
val benchmark = new Benchmark("max function bytecode size", N, output = output)
def f(): Unit = spark.range(N)
.selectExpr(
"id",
"(id & 1023) as k1",
@@ -335,61 +284,47 @@ class AggregateBenchmark extends BenchmarkWithCodegen {
.sum()
.collect()
benchmark.addCase("codegen = F") { iter =>
sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
benchmark.addCase("codegen = F") { _ =>
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
f()
}
benchmark.addCase("codegen = T hugeMethodLimit = 10000") { iter =>
sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "10000")
f()
}
benchmark.addCase("codegen = T hugeMethodLimit = 1500") { iter =>
sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "1500")
benchmark.addCase("codegen = T hugeMethodLimit = 10000") { _ =>
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "10000") {
f()
}
}
benchmark.addCase("codegen = T hugeMethodLimit = 1500") { _ =>
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "1500") {
f()
}
}
benchmark.run()
/*
Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2
Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
max function bytecode size: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
codegen = F 709 / 803 0.9 1082.1 1.0X
codegen = T hugeMethodLimit = 10000 3485 / 3548 0.2 5317.7 0.2X
codegen = T hugeMethodLimit = 1500 636 / 701 1.0 969.9 1.1X
*/
}
ignore("cube") {
runBenchmark("cube") {
val N = 5 << 20
runBenchmark("cube", N) {
sparkSession.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2")
codegenBenchmark("cube", N) {
spark.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2")
.cube("k1", "k2").sum("id").collect()
}
/**
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
cube: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
cube codegen=false 3188 / 3392 1.6 608.2 1.0X
cube codegen=true 1239 / 1394 4.2 236.3 2.6X
*/
}
ignore("hash and BytesToBytesMap") {
runBenchmark("hash and BytesToBytesMap") {
val N = 20 << 20
val benchmark = new Benchmark("BytesToBytesMap", N)
val benchmark = new Benchmark("BytesToBytesMap", N, output = output)
benchmark.addCase("UnsafeRowhash") { iter =>
benchmark.addCase("UnsafeRowhash") { _ =>
var i = 0
val keyBytes = new Array[Byte](16)
val key = new UnsafeRow(1)
@@ -404,7 +339,7 @@ class AggregateBenchmark extends BenchmarkWithCodegen {
}
}
benchmark.addCase("murmur3 hash") { iter =>
benchmark.addCase("murmur3 hash") { _ =>
var i = 0
val keyBytes = new Array[Byte](16)
val key = new UnsafeRow(1)
@@ -419,7 +354,7 @@ class AggregateBenchmark extends BenchmarkWithCodegen {
}
}
benchmark.addCase("fast hash") { iter =>
benchmark.addCase("fast hash") { _ =>
var i = 0
val keyBytes = new Array[Byte](16)
val key = new UnsafeRow(1)
@@ -437,7 +372,7 @@ class AggregateBenchmark extends BenchmarkWithCodegen {
}
}
benchmark.addCase("arrayEqual") { iter =>
benchmark.addCase("arrayEqual") { _ =>
var i = 0
val keyBytes = new Array[Byte](16)
val valueBytes = new Array[Byte](16)
@@ -456,7 +391,7 @@ class AggregateBenchmark extends BenchmarkWithCodegen {
}
}
benchmark.addCase("Java HashMap (Long)") { iter =>
benchmark.addCase("Java HashMap (Long)") { _ =>
var i = 0
val keyBytes = new Array[Byte](16)
val valueBytes = new Array[Byte](16)
@@ -479,7 +414,7 @@ class AggregateBenchmark extends BenchmarkWithCodegen {
}
}
benchmark.addCase("Java HashMap (two ints) ") { iter =>
benchmark.addCase("Java HashMap (two ints) ") { _ =>
var i = 0
val valueBytes = new Array[Byte](16)
val value = new UnsafeRow(1)
@@ -503,7 +438,7 @@ class AggregateBenchmark extends BenchmarkWithCodegen {
}
}
benchmark.addCase("Java HashMap (UnsafeRow)") { iter =>
benchmark.addCase("Java HashMap (UnsafeRow)") { _ =>
var i = 0
val keyBytes = new Array[Byte](16)
val valueBytes = new Array[Byte](16)
@@ -531,7 +466,7 @@ class AggregateBenchmark extends BenchmarkWithCodegen {
}
Seq(false, true).foreach { optimized =>
benchmark.addCase(s"LongToUnsafeRowMap (opt=$optimized)") { iter =>
benchmark.addCase(s"LongToUnsafeRowMap (opt=$optimized)") { _ =>
var i = 0
val valueBytes = new Array[Byte](16)
val value = new UnsafeRow(1)
@@ -567,7 +502,7 @@ class AggregateBenchmark extends BenchmarkWithCodegen {
}
Seq("off", "on").foreach { heap =>
benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { iter =>
benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { _ =>
val taskMemoryManager = new TaskMemoryManager(
new StaticMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap == "off"}")
@@ -576,7 +511,7 @@ class AggregateBenchmark extends BenchmarkWithCodegen {
Long.MaxValue,
1),
0)
val map = new BytesToBytesMap(taskMemoryManager, 1024, 64L<<20)
val map = new BytesToBytesMap(taskMemoryManager, 1024, 64L << 20)
val keyBytes = new Array[Byte](16)
val valueBytes = new Array[Byte](16)
val key = new UnsafeRow(1)
@@ -609,7 +544,7 @@ class AggregateBenchmark extends BenchmarkWithCodegen {
}
}
benchmark.addCase("Aggregate HashMap") { iter =>
benchmark.addCase("Aggregate HashMap") { _ =>
var i = 0
val numKeys = 65536
val schema = new StructType()
@@ -630,25 +565,7 @@ class AggregateBenchmark extends BenchmarkWithCodegen {
i += 1
}
}
/*
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
BytesToBytesMap: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
UnsafeRow hash 267 / 284 78.4 12.8 1.0X
murmur3 hash 102 / 129 205.5 4.9 2.6X
fast hash 79 / 96 263.8 3.8 3.4X
arrayEqual 164 / 172 128.2 7.8 1.6X
Java HashMap (Long) 321 / 399 65.4 15.3 0.8X
Java HashMap (two ints) 328 / 363 63.9 15.7 0.8X
Java HashMap (UnsafeRow) 1140 / 1200 18.4 54.3 0.2X
LongToUnsafeRowMap (opt=false) 378 / 400 55.5 18.0 0.7X
LongToUnsafeRowMap (opt=true) 144 / 152 145.2 6.9 1.9X
BytesToBytesMap (off Heap) 1300 / 1616 16.1 62.0 0.2X
BytesToBytesMap (on Heap) 1165 / 1202 18.0 55.5 0.2X
Aggregate HashMap 121 / 131 173.3 5.8 2.2X
*/
benchmark.run()
}
}
}

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.benchmark
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.internal.SQLConf
/**
* Common base trait for running benchmarks with the Dataset and DataFrame API.
*/
trait SqlBasedBenchmark extends BenchmarkBase with SQLHelper {
protected val spark: SparkSession = getSparkSession
/** Subclasses can override this method to build their own SparkSession. */
def getSparkSession: SparkSession = {
SparkSession.builder()
.master("local[1]")
.appName(this.getClass.getCanonicalName)
.config(SQLConf.SHUFFLE_PARTITIONS.key, 1)
.config(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, 1)
.getOrCreate()
}
/** Runs function `f` with whole stage codegen on and off. */
final def codegenBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
val benchmark = new Benchmark(name, cardinality, output = output)
benchmark.addCase(s"$name wholestage off", numIters = 2) { _ =>
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
f
}
}
benchmark.addCase(s"$name wholestage on", numIters = 5) { _ =>
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
f
}
}
benchmark.run()
}
}
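
For case matrices beyond a plain codegen on/off toggle (as in the keyed aggregate benchmarks above), `codegenBenchmark` does not apply; the refactored benchmarks instead combine a plain `Benchmark` with the `withSQLConf` helper from `SQLHelper`. A minimal sketch, assuming the code runs inside an object mixing in `SqlBasedBenchmark` (so `spark`, `output`, and `withSQLConf` are in scope):

```scala
// Sketch of a multi-config case matrix, condensed from the
// "aggregate with linear keys" benchmark in this patch.
val N = 20 << 22
val benchmark = new Benchmark("Aggregate w keys", N, output = output)
def f(): Unit =
  spark.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()

benchmark.addCase("codegen = F", numIters = 2) { _ =>
  // Each case restores the previous SQLConf values when it finishes.
  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { f() }
}
benchmark.addCase("codegen = T hashmap = T", numIters = 5) { _ =>
  withSQLConf(
    SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
    SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> "true") { f() }
}
benchmark.run()
```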