spark-instrumented-optimizer/python/pyspark/sql
David Li 9b875ceada [SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas
### What changes were proposed in this pull request?

Creating a Pandas dataframe via Apache Arrow currently can use twice as much memory as the final result, because during the conversion, both Pandas and Arrow retain a copy of the data. Arrow has a "self-destruct" mode now (Arrow >= 0.16) to avoid this, by freeing each column after conversion. This PR integrates support for this in toPandas, handling a couple of edge cases:

self_destruct has no effect unless the memory is allocated appropriately, which is handled in the Arrow serializer here. Essentially, the issue is that self_destruct frees memory column-wise, but Arrow record batches are oriented row-wise:

```
Record batch 0: allocation 0: column 0 chunk 0, column 1 chunk 0, ...
Record batch 1: allocation 1: column 0 chunk 1, column 1 chunk 1, ...
```

In this scenario, Arrow will drop references to all of column 0's chunks, but no memory will actually be freed, as the chunks were just slices of an underlying allocation. The PR copies each column into its own allocation so that memory is instead arranged as so:

```
Record batch 0: allocation 0 column 0 chunk 0, allocation 1 column 1 chunk 0, ...
Record batch 1: allocation 2 column 0 chunk 1, allocation 3 column 1 chunk 1, ...
```

The optimization is disabled by default, and can be enabled with the Spark SQL conf "spark.sql.execution.arrow.pyspark.selfDestruct.enabled" set to "true". We can't always apply this optimization because it's more likely to generate a dataframe with immutable buffers, which Pandas doesn't always handle well, and because it is slower overall (since it only converts one column at a time instead of in parallel).

### Why are the changes needed?

This lets us load larger datasets - in particular, with N bytes of memory, before we could never load a dataset bigger than N/2 bytes; now the overhead is more like N/1.25 or so.

### Does this PR introduce _any_ user-facing change?

Yes - it adds a new SQL conf "spark.sql.execution.arrow.pyspark.selfDestruct.enabled"

### How was this patch tested?

See the [mailing list](http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Reducing-memory-usage-of-toPandas-with-Arrow-quot-self-destruct-quot-option-td30149.html) - it was tested with Python memory_profiler. Unit tests added to check memory within certain bounds and correctness with the option enabled.

Closes #29818 from lidavidm/spark-32953.

Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2021-02-10 09:58:46 -08:00
..
avro [SPARK-34300][PYSPARK][DOCS][MINOR] Fix some typos and syntax issues in docstrings and output of dev/lint-python 2021-02-02 09:30:50 +09:00
pandas [SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas 2021-02-10 09:58:46 -08:00
tests [SPARK-32953][PYTHON][SQL] Add Arrow self_destruct support to toPandas 2021-02-10 09:58:46 -08:00
__init__.py [SPARK-32138] Drop Python 2.7, 3.4 and 3.5 2020-07-14 11:22:44 +09:00
__init__.pyi [SPARK-32714][PYTHON] Initial pyspark-stubs port 2020-09-24 14:15:36 +09:00
_typing.pyi [SPARK-32714][PYTHON] Initial pyspark-stubs port 2020-09-24 14:15:36 +09:00
catalog.py [SPARK-33730][PYTHON] Standardize warning types 2021-01-18 09:32:55 +09:00
catalog.pyi [SPARK-32714][PYTHON] Initial pyspark-stubs port 2020-09-24 14:15:36 +09:00
column.py [SPARK-33730][PYTHON] Standardize warning types 2021-01-18 09:32:55 +09:00
column.pyi [SPARK-33457][PYTHON] Adjust mypy configuration 2020-11-25 09:27:04 +09:00
conf.py [SPARK-32138] Drop Python 2.7, 3.4 and 3.5 2020-07-14 11:22:44 +09:00
conf.pyi [SPARK-32714][PYTHON] Initial pyspark-stubs port 2020-09-24 14:15:36 +09:00
context.py [SPARK-34157][SQL] Unify output of SHOW TABLES and pass output attributes properly 2021-02-08 08:39:58 +00:00
context.pyi [SPARK-33457][PYTHON] Adjust mypy configuration 2020-11-25 09:27:04 +09:00
dataframe.py [PYTHON][MINOR] Fix docstring of DataFrame.join 2021-02-06 09:08:49 -06:00
dataframe.pyi [SPARK-33250][PYTHON][DOCS] Migration to NumPy documentation style in SQL (pyspark.sql.*) 2020-11-03 10:00:49 +09:00
functions.py [SPARK-34300][PYSPARK][DOCS][MINOR] Fix some typos and syntax issues in docstrings and output of dev/lint-python 2021-02-02 09:30:50 +09:00
functions.pyi [SPARK-34306][SQL][PYTHON][R] Use Snake naming rule across the function APIs 2021-02-02 09:29:40 +09:00
group.py [SPARK-33250][PYTHON][DOCS] Migration to NumPy documentation style in SQL (pyspark.sql.*) 2020-11-03 10:00:49 +09:00
group.pyi [SPARK-32714][PYTHON] Initial pyspark-stubs port 2020-09-24 14:15:36 +09:00
readwriter.py [SPARK-34377][SQL] Add new parquet datasource options to control datetime rebasing in read 2021-02-08 13:28:40 +00:00
readwriter.pyi [SPARK-33566][CORE][SQL][SS][PYTHON] Make unescapedQuoteHandling option configurable when read CSV 2020-11-27 15:47:39 +09:00
session.py [SPARK-33989][SQL] Strip auto-generated cast when using Cast.sql 2021-01-14 15:27:14 +00:00
session.pyi [SPARK-33457][PYTHON] Adjust mypy configuration 2020-11-25 09:27:04 +09:00
streaming.py [SPARK-34377][SQL] Add new parquet datasource options to control datetime rebasing in read 2021-02-08 13:28:40 +00:00
streaming.pyi [SPARK-33836][SS][PYTHON] Expose DataStreamReader.table and DataStreamWriter.toTable 2020-12-21 19:42:59 +09:00
types.py [SPARK-33250][PYTHON][DOCS] Migration to NumPy documentation style in SQL (pyspark.sql.*) 2020-11-03 10:00:49 +09:00
types.pyi [SPARK-33457][PYTHON] Adjust mypy configuration 2020-11-25 09:27:04 +09:00
udf.py [SPARK-33250][PYTHON][DOCS] Migration to NumPy documentation style in SQL (pyspark.sql.*) 2020-11-03 10:00:49 +09:00
udf.pyi [SPARK-33457][PYTHON] Adjust mypy configuration 2020-11-25 09:27:04 +09:00
utils.py Spelling r common dev mlib external project streaming resource managers python 2020-11-27 10:22:45 -06:00
window.py [SPARK-33250][PYTHON][DOCS] Migration to NumPy documentation style in SQL (pyspark.sql.*) 2020-11-03 10:00:49 +09:00
window.pyi [SPARK-33250][PYTHON][DOCS] Migration to NumPy documentation style in SQL (pyspark.sql.*) 2020-11-03 10:00:49 +09:00