spark-instrumented-optimizer/python/pyspark/sql
Bryan Cutler ecaa495b1f [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send un-ordered record batches to improve performance
## What changes were proposed in this pull request?

When executing `toPandas` with Arrow enabled, partitions that arrive in the JVM out of order must be buffered before they can be sent to Python. This uses excess memory in the driver JVM and increases the time to completion, because data must sit in the JVM waiting for preceding partitions to arrive.

This change sends un-ordered partitions to Python as soon as they arrive in the JVM, followed by a list of partition indices so that Python can assemble the data in the correct order. This way, data is not buffered in the JVM and no partition has to wait on an earlier one, which improves performance.
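
The reordering step on the Python side can be sketched as follows. This is a minimal illustration, not the code in this patch: the function name `assemble_in_order` and the shape of its inputs (one group of batches per arrived partition, plus a trailing list giving the partition index of each group in arrival order) are assumptions made for the example.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def assemble_in_order(
    arrived_groups: Sequence[Sequence[T]],
    arrival_partition_ids: Sequence[int],
) -> List[T]:
    """Return all batches ordered by partition index.

    arrived_groups[k] holds the batches of the k-th partition to arrive, and
    arrival_partition_ids[k] is that partition's index (hypothetical layout).
    """
    if len(arrived_groups) != len(arrival_partition_ids):
        raise ValueError("expected one partition index per arrived group")

    # Arrival positions, sorted by the partition index each position carries.
    positions = sorted(
        range(len(arrival_partition_ids)),
        key=lambda k: arrival_partition_ids[k],
    )

    ordered: List[T] = []
    for k in positions:
        # Batches within a partition keep their original relative order.
        ordered.extend(arrived_groups[k])
    return ordered


# Partition 2 arrived first, then partition 0 (two batches), then partition 1.
arrived = [["p2-b0"], ["p0-b0", "p0-b1"], ["p1-b0"]]
result = assemble_in_order(arrived, [2, 0, 1])
```

Because the indices are only needed once all data has been received, nothing has to be held back on the sending side while waiting for a slow partition.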

Followup to #21546

## How was this patch tested?

Added a new test with a large number of batches per partition, and a test that forces a small delay in the first partition. These tests verify that partitions are collected out of order and are then put in the correct order in Python.

## Performance Tests - toPandas

Tests were run on a 4-node standalone cluster with 32 cores total, Ubuntu 14.04.1, and OpenJDK 8.
They measured the wall-clock time to execute `toPandas()`, taking the average of the best times from 5 runs of 5 loops each.

Test code
```python
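import time

from pyspark.sql.functions import rand

# `spark` is the active SparkSession (predefined in the pyspark shell).
df = spark.range(1 << 25, numPartitions=32).toDF("id").withColumn("x1", rand()).withColumn("x2", rand()).withColumn("x3", rand()).withColumn("x4", rand())
for i in range(5):
    start = time.time()
    _ = df.toPandas()
    elapsed = time.time() - start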
```

Spark config
```
spark.driver.memory 5g
spark.executor.memory 5g
spark.driver.maxResultSize 2g
spark.sql.execution.arrow.enabled true
```

Current Master w/ Arrow stream (s) | This PR (s)
---------------------|------------
5.16207 | 4.342533
5.133671 | 4.399408
5.147513 | 4.468471
5.105243 | 4.36524
5.018685 | 4.373791

Avg Master (s) | Avg This PR (s)
------------------|--------------
5.1134364 | 4.3898886

Speedup of **1.164821449**
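
The averages and the speedup can be recomputed from the per-run times in the table above. The short check below is only a convenience for readers; the variable names are illustrative, and the speedup is taken to be the ratio of the two averages.

```python
# Per-run wall-clock times in seconds, copied from the table above.
master_times = [5.16207, 5.133671, 5.147513, 5.105243, 5.018685]
pr_times = [4.342533, 4.399408, 4.468471, 4.36524, 4.373791]

avg_master = sum(master_times) / len(master_times)
avg_pr = sum(pr_times) / len(pr_times)

# Speedup is the ratio of the old average time to the new average time.
speedup = avg_master / avg_pr

print(f"avg master:  {avg_master:.7f} s")
print(f"avg this PR: {avg_pr:.7f} s")
print(f"speedup:     {speedup:.9f}")
```

Up to floating-point rounding, this reproduces the reported averages and the speedup of roughly 1.16x.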

Closes #22275 from BryanCutler/arrow-toPandas-oo-batches-SPARK-25274.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2018-12-06 10:07:28 -08:00
tests [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send un-ordered record batches to improve performance 2018-12-06 10:07:28 -08:00
__init__.py [SPARK-22369][PYTHON][DOCS] Exposes catalog API documentation in PySpark 2017-11-02 15:22:52 +01:00
catalog.py [SPARK-24665][PYSPARK][FOLLOWUP] Use SQLConf in PySpark to manage all sql configs 2018-08-17 10:18:08 +08:00
column.py [SPARK-23847][PYTHON][SQL] Add asc_nulls_first, asc_nulls_last to PySpark 2018-04-08 12:09:06 +08:00
conf.py [SPARK-23698][PYTHON] Resolve undefined names in Python 3 2018-08-22 10:06:59 -07:00
context.py [SPARK-25540][SQL][PYSPARK] Make HiveContext in PySpark behave as the same as Scala. 2018-09-27 09:51:20 +08:00
dataframe.py [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send un-ordered record batches to improve performance 2018-12-06 10:07:28 -08:00
functions.py [SPARK-25829][SQL] remove duplicated map keys with last wins policy 2018-11-28 23:42:13 +08:00
group.py [SPARK-24722][SQL] pivot() with Column type argument 2018-08-04 14:17:32 +08:00
readwriter.py [SPARK-26108][SQL] Support custom lineSep in CSV datasource 2018-11-24 00:50:20 +09:00
session.py [SPARK-25255][PYTHON] Add getActiveSession to SparkSession in PySpark 2018-10-26 09:40:13 -07:00
streaming.py [SPARK-26108][SQL] Support custom lineSep in CSV datasource 2018-11-24 00:50:20 +09:00
types.py [SPARK-25238][PYTHON] lint-python: Fix W605 warnings for pycodestyle 2.4 2018-09-13 11:19:43 +08:00
udf.py [SPARK-25601][PYTHON] Register Grouped aggregate UDF Vectorized UDFs for SQL Statement 2018-10-04 09:36:23 +08:00
utils.py [SPARK-24721][SQL] Exclude Python UDFs filters in FileSourceStrategy 2018-08-28 10:57:13 +08:00
window.py [SPARK-25842][SQL] Deprecate rangeBetween APIs introduced in SPARK-21608 2018-10-26 13:17:24 +08:00