[SPARK-16213][SQL] Reduce runtime overhead of a program that creates a primitive array in DataFrame
## What changes were proposed in this pull request?
This PR reduces the runtime overhead of a program that creates a primitive array in DataFrame, using an approach similar to #15044. Without this PR, the generated code performs a boxing operation for each element when it assigns values from an `InternalRow` to an `Object[]` temporary array (at lines 051 and 061 in the generated code below). If we know that the array element type is primitive, we apply the following optimizations:
1. Eliminate each pair of an `isNullAt()` check and a null assignment
2. Allocate a primitive array instead of an `Object[]` (eliminating the boxing operations)
3. Create the `UnsafeArrayData` with `UnsafeArrayWriter` to keep the primitive array in a row format, instead of performing the heavyweight operations in the constructor of `GenericArrayData`
This PR applies the same optimizations to `CreateMap`; the sketch below illustrates the boxing cost that is removed.
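As a rough illustration of the boxing cost that optimization 2 removes, here is a minimal standalone sketch (not Spark's actual generated code; `value` stands in for the column value read from the `InternalRow`):
``` scala
object BoxingSketch {
  def main(args: Array[String]): Unit = {
    val value = 0.0d

    // What the old code path effectively does: each assignment into an
    // Object[] autoboxes the double into a heap-allocated java.lang.Double.
    val boxed = new Array[Any](2)
    boxed(0) = value + 1.1d // allocates a java.lang.Double
    boxed(1) = value + 2.2d // allocates another java.lang.Double

    // The idea behind this PR: store each value directly as an 8-byte
    // double, with no per-element heap allocation.
    val primitive = new Array[Double](2)
    primitive(0) = value + 1.1d
    primitive(1) = value + 2.2d

    println(boxed.mkString(", ") + " | " + primitive.mkString(", "))
  }
}
```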
Here are performance results of [DataFrame programs](6bf54ec5e2/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/PrimitiveArrayBenchmark.scala (L83-L112)), which improve by up to 17.9x with this PR.
```
Without SPARK-16213
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Read a primitive array in DataFrame:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            3805 / 4150          0.0      507308.9       1.0X
Double                                         3593 / 3852          0.0      479056.9       1.1X

With SPARK-16213

Read a primitive array in DataFrame:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                              213 /  271          0.0       28387.5       1.0X
Double                                           204 /  223          0.0       27250.9       1.0X
```
Note: #15780 is enabled for these measurements.
A motivating example:
``` scala
val df = sparkContext.parallelize(Seq(0.0d, 1.0d), 1).toDF
df.selectExpr("Array(value + 1.1d, value + 2.2d)").show
```
Generated code without this PR
``` java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private scala.collection.Iterator inputadapter_input;
/* 009 */ private UnsafeRow serializefromobject_result;
/* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */ private Object[] project_values;
/* 013 */ private UnsafeRow project_result;
/* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
/* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter project_arrayWriter;
/* 017 */
/* 018 */ public GeneratedIterator(Object[] references) {
/* 019 */ this.references = references;
/* 020 */ }
/* 021 */
/* 022 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */ partitionIndex = index;
/* 024 */ this.inputs = inputs;
/* 025 */ inputadapter_input = inputs[0];
/* 026 */ serializefromobject_result = new UnsafeRow(1);
/* 027 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 028 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 029 */ this.project_values = null;
/* 030 */ project_result = new UnsafeRow(1);
/* 031 */ this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 32);
/* 032 */ this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1);
/* 033 */ this.project_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 034 */
/* 035 */ }
/* 036 */
/* 037 */ protected void processNext() throws java.io.IOException {
/* 038 */ while (inputadapter_input.hasNext()) {
/* 039 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 040 */ double inputadapter_value = inputadapter_row.getDouble(0);
/* 041 */
/* 042 */ final boolean project_isNull = false;
/* 043 */ this.project_values = new Object[2];
/* 044 */ boolean project_isNull1 = false;
/* 045 */
/* 046 */ double project_value1 = -1.0;
/* 047 */ project_value1 = inputadapter_value + 1.1D;
/* 048 */ if (false) {
/* 049 */ project_values[0] = null;
/* 050 */ } else {
/* 051 */ project_values[0] = project_value1;
/* 052 */ }
/* 053 */
/* 054 */ boolean project_isNull4 = false;
/* 055 */
/* 056 */ double project_value4 = -1.0;
/* 057 */ project_value4 = inputadapter_value + 2.2D;
/* 058 */ if (false) {
/* 059 */ project_values[1] = null;
/* 060 */ } else {
/* 061 */ project_values[1] = project_value4;
/* 062 */ }
/* 063 */
/* 064 */ final ArrayData project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_values);
/* 065 */ this.project_values = null;
/* 066 */ project_holder.reset();
/* 067 */
/* 068 */ project_rowWriter.zeroOutNullBytes();
/* 069 */
/* 070 */ if (project_isNull) {
/* 071 */ project_rowWriter.setNullAt(0);
/* 072 */ } else {
/* 073 */ // Remember the current cursor so that we can calculate how many bytes are
/* 074 */ // written later.
/* 075 */ final int project_tmpCursor = project_holder.cursor;
/* 076 */
/* 077 */ if (project_value instanceof UnsafeArrayData) {
/* 078 */ final int project_sizeInBytes = ((UnsafeArrayData) project_value).getSizeInBytes();
/* 079 */ // grow the global buffer before writing data.
/* 080 */ project_holder.grow(project_sizeInBytes);
/* 081 */ ((UnsafeArrayData) project_value).writeToMemory(project_holder.buffer, project_holder.cursor);
/* 082 */ project_holder.cursor += project_sizeInBytes;
/* 083 */
/* 084 */ } else {
/* 085 */ final int project_numElements = project_value.numElements();
/* 086 */ project_arrayWriter.initialize(project_holder, project_numElements, 8);
/* 087 */
/* 088 */ for (int project_index = 0; project_index < project_numElements; project_index++) {
/* 089 */ if (project_value.isNullAt(project_index)) {
/* 090 */ project_arrayWriter.setNullDouble(project_index);
/* 091 */ } else {
/* 092 */ final double project_element = project_value.getDouble(project_index);
/* 093 */ project_arrayWriter.write(project_index, project_element);
/* 094 */ }
/* 095 */ }
/* 096 */ }
/* 097 */
/* 098 */ project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor);
/* 099 */ }
/* 100 */ project_result.setTotalSize(project_holder.totalSize());
/* 101 */ append(project_result);
/* 102 */ if (shouldStop()) return;
/* 103 */ }
/* 104 */ }
/* 105 */ }
```
Generated code with this PR
``` java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private scala.collection.Iterator inputadapter_input;
/* 009 */ private UnsafeRow serializefromobject_result;
/* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */ private UnsafeArrayData project_arrayData;
/* 013 */ private UnsafeRow project_result;
/* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder project_holder;
/* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter project_rowWriter;
/* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter project_arrayWriter;
/* 017 */
/* 018 */ public GeneratedIterator(Object[] references) {
/* 019 */ this.references = references;
/* 020 */ }
/* 021 */
/* 022 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */ partitionIndex = index;
/* 024 */ this.inputs = inputs;
/* 025 */ inputadapter_input = inputs[0];
/* 026 */ serializefromobject_result = new UnsafeRow(1);
/* 027 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 028 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 029 */
/* 030 */ project_result = new UnsafeRow(1);
/* 031 */ this.project_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(project_result, 32);
/* 032 */ this.project_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_holder, 1);
/* 033 */ this.project_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 034 */
/* 035 */ }
/* 036 */
/* 037 */ protected void processNext() throws java.io.IOException {
/* 038 */ while (inputadapter_input.hasNext()) {
/* 039 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 040 */ double inputadapter_value = inputadapter_row.getDouble(0);
/* 041 */
/* 042 */ byte[] project_array = new byte[32];
/* 043 */ project_arrayData = new UnsafeArrayData();
/* 044 */ Platform.putLong(project_array, 16, 2);
/* 045 */ project_arrayData.pointTo(project_array, 16, 32);
/* 046 */
/* 047 */ boolean project_isNull1 = false;
/* 048 */
/* 049 */ double project_value1 = -1.0;
/* 050 */ project_value1 = inputadapter_value + 1.1D;
/* 051 */ if (false) {
/* 052 */ project_arrayData.setNullAt(0);
/* 053 */ } else {
/* 054 */ project_arrayData.setDouble(0, project_value1);
/* 055 */ }
/* 056 */
/* 057 */ boolean project_isNull4 = false;
/* 058 */
/* 059 */ double project_value4 = -1.0;
/* 060 */ project_value4 = inputadapter_value + 2.2D;
/* 061 */ if (false) {
/* 062 */ project_arrayData.setNullAt(1);
/* 063 */ } else {
/* 064 */ project_arrayData.setDouble(1, project_value4);
/* 065 */ }
/* 066 */ project_holder.reset();
/* 067 */
/* 068 */ // Remember the current cursor so that we can calculate how many bytes are
/* 069 */ // written later.
/* 070 */ final int project_tmpCursor = project_holder.cursor;
/* 071 */
/* 072 */ if (project_arrayData instanceof UnsafeArrayData) {
/* 073 */ final int project_sizeInBytes = ((UnsafeArrayData) project_arrayData).getSizeInBytes();
/* 074 */ // grow the global buffer before writing data.
/* 075 */ project_holder.grow(project_sizeInBytes);
/* 076 */ ((UnsafeArrayData) project_arrayData).writeToMemory(project_holder.buffer, project_holder.cursor);
/* 077 */ project_holder.cursor += project_sizeInBytes;
/* 078 */
/* 079 */ } else {
/* 080 */ final int project_numElements = project_arrayData.numElements();
/* 081 */ project_arrayWriter.initialize(project_holder, project_numElements, 8);
/* 082 */
/* 083 */ for (int project_index = 0; project_index < project_numElements; project_index++) {
/* 084 */ if (project_arrayData.isNullAt(project_index)) {
/* 085 */ project_arrayWriter.setNullDouble(project_index);
/* 086 */ } else {
/* 087 */ final double project_element = project_arrayData.getDouble(project_index);
/* 088 */ project_arrayWriter.write(project_index, project_element);
/* 089 */ }
/* 090 */ }
/* 091 */ }
/* 092 */
/* 093 */ project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor);
/* 094 */ project_result.setTotalSize(project_holder.totalSize());
/* 095 */ append(project_result);
/* 096 */ if (shouldStop()) return;
/* 097 */ }
/* 098 */ }
/* 099 */ }
```
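For reference, here is my reading of lines 042-045 above (and the `setDouble()` calls that follow) as a standalone sketch; the buffer-layout comments are an assumption based on the `UnsafeArrayData` format, not something stated in this PR, and the constant 16 in the generated code corresponds to `Platform.BYTE_ARRAY_OFFSET` on a typical 64-bit JVM:
``` scala
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
import org.apache.spark.unsafe.Platform

// Assumed layout of the 32-byte buffer for two non-null double elements:
//   bytes  0- 7 : numElements, stored as a long (here: 2)
//   bytes  8-15 : null-tracking bitmap, one bit per element, padded to a word
//   bytes 16-31 : the two 8-byte double values, filled in by setDouble()
val buffer = new Array[Byte](32)
val arrayData = new UnsafeArrayData()
Platform.putLong(buffer, Platform.BYTE_ARRAY_OFFSET, 2)
arrayData.pointTo(buffer, Platform.BYTE_ARRAY_OFFSET, 32)
arrayData.setDouble(0, 1.1d)
arrayData.setDouble(1, 2.2d)
assert(arrayData.getDouble(1) == 2.2d)
```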
## How was this patch tested?
Added unit tests to `DataFrameComplexTypeSuite`.
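The new path can be exercised with a check along these lines (an illustrative sketch, not the exact tests added; it assumes a Spark session with its implicits in scope, as in the motivating example above):
``` scala
val df = sparkContext.parallelize(Seq(1.0d), 1).toDF
val result = df.selectExpr("array(value + 1.1d, value + 2.2d)").collect()
// The array column comes back as a Seq; both elements are primitive doubles.
assert(result(0).getSeq[Double](0) == Seq(2.1d, 3.2d))
```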
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #13909 from kiszk/SPARK-16213.