[SPARK-25589][SQL][TEST] Add BloomFilterBenchmark

## What changes were proposed in this pull request?

This PR aims to add `BloomFilterBenchmark`. Apache Spark has supported bloom filters for the ORC data source for a long time; for the Parquet data source, support is expected to arrive with the next Parquet release.
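For reference, enabling an ORC bloom filter from Spark is a single write option. Below is a minimal sketch using the same `orc.bloom.filter.columns` option the new benchmark exercises; the path and the pre-existing `spark` session are assumptions for illustration:

```scala
// Minimal sketch: write an ORC file with a bloom filter on column `value`,
// then run a point lookup that can benefit from it.
// Assumes an active SparkSession named `spark` and a writable scratch path.
import spark.implicits._

val df = spark.range(1000 * 1000).map(_.toInt).toDF("value")

df.write
  .mode("overwrite")
  .option("orc.bloom.filter.columns", "value") // build bloom filters for `value`
  .orc("/tmp/bloom-filter-example")

// The ORC reader can consult the bloom filter to skip row groups
// that cannot contain the looked-up key.
spark.read.orc("/tmp/bloom-filter-example").where("value = 0").count()
```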

## How was this patch tested?

Manual.

```scala
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.BloomFilterBenchmark"
```
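The generated results file, `benchmarks/BloomFilterBenchmark-results.txt`, is included in this PR.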

Closes #22605 from dongjoon-hyun/SPARK-25589.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2 changed files with 111 additions and 0 deletions

benchmarks/BloomFilterBenchmark-results.txt (new file)
@@ -0,0 +1,24 @@
```
================================================================================================
ORC Write
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz

Write 100M rows:                         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Without bloom filter                          16765 / 17587          6.0         167.7       1.0X
With bloom filter                             20060 / 20626          5.0         200.6       0.8X


================================================================================================
ORC Read
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz

Read a row from 100M rows:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Without bloom filter                            1857 / 1904         53.9          18.6       1.0X
With bloom filter                               1399 / 1437         71.5          14.0       1.3X
```
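In this run, building the bloom filter makes the 100M-row write about 20% slower (0.8X), while the point-lookup read becomes roughly 1.3X faster.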

BloomFilterBenchmark.scala (new file)
@@ -0,0 +1,87 @@
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.benchmark

import scala.util.Random

import org.apache.spark.benchmark.Benchmark

/**
 * Benchmark to measure read performance with Bloom filters.
 *
 * Currently, only ORC supports bloom filters; we will add a Parquet benchmark as soon as it
 * becomes available.
 *
 * To run this benchmark:
 * {{{
 *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
 *   2. build/sbt "sql/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
 *      Results will be written to "benchmarks/BloomFilterBenchmark-results.txt".
 * }}}
 */
object BloomFilterBenchmark extends SqlBasedBenchmark {
  import spark.implicits._

  private val scaleFactor = 100
  private val N = scaleFactor * 1000 * 1000
  // A 100M-row Dataset[Int]; its single column is named "value".
  private val df = spark.range(N).map(_ => Random.nextInt)

  private def writeBenchmark(): Unit = {
    withTempPath { dir =>
      val path = dir.getCanonicalPath

      runBenchmark("ORC Write") {
        val benchmark = new Benchmark(s"Write ${scaleFactor}M rows", N, output = output)
        benchmark.addCase("Without bloom filter") { _ =>
          df.write.mode("overwrite").orc(path + "/withoutBF")
        }
        benchmark.addCase("With bloom filter") { _ =>
          df.write.mode("overwrite")
            .option("orc.bloom.filter.columns", "value").orc(path + "/withBF")
        }
        benchmark.run()
      }
    }
  }

  private def readBenchmark(): Unit = {
    withTempPath { dir =>
      val path = dir.getCanonicalPath

      // Write the same data twice, with and without bloom filters, then time a point lookup.
      df.write.orc(path + "/withoutBF")
      df.write.option("orc.bloom.filter.columns", "value").orc(path + "/withBF")

      runBenchmark("ORC Read") {
        val benchmark = new Benchmark(s"Read a row from ${scaleFactor}M rows", N, output = output)
        benchmark.addCase("Without bloom filter") { _ =>
          spark.read.orc(path + "/withoutBF").where("value = 0").count
        }
        benchmark.addCase("With bloom filter") { _ =>
          spark.read.orc(path + "/withBF").where("value = 0").count
        }
        benchmark.run()
      }
    }
  }

  override def runBenchmarkSuite(): Unit = {
    writeBenchmark()
    readBenchmark()
  }
}
```
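Beyond choosing the filtered columns, Apache ORC also exposes the filter's false-positive probability. A hedged variation of the write case above; the `orc.bloom.filter.fpp` option and its 0.05 default come from Apache ORC itself, not from this PR:

```scala
// Sketch: same write path as the benchmark's "With bloom filter" case, with an
// explicit false-positive probability. Lower fpp values trade larger filters
// for fewer false positives (ORC's default is 0.05).
df.write.mode("overwrite")
  .option("orc.bloom.filter.columns", "value")
  .option("orc.bloom.filter.fpp", "0.01")
  .orc(path + "/withBF")
```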