spark-instrumented-optimizer/python/pyspark
mn-mikke e6b466084c [SPARK-23736][SQL] Extending the concat function to support array columns
## What changes were proposed in this pull request?
The PR adds logic for easy concatenation of multiple array columns and covers:
- The Concat expression has been extended to support array columns
- A Python wrapper (a minimal usage sketch follows)
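
A minimal sketch of the Python wrapper in use (assuming an active SparkSession; everything except `concat` itself is illustrative):
```
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [([1, 2], [3, 4]), ([1, 2, 3], None)],
    ["a", "b"])

# concat now accepts array columns; a null input array yields a null
# result, as the null short-circuit in the generated code below shows.
df.select(concat(df.a, df.b).alias("ab")).show()
```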

## How was this patch tested?
New tests were added to:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite
- typeCoercion/native/concat.sql

## Codegen examples
### Primitive-type elements
```
val df = Seq(
  (Seq(1, 2), Seq(3, 4)),
  (Seq(1, 2, 3), null)
).toDF("a", "b")
df.filter('a.isNotNull).select(concat('a, 'b)).debugCodegen()
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         if (!(!inputadapter_isNull)) continue;
/* 038 */
/* 039 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 040 */
/* 041 */         ArrayData[] project_args = new ArrayData[2];
/* 042 */
/* 043 */         if (!false) {
/* 044 */           project_args[0] = inputadapter_value;
/* 045 */         }
/* 046 */
/* 047 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 048 */         ArrayData inputadapter_value1 = inputadapter_isNull1 ?
/* 049 */         null : (inputadapter_row.getArray(1));
/* 050 */         if (!inputadapter_isNull1) {
/* 051 */           project_args[1] = inputadapter_value1;
/* 052 */         }
/* 053 */
/* 054 */         ArrayData project_value = new Object() {
/* 055 */           public ArrayData concat(ArrayData[] args) {
/* 056 */             for (int z = 0; z < 2; z++) {
/* 057 */               if (args[z] == null) return null;
/* 058 */             }
/* 059 */
/* 060 */             long project_numElements = 0L;
/* 061 */             for (int z = 0; z < 2; z++) {
/* 062 */               project_numElements += args[z].numElements();
/* 063 */             }
/* 064 */             if (project_numElements > 2147483632) {
/* 065 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_numElements +
/* 066 */                 " elements due to exceeding the array size limit 2147483632.");
/* 067 */             }
/* 068 */
/* 069 */             long project_size = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 070 */               project_numElements,
/* 071 */               4);
/* 072 */             if (project_size > 2147483632) {
/* 073 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_size +
/* 074 */                 " bytes of data due to exceeding the limit 2147483632 bytes" +
/* 075 */                 " for UnsafeArrayData.");
/* 076 */             }
/* 077 */
/* 078 */             byte[] project_array = new byte[(int)project_size];
/* 079 */             UnsafeArrayData project_arrayData = new UnsafeArrayData();
/* 080 */             Platform.putLong(project_array, 16, project_numElements);
/* 081 */             project_arrayData.pointTo(project_array, 16, (int)project_size);
/* 082 */             int project_counter = 0;
/* 083 */             for (int y = 0; y < 2; y++) {
/* 084 */               for (int z = 0; z < args[y].numElements(); z++) {
/* 085 */                 if (args[y].isNullAt(z)) {
/* 086 */                   project_arrayData.setNullAt(project_counter);
/* 087 */                 } else {
/* 088 */                   project_arrayData.setInt(
/* 089 */                     project_counter,
/* 090 */                     args[y].getInt(z)
/* 091 */                   );
/* 092 */                 }
/* 093 */                 project_counter++;
/* 094 */               }
/* 095 */             }
/* 096 */             return project_arrayData;
/* 097 */           }
/* 098 */         }.concat(project_args);
/* 099 */         boolean project_isNull = project_value == null;
```
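
Note the shape of the generated concat() helper for primitive elements: it returns null as soon as any input array is null, sums the element counts up front, and checks both the element count and the UnsafeArrayData byte size against the 2147483632 limit before copying the inputs into a single preallocated byte array.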

### Non-primitive-type elements
```
val df = Seq(
  (Seq("aa", "bb"), Seq("ccc", "ddd")),
  (Seq("x", "y"), null)
).toDF("a", "b")
df.filter('a.isNotNull).select(concat('a, 'b)).debugCodegen()
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         if (!(!inputadapter_isNull)) continue;
/* 038 */
/* 039 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 040 */
/* 041 */         ArrayData[] project_args = new ArrayData[2];
/* 042 */
/* 043 */         if (!false) {
/* 044 */           project_args[0] = inputadapter_value;
/* 045 */         }
/* 046 */
/* 047 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 048 */         ArrayData inputadapter_value1 = inputadapter_isNull1 ?
/* 049 */         null : (inputadapter_row.getArray(1));
/* 050 */         if (!inputadapter_isNull1) {
/* 051 */           project_args[1] = inputadapter_value1;
/* 052 */         }
/* 053 */
/* 054 */         ArrayData project_value = new Object() {
/* 055 */           public ArrayData concat(ArrayData[] args) {
/* 056 */             for (int z = 0; z < 2; z++) {
/* 057 */               if (args[z] == null) return null;
/* 058 */             }
/* 059 */
/* 060 */             long project_numElements = 0L;
/* 061 */             for (int z = 0; z < 2; z++) {
/* 062 */               project_numElements += args[z].numElements();
/* 063 */             }
/* 064 */             if (project_numElements > 2147483632) {
/* 065 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_numElements +
/* 066 */                 " elements due to exceeding the array size limit 2147483632.");
/* 067 */             }
/* 068 */
/* 069 */             Object[] project_arrayObjects = new Object[(int)project_numElements];
/* 070 */             int project_counter = 0;
/* 071 */             for (int y = 0; y < 2; y++) {
/* 072 */               for (int z = 0; z < args[y].numElements(); z++) {
/* 073 */                 project_arrayObjects[project_counter] = args[y].getUTF8String(z);
/* 074 */                 project_counter++;
/* 075 */               }
/* 076 */             }
/* 077 */             return new org.apache.spark.sql.catalyst.util.GenericArrayData(project_arrayObjects);
/* 078 */           }
/* 079 */         }.concat(project_args);
/* 080 */         boolean project_isNull = project_value == null;
```
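
Both variants implement the same semantics and differ only in how the result is materialized: the non-primitive path skips the byte-size accounting and collects the elements into an Object[] wrapped in a GenericArrayData. As a hypothetical pure-Python reference (not part of the PR), the null handling of the generated concat() reduces to:
```
def concat_arrays(*arrays):
    # Null short-circuit: any null (None) input array nullifies the result.
    if any(a is None for a in arrays):
        return None
    out = []
    for a in arrays:
        out.extend(a)  # element-level nulls are copied through unchanged
    return out
```
For example, concat_arrays([1, 2], [3, 4]) returns [1, 2, 3, 4], while concat_arrays([1, 2, 3], None) returns None, matching the two rows in the example DataFrames above.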

Author: mn-mikke <mrkAha12346github>

Closes #20858 from mn-mikke/feature/array-api-concat_arrays-to-master.
2018-04-20 14:58:11 +09:00
ml [SPARK-21741][ML][PYSPARK] Python API for DataFrame-based multivariate summarizer 2018-04-17 10:11:08 -07:00
mllib [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
sql [SPARK-23736][SQL] Extending the concat function to support array columns 2018-04-20 14:58:11 +09:00
streaming [SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener 2018-04-19 10:00:57 +08:00
__init__.py [SPARK-23328][PYTHON] Disallow default value None in na.replace/replace when 'to_replace' is not a dictionary 2018-02-09 14:21:10 +08:00
_globals.py [SPARK-23328][PYTHON] Disallow default value None in na.replace/replace when 'to_replace' is not a dictionary 2018-02-09 14:21:10 +08:00
accumulators.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
broadcast.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
cloudpickle.py [SPARK-23159][PYTHON] Update cloudpickle to v0.4.3 2018-03-08 20:19:55 +09:00
conf.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
context.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
daemon.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
files.py [SPARK-3309] [PySpark] Put all public API in __all__ 2014-09-03 11:49:45 -07:00
find_spark_home.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
heapq3.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
java_gateway.py [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFrame from Pandas 2017-11-13 13:16:01 +09:00
join.py [SPARK-14202] [PYTHON] Use generator expression instead of list comp in python_full_outer_jo… 2016-03-28 14:51:36 -07:00
profiler.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
rdd.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
rddsampler.py [SPARK-4897] [PySpark] Python 3 support 2015-04-16 16:20:57 -07:00
resultiterable.py [SPARK-3074] [PySpark] support groupByKey() with single huge key 2015-04-09 17:07:23 -07:00
serializers.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
shell.py [SPARK-19570][PYSPARK] Allow to disable hive in pyspark shell 2017-04-12 10:54:50 -07:00
shuffle.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00
statcounter.py [SPARK-6919] [PYSPARK] Add asDict method to StatCounter 2015-09-29 13:38:15 -07:00
status.py [SPARK-4172] [PySpark] Progress API in Python 2015-02-17 13:36:43 -08:00
storagelevel.py [SPARK-13992][CORE][PYSPARK][FOLLOWUP] Update OFF_HEAP semantics for Java api and Python api 2016-04-12 23:06:55 -07:00
taskcontext.py [SPARK-18576][PYTHON] Add basic TaskContext information to PySpark 2016-12-20 15:51:21 -08:00
tests.py [SPARK-23517][PYTHON] Make pyspark.util._exception_message produce the trace from Java side by Py4JJavaError 2018-03-01 00:44:13 +09:00
traceback_utils.py [SPARK-1087] Move python traceback utilities into new traceback_utils.py file. 2014-09-15 19:28:17 -07:00
util.py [SPARK-23700][PYTHON] Cleanup imports in pyspark.sql 2018-03-26 12:42:32 +09:00
version.py [SPARK-23028] Bump master branch version to 2.4.0-SNAPSHOT 2018-01-13 00:37:59 +08:00
worker.py [SPARK-23522][PYTHON] always use sys.exit over builtin exit 2018-03-08 20:38:34 +09:00