spark-instrumented-optimizer/python/pyspark/tests
Bryan Cutler 5e79ae3b40 [SPARK-23961][SPARK-27548][PYTHON] Fix error when toLocalIterator goes out of scope and properly raise errors from worker
## What changes were proposed in this pull request?

This fixes an error that occurs when a PySpark local iterator, for both RDDs and DataFrames, goes out of scope and the connection is closed before the iterator is fully consumed. The error occurs on the JVM in the serving thread when Python closes the local socket while the JVM is writing to it. This usually happens when there is enough data to fill the socket read buffer, causing the write call to block.
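For context, a minimal sketch of the usage pattern that triggered the error (this assumes a local `SparkSession` and is illustrative, not code from this patch):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

def partial_read():
    # Enough rows that the JVM's writes can outpace Python's reads and block.
    it = spark.range(1 << 20).toLocalIterator()
    return [next(it) for _ in range(5)]  # consume only a few rows

head = partial_read()
# The iterator is now out of scope. Before this fix, Python could close the
# local socket while the JVM serving thread was still blocked writing to it,
# producing an error on the JVM side.
```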

Additionally, this fixes a problem where an error occurs in the Python worker and the collect job is cancelled with an exception. Previously, the Python driver was never notified of the error, so the user could get a partial result (iteration up to the error) and the application would continue. With this change, an error in the worker is sent to the Python iterator and is then raised.

The change here introduces a protocol for PySpark local iterators that works as follows:

1) The local socket connection is made when the iterator is created
2) When iterating, Python first sends a request for partition data as a non-zero integer
3) While the JVM local iterator over partitions has next, it triggers a job to collect the next partition
4) The JVM sends a non-zero response to indicate it has the next partition to send
5) The next partition is sent to Python and read by the PySpark deserializer
6) After the entire partition has been sent, an `END_OF_DATA_SECTION` marker is sent to Python, which stops the deserializer and allows Python to make another request
7) When the JVM gets a request from Python but has already consumed its local iterator, it will send a zero response to Python and both sides will close the socket cleanly
8) If an error occurs in the worker, a negative response is sent to Python followed by the error message. Python will then raise a RuntimeError with the message, stopping iteration.
9) When the PySpark local iterator is garbage-collected, it will read any remaining data from the current partition (this is data that has already been collected) and send a request of zero to tell the JVM to stop collection jobs and close the connection.

Steps 1, 3, 5, and 6 are the same as before. Step 8 was completely missing before because errors in the worker were never communicated back to Python. The other steps add synchronization to allow for a clean closing of the socket, with a small performance trade-off for each partition. This is mainly because the JVM does not start collecting partition data until it receives a request to do so, whereas before it would eagerly write data until the socket receive buffer was full.
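To make the flow concrete, here is a simplified sketch of the driver-side request/response loop (framing and names are illustrative; the real implementation lives in the PySpark serializers and `toLocalIterator` internals):

```python
import struct
from typing import BinaryIO, Iterator

# Illustrative sentinel; the real marker is defined in pyspark.serializers.
END_OF_DATA_SECTION = -1


def read_int(stream: BinaryIO) -> int:
    # All integers in the protocol are 4-byte big-endian.
    return struct.unpack("!i", stream.read(4))[0]


def local_iterator(stream: BinaryIO) -> Iterator[bytes]:
    """Driver-side loop; `stream` wraps the local socket (step 1)."""
    while True:
        # Step 2: request the next partition with a non-zero integer.
        stream.write(struct.pack("!i", 1))
        stream.flush()
        # Steps 4, 7, 8: the JVM answers with a status code.
        status = read_int(stream)
        if status == 0:
            return  # step 7: JVM iterator exhausted; close cleanly
        if status < 0:
            # Step 8: an error occurred in the worker; the message follows.
            length = read_int(stream)
            raise RuntimeError(stream.read(length).decode("utf-8"))
        # Steps 5-6: read length-prefixed items until the end-of-data marker.
        while True:
            length = read_int(stream)
            if length == END_OF_DATA_SECTION:
                break
            yield stream.read(length)
```

Because the loop only issues a request when the caller advances the iterator, each partition's collect job runs on demand, which is the source of the small per-partition overhead noted above.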

## How was this patch tested?

Added new unit tests for DataFrame and RDD `toLocalIterator`, including cases where the iterator is not fully consumed. Manual tests with Python 2.7 and 3.6.
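The partial-consumption tests follow roughly this shape (a hedged sketch; the method name and exact assertions are assumed, not copied from `test_rdd.py`):

```python
def test_to_local_iterator_not_fully_consumed(self):
    # Several partitions so multiple collect jobs would be needed.
    rdd = self.sc.parallelize(range(1000), 10)
    it = rdd.toLocalIterator()
    # Read only part of the data, then drop the iterator.
    self.assertEqual([next(it) for _ in range(5)], list(range(5)))
    del it  # cleanup drains the current partition and sends a zero request,
            # telling the JVM to stop collection jobs and close the socket
```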

Closes #24070 from BryanCutler/pyspark-toLocalIterator-clean-stop-SPARK-23961.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
2019-05-07 14:47:39 -07:00
__init__.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_appsubmit.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_broadcast.py [SPARK-26201] Fix python broadcast with encryption 2018-11-30 12:48:56 -06:00
test_conf.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_context.py [SPARK-26349][PYSPARK] Forbid insecure py4j gateways 2019-01-08 11:26:36 -08:00
test_daemon.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_join.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_profiler.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_rdd.py [SPARK-23961][SPARK-27548][PYTHON] Fix error when toLocalIterator goes out of scope and properly raise errors from worker 2019-05-07 14:47:39 -07:00
test_readwrite.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_serializers.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_shuffle.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_taskcontext.py [SPARK-25921][FOLLOW UP][PYSPARK] Fix barrier task run without BarrierTaskContext while python worker reuse 2019-01-11 14:28:37 +08:00
test_util.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_worker.py [SPARK-26743][PYTHON] Adds a test to check the actual resource limit set via 'spark.executor.pyspark.memory' 2019-01-28 10:02:27 +08:00