[SPARK-25490][SQL][TEST] Fix OOM of KryoBenchmark due to large 2D array and refactor it to use main method

## What changes were proposed in this pull request?

Before the code changes, I tried to run it with 8G memory:
```
build/sbt -mem 8000  "core/testOnly org.apache.spark.serializer.KryoBenchmark"
```
I still got an OOM.

This is because the array lengths are random (see `core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala` L90-L91 at commit 669ade3a8e), so the 2D array is usually large: up to `10000 * Random.nextInt(0, 10000)` elements.
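To see why the old sizing runs out of memory even with 8G, the expected sizes can be worked out directly. A standalone sketch using the constants from this PR (`ArraySizing` is an illustrative helper, not part of the change):

```scala
// Quantifies old vs new benchmark data sizing (illustrative helper).
object ArraySizing {
  // Old code: 10000 sub-arrays, each Random.nextInt(10000) elements long, so
  // on average 10000 * 5000 = 50M elements per typed benchmark case
  // (~400 MB for Array[Double]) -- and several cases are alive at once.
  val oldExpectedElements: Long = 10000L * (10000 / 2)

  // New code: total size is pinned to N, with at most +25% random jitter.
  val n = 1000000
  val arrayCount = 4000
  val arrayLength: Int = n / arrayCount // 250
  // Upper bound on the new total (nextInt's bound is exclusive, so the
  // actual maximum is slightly lower):
  val newWorstCaseElements: Long =
    arrayCount.toLong * (arrayLength + arrayLength / 4)

  def main(args: Array[String]): Unit =
    println(s"old expected: $oldExpectedElements, new bound: $newWorstCaseElements")
}
```

The point of the fix: memory use no longer depends on `Random`'s draws, and the total is roughly 40x smaller.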

This PR fixes the OOM and refactors the benchmark to use a main method.

The new benchmark results are also reasonable compared to the original ones.
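The refactor hinges on moving from a `SparkFunSuite` test to an object extending `BenchmarkBase`, whose inherited `main` method is what lets `spark-submit` and `test:runMain` drive it. A minimal stand-in for that pattern (the trait body here is a simplification for illustration, not Spark's actual implementation):

```scala
// Simplified stand-in for Spark's BenchmarkBase pattern (illustrative only).
trait BenchmarkBase {
  def runBenchmarkSuite(): Unit

  // The real BenchmarkBase additionally redirects output to a results file
  // when SPARK_GENERATE_BENCHMARK_FILES=1 is set.
  def runBenchmark(name: String)(body: => Unit): Unit = {
    println("=" * 40)
    println(name)
    println("=" * 40)
    body
  }

  // Having a main method is what makes the object runnable outside sbt tests.
  def main(args: Array[String]): Unit = runBenchmarkSuite()
}

object MiniBenchmark extends BenchmarkBase {
  var ran = false
  override def runBenchmarkSuite(): Unit =
    runBenchmark("Benchmark Kryo Unsafe vs safe Serialization") { ran = true }
}
```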

## How was this patch tested?

Run with
```
bin/spark-submit --class org.apache.spark.serializer.KryoBenchmark core/target/scala-2.11/spark-core_2.11-3.0.0-SNAPSHOT-tests.jar
```
and
```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain org.apache.spark.serializer.KryoBenchmark"
```
Closes #22663 from gengliangwang/kyroBenchmark.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Gengliang Wang 2018-10-24 16:56:17 -05:00 committed by Sean Owen
parent f83fedc9f2
commit b2e3256256
2 changed files with 64 additions and 42 deletions

benchmarks/KryoBenchmark-results.txt
@@ -0,0 +1,29 @@
================================================================================================
Benchmark Kryo Unsafe vs safe Serialization
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
Benchmark Kryo Unsafe vs safe Serialization: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
basicTypes: Int with unsafe:true               138 /  149          7.2         138.0       1.0X
basicTypes: Long with unsafe:true              168 /  173          6.0         167.7       0.8X
basicTypes: Float with unsafe:true             153 /  174          6.5         153.1       0.9X
basicTypes: Double with unsafe:true            161 /  185          6.2         161.1       0.9X
Array: Int with unsafe:true                      2 /    3        409.7           2.4      56.5X
Array: Long with unsafe:true                     4 /    5        232.5           4.3      32.1X
Array: Float with unsafe:true                    3 /    4        367.3           2.7      50.7X
Array: Double with unsafe:true                   4 /    5        228.5           4.4      31.5X
Map of string->Double with unsafe:true          38 /   45         26.5          37.8       3.7X
basicTypes: Int with unsafe:false              176 /  187          5.7         175.9       0.8X
basicTypes: Long with unsafe:false             191 /  203          5.2         191.2       0.7X
basicTypes: Float with unsafe:false            166 /  176          6.0         166.2       0.8X
basicTypes: Double with unsafe:false           174 /  190          5.7         174.3       0.8X
Array: Int with unsafe:false                    19 /   26         52.9          18.9       7.3X
Array: Long with unsafe:false                   27 /   31         37.7          26.5       5.2X
Array: Float with unsafe:false                   8 /   10        124.3           8.0      17.2X
Array: Double with unsafe:false                 12 /   13         83.6          12.0      11.5X
Map of string->Double with unsafe:false         38 /   42         26.1          38.3       3.6X
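The columns above are mutually consistent: with 1,000,000 values per case, Per Row(ns) is the best time divided by N, and Rate(M/s) is its reciprocal. A quick check of the first row (`RateCheck` is an illustrative helper, not part of the PR):

```scala
// Sanity-check of the benchmark table's derived columns for one row:
// "basicTypes: Int with unsafe:true", best time 138 ms over N = 1,000,000.
object RateCheck {
  val n = 1000000L
  val bestMs = 138.0
  val perRowNs: Double = bestMs * 1e6 / n          // 138.0 ns per row
  val rateMPerSec: Double = n / (bestMs / 1000.0) / 1e6 // ~7.2 M rows/s

  def main(args: Array[String]): Unit =
    println(f"per row: $perRowNs%.1f ns, rate: $rateMPerSec%.1f M/s")
}
```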

core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
@@ -20,58 +20,48 @@ package org.apache.spark.serializer

 import scala.reflect.ClassTag
 import scala.util.Random

-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.SparkConf
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
 import org.apache.spark.serializer.KryoTest._

-class KryoBenchmark extends SparkFunSuite {
-  val benchmark = new Benchmark("Benchmark Kryo Unsafe vs safe Serialization", 1024 * 1024 * 15, 10)
+/**
+ * Benchmark for Kryo Unsafe vs safe Serialization.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar>
+ *   2. build/sbt "core/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
+ *      Results will be written to "benchmarks/KryoBenchmark-results.txt".
+ * }}}
+ */
+object KryoBenchmark extends BenchmarkBase {

-  ignore(s"Benchmark Kryo Unsafe vs safe Serialization") {
-    Seq (true, false).foreach (runBenchmark)
-    benchmark.run()
-
-    // scalastyle:off
-    /*
-    Benchmark Kryo Unsafe vs safe Serialization: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    basicTypes: Int with unsafe:true               151 /  170        104.2           9.6       1.0X
-    basicTypes: Long with unsafe:true              175 /  191         89.8          11.1       0.9X
-    basicTypes: Float with unsafe:true             177 /  184         88.8          11.3       0.9X
-    basicTypes: Double with unsafe:true            193 /  216         81.4          12.3       0.8X
-    Array: Int with unsafe:true                    513 /  587         30.7          32.6       0.3X
-    Array: Long with unsafe:true                  1211 / 1358         13.0          77.0       0.1X
-    Array: Float with unsafe:true                  890 /  964         17.7          56.6       0.2X
-    Array: Double with unsafe:true                1335 / 1428         11.8          84.9       0.1X
-    Map of string->Double with unsafe:true         931 /  988         16.9          59.2       0.2X
-    basicTypes: Int with unsafe:false              197 /  217         79.9          12.5       0.8X
-    basicTypes: Long with unsafe:false             219 /  240         71.8          13.9       0.7X
-    basicTypes: Float with unsafe:false            208 /  217         75.7          13.2       0.7X
-    basicTypes: Double with unsafe:false           208 /  225         75.6          13.2       0.7X
-    Array: Int with unsafe:false                  2559 / 2681          6.1         162.7       0.1X
-    Array: Long with unsafe:false                 3425 / 3516          4.6         217.8       0.0X
-    Array: Float with unsafe:false                2025 / 2134          7.8         128.7       0.1X
-    Array: Double with unsafe:false               2241 / 2358          7.0         142.5       0.1X
-    Map of string->Double with unsafe:false       1044 / 1085         15.1          66.4       0.1X
-    */
-    // scalastyle:on
+  val N = 1000000
+
+  override def runBenchmarkSuite(): Unit = {
+    val name = "Benchmark Kryo Unsafe vs safe Serialization"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).foreach(useUnsafe => run(useUnsafe, benchmark))
+      benchmark.run()
+    }
   }

-  private def runBenchmark(useUnsafe: Boolean): Unit = {
+  private def run(useUnsafe: Boolean, benchmark: Benchmark): Unit = {
     def check[T: ClassTag](t: T, ser: SerializerInstance): Int = {
-      if (ser.deserialize[T](ser.serialize(t)) === t) 1 else 0
+      if (ser.deserialize[T](ser.serialize(t)) == t) 1 else 0
     }

     // Benchmark Primitives
-    val basicTypeCount = 1000000
     def basicTypes[T: ClassTag](name: String, gen: () => T): Unit = {
       lazy val ser = createSerializer(useUnsafe)
-      val arrayOfBasicType: Array[T] = Array.fill(basicTypeCount)(gen())
+      val arrayOfBasicType: Array[T] = Array.fill(N)(gen())
       benchmark.addCase(s"basicTypes: $name with unsafe:$useUnsafe") { _ =>
         var sum = 0L
         var i = 0
-        while (i < basicTypeCount) {
+        while (i < N) {
           sum += check(arrayOfBasicType(i), ser)
           i += 1
         }
@@ -84,11 +74,12 @@ class KryoBenchmark extends SparkFunSuite {
     basicTypes("Double", () => Random.nextDouble())

     // Benchmark Array of Primitives
-    val arrayCount = 10000
+    val arrayCount = 4000
+    val arrayLength = N / arrayCount
     def basicTypeArray[T: ClassTag](name: String, gen: () => T): Unit = {
       lazy val ser = createSerializer(useUnsafe)
       val arrayOfArrays: Array[Array[T]] =
-        Array.fill(arrayCount)(Array.fill[T](Random.nextInt(arrayCount))(gen()))
+        Array.fill(arrayCount)(Array.fill[T](arrayLength + Random.nextInt(arrayLength / 4))(gen()))

       benchmark.addCase(s"Array: $name with unsafe:$useUnsafe") { _ =>
         var sum = 0L
@@ -107,11 +98,13 @@ class KryoBenchmark extends SparkFunSuite {
     basicTypeArray("Double", () => Random.nextDouble())

     // Benchmark Maps
-    val mapsCount = 1000
+    val mapsCount = 200
+    val mapKeyLength = 20
+    val mapLength = N / mapsCount / mapKeyLength
     lazy val ser = createSerializer(useUnsafe)
     val arrayOfMaps: Array[Map[String, Double]] = Array.fill(mapsCount) {
-      Array.fill(Random.nextInt(mapsCount)) {
-        (Random.nextString(mapsCount / 10), Random.nextDouble())
+      Array.fill(mapLength + Random.nextInt(mapLength / 4)) {
+        (Random.nextString(mapKeyLength), Random.nextDouble())
       }.toMap
     }
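The same bounded-sizing idea applies to the map benchmark: the new constants pin the total number of key characters near N regardless of the random jitter, where the old code could generate up to `1000 * 1000` maps-times-entries with 100-character keys. A standalone sketch with the PR's constants (`MapSizing` is an illustrative helper, not part of the change):

```scala
// Quantifies the new map benchmark sizing (illustrative helper).
object MapSizing {
  val n = 1000000
  val mapsCount = 200
  val mapKeyLength = 20
  val mapLength: Int = n / mapsCount / mapKeyLength // 250 entries per map
  // Upper bound on total key characters across all maps (+25% jitter;
  // nextInt's bound is exclusive, so the actual maximum is slightly lower):
  val worstCaseChars: Long =
    mapsCount.toLong * (mapLength + mapLength / 4) * mapKeyLength

  def main(args: Array[String]): Unit =
    println(s"entries per map: $mapLength, worst-case key chars: $worstCaseChars")
}
```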