[SPARK-30413][SQL] Avoid WrappedArray roundtrip in GenericArrayData constructor, plus related optimization in ParquetMapConverter

### What changes were proposed in this pull request?

This PR implements a tiny performance optimization for a `GenericArrayData` constructor, avoiding an unnecessary roundtrip through `WrappedArray` when the provided value is already an array of objects.

It also fixes a related performance problem in `ParquetRowConverter`.

### Why are the changes needed?

`GenericArrayData` has a `this(seqOrArray: Any)` constructor, which was originally added in #13138 for use in `RowEncoder` (where we may not know concrete types until runtime) but is also called (perhaps unintentionally) in a few other code paths.

In this constructor's existing implementation, a call such as `new GenericArrayData(Array[Object](""))` dispatches to the `this(seqOrArray: Any)` constructor, which then calls `this(array.toSeq)`: the provided array is wrapped in a `WrappedArray`, only to be unwrapped again by the subsequent `this(seq.toArray)` call. For an interactive example, see https://scastie.scala-lang.org/7jOHydbNTaGSU677FWA8nA
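To make the roundtrip concrete, here is a standalone sketch of the old dispatch chain (the class and helper names are illustrative stand-ins, not Spark's actual source):

```scala
// Mirrors the shape of the old code path: the `Any` overload normalizes
// everything to a Seq before the Seq overload converts it back to an Array.
object RoundtripSketch {
  def anyToSeq(seqOrArray: Any): Seq[Any] = seqOrArray match {
    case seq: Seq[Any]   => seq
    case array: Array[_] => array.toSeq // wraps the array in a Seq wrapper...
  }
}

class RoundtripSketch(val array: Array[Any]) {
  def this(seq: Seq[Any]) = this(seq.toArray) // ...which is unwrapped again here
  def this(seqOrArray: Any) = this(RoundtripSketch.anyToSeq(seqOrArray))
}

val input: Any = Array[Object]("a", "b")
val data = new RoundtripSketch(input)
// `data.array` holds the same elements, but the input took a needless detour
// through a wrapper allocation and two conversion calls.
```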

This PR changes the `this(seqOrArray: Any)` constructor so that it calls the primary `this(array: Array[Any])` constructor, allowing us to save a `.toSeq.toArray` call; this comes at the cost of one additional `case` in the `match` statement (but I believe this has a negligible performance impact relative to the other savings).
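Illustratively, the new dispatch behaves like the following standalone function (a sketch of the match logic only, not Spark's class itself):

```scala
// Standalone sketch of the new `this(seqOrArray: Any)` dispatch logic.
def toAnyArray(seqOrArray: Any): Array[Any] = seqOrArray match {
  case seq: Seq[Any]     => seq.toArray              // materialize the Seq once
  case array: Array[Any] => array                    // object arrays pass through unchanged
  case array: Array[_]   => array.toSeq.toArray[Any] // primitive arrays are boxed
}

val objects: Array[Any] = Array[Any]("a", "b")
val passedThrough = toAnyArray(objects)
// Same underlying array: no wrapper roundtrip, no copy.
val noCopy = passedThrough eq objects

val boxed = toAnyArray(Array(1, 2, 3)) // Array[Int] takes the boxing branch
```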

As code cleanup, I also reverted the JVM 1.7 workaround from #14271.

I also fixed a related performance problem in `ParquetRowConverter`: previously, this code called `ArrayBasedMapData.apply`, which in turn invoked the `this(Any)` constructor of `GenericArrayData`. This PR's microbenchmarks show that path to be _significantly_ slower than the `this(Array[Any])` constructor (I also observed time spent there during other Parquet scan benchmarking work). To fix this, I replaced the `ArrayBasedMapData.apply` call with direct calls to the `ArrayBasedMapData` and `GenericArrayData` constructors.
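For context, the factory method's untyped `Array[_]` parameters are what force the slow path: overload resolution can only pick the `this(Any)` constructor for an `Array[_]` argument. A simplified sketch of the effect (class and method names here are illustrative, not Spark's actual source):

```scala
// Sketch of the two call shapes; `usedAnyPath` is instrumentation for this
// sketch only and does not exist in Spark.
class GAD(val array: Array[Any]) {
  var usedAnyPath = false
  def this(seqOrArray: Any) = {
    this(seqOrArray match {
      case seq: Seq[Any]     => seq.toArray
      case array: Array[Any] => array
      case array: Array[_]   => array.toSeq.toArray[Any]
    })
    usedAnyPath = true // record that the untyped overload was taken
  }
}

// Mirrors the factory-method shape: an Array[_] cannot statically match the
// Array[Any] primary constructor, so the `this(Any)` overload is selected
// and every call pays a runtime type dispatch.
def viaFactory(keys: Array[_]): GAD = new GAD(keys)

// This PR's approach: the caller already has Array[Any], so it can invoke
// the primary constructor directly and skip the dispatch entirely.
def direct(keys: Array[Any]): GAD = new GAD(keys)
```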

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

I tested this by running code in a debugger and by running microbenchmarks (which I've added to a new `GenericArrayDataBenchmark` in this PR):

- With JDK8 benchmarks: this PR's changes more than double the performance of calls to the `this(Any)` constructor. Even after improvements, however, calls to the `this(Array[Any])` constructor are still ~60x faster than calls to `this(Any)` when passing a non-primitive array (thereby motivating this patch's other change in `ParquetRowConverter`).
- With JDK11 benchmarks: the changes more-or-less completely eliminate the performance penalty associated with the `this(Any)` constructor.

Closes #27088 from JoshRosen/joshrosen/GenericArrayData-optimization.

Authored-by: Josh Rosen <rosenville@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Josh Rosen 2020-01-19 19:12:19 -08:00 committed by Dongjoon Hyun
parent 775fae4640
commit d50f8df929
5 changed files with 122 additions and 12 deletions


@@ -0,0 +1,10 @@
OpenJDK 64-Bit Server VM 11.0.5+10 on Mac OS X 10.14.6
Intel(R) Core(TM) i5-8210Y CPU @ 1.60GHz
constructor: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
arrayOfAny 6 7 1 1770.9 0.6 1.0X
arrayOfAnyAsObject 6 7 2 1709.3 0.6 1.0X
arrayOfAnyAsSeq 5 6 2 2195.5 0.5 1.2X
arrayOfInt 452 469 13 22.1 45.2 0.0X
arrayOfIntAsObject 678 690 11 14.7 67.8 0.0X


@@ -0,0 +1,10 @@
Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Mac OS X 10.14.6
Intel(R) Core(TM) i5-8210Y CPU @ 1.60GHz
constructor: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
arrayOfAny 7 8 2 1471.6 0.7 1.0X
arrayOfAnyAsObject 197 207 9 50.7 19.7 0.0X
arrayOfAnyAsSeq 25 27 2 398.0 2.5 0.3X
arrayOfInt 613 630 15 16.3 61.3 0.0X
arrayOfIntAsObject 866 872 8 11.5 86.6 0.0X


@@ -23,16 +23,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.{DataType, Decimal}
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

-private object GenericArrayData {
-  // SPARK-16634: Workaround for JVM bug present in some 1.7 versions.
-  def anyToSeq(seqOrArray: Any): Seq[Any] = seqOrArray match {
-    case seq: Seq[Any] => seq
-    case array: Array[_] => array.toSeq
-  }
-}

 class GenericArrayData(val array: Array[Any]) extends ArrayData {

   def this(seq: Seq[Any]) = this(seq.toArray)
@@ -47,7 +37,11 @@ class GenericArrayData(val array: Array[Any]) extends ArrayData {
   def this(primitiveArray: Array[Byte]) = this(primitiveArray.toSeq)
   def this(primitiveArray: Array[Boolean]) = this(primitiveArray.toSeq)

-  def this(seqOrArray: Any) = this(GenericArrayData.anyToSeq(seqOrArray))
+  def this(seqOrArray: Any) = this(seqOrArray match {
+    case seq: Seq[Any] => seq.toArray
+    case array: Array[Any] => array // array of objects, so no need to convert
+    case array: Array[_] => array.toSeq.toArray[Any] // array of primitives, so box them
+  })

   override def copy(): ArrayData = {
     val newValues = new Array[Any](array.length)


@@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.util

import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}

/**
 * Benchmark for [[GenericArrayData]].
 * To run this benchmark:
 * {{{
 *   1. without sbt:
 *      bin/spark-submit --class <this class> --jars <spark core test jar> <spark catalyst test jar>
 *   2. build/sbt "catalyst/test:runMain <this class>"
 *   3. generate result:
 *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "catalyst/test:runMain <this class>"
 *      Results will be written to "benchmarks/GenericArrayDataBenchmark-results.txt".
 * }}}
 */
object GenericArrayDataBenchmark extends BenchmarkBase {

  // Benchmarks of GenericArrayData's constructors (see SPARK-30413):
  def constructorBenchmark(): Unit = {
    val valuesPerIteration: Long = 1000 * 1000 * 10
    val arraySize = 10
    val benchmark = new Benchmark("constructor", valuesPerIteration, output = output)

    benchmark.addCase("arrayOfAny") { _ =>
      val arr: Array[Any] = new Array[Any](arraySize)
      var n = 0
      while (n < valuesPerIteration) {
        new GenericArrayData(arr)
        n += 1
      }
    }

    benchmark.addCase("arrayOfAnyAsObject") { _ =>
      val arr: Object = new Array[Any](arraySize)
      var n = 0
      while (n < valuesPerIteration) {
        new GenericArrayData(arr)
        n += 1
      }
    }

    benchmark.addCase("arrayOfAnyAsSeq") { _ =>
      val arr: Seq[Any] = new Array[Any](arraySize)
      var n = 0
      while (n < valuesPerIteration) {
        new GenericArrayData(arr)
        n += 1
      }
    }

    benchmark.addCase("arrayOfInt") { _ =>
      val arr: Array[Int] = new Array[Int](arraySize)
      var n = 0
      while (n < valuesPerIteration) {
        new GenericArrayData(arr)
        n += 1
      }
    }

    benchmark.addCase("arrayOfIntAsObject") { _ =>
      val arr: Object = new Array[Int](arraySize)
      var n = 0
      while (n < valuesPerIteration) {
        new GenericArrayData(arr)
        n += 1
      }
    }

    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    constructorBenchmark()
  }
}


@@ -591,7 +591,10 @@ private[parquet] class ParquetRowConverter(
     // The parquet map may contains null or duplicated map keys. When it happens, the behavior is
     // undefined.
     // TODO (SPARK-26174): disallow it with a config.
-    updater.set(ArrayBasedMapData(currentKeys.toArray, currentValues.toArray))
+    updater.set(
+      new ArrayBasedMapData(
+        new GenericArrayData(currentKeys.toArray),
+        new GenericArrayData(currentValues.toArray)))
   }

   override def start(): Unit = {