[SPARK-18281] [SQL] [PYSPARK] Remove timeout for reading data through socket for local iterator

## What changes were proposed in this pull request?

There is a timeout failure when using `rdd.toLocalIterator()` or `df.toLocalIterator()` for a PySpark RDD and DataFrame:

    df = spark.createDataFrame([[1],[2],[3]])
    it = df.toLocalIterator()
    row = next(it)

    df2 = df.repartition(1000)  # create many empty partitions which increase materialization time so causing timeout
    it2 = df2.toLocalIterator()
    row = next(it2)

The cause of this issue is, we open a socket to serve the data from JVM side. We set timeout for connection and reading through the socket in Python side. In Python we use a generator to read the data, so we only begin to connect the socket once we start to ask data from it. If we don't consume it immediately, there is connection timeout.

In the other side, the materialization time for RDD partitions is unpredictable. So we can't set a timeout for reading data through the socket. Otherwise, it is very possibly to fail.

## How was this patch tested?

Added tests into PySpark.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16263 from viirya/fix-pyspark-localiterator.
This commit is contained in:
Liang-Chi Hsieh 2016-12-20 13:12:16 -08:00 committed by Davies Liu
parent 150d26cad4
commit 95c95b71ed
2 changed files with 17 additions and 6 deletions

View file

@ -135,12 +135,11 @@ def _load_from_socket(port, serializer):
break
if not sock:
raise Exception("could not open socket")
try:
rf = sock.makefile("rb", 65536)
for item in serializer.load_stream(rf):
yield item
finally:
sock.close()
# The RDD materialization time is unpredicable, if we set a timeout for socket reading
# operation, it will very possibly fail. See SPARK-18281.
sock.settimeout(None)
# The socket will be automatically closed when garbage-collected.
return serializer.load_stream(sock.makefile("rb", 65536))
def ignore_unicode_prefix(f):

View file

@ -502,6 +502,18 @@ class RDDTests(ReusedPySparkTestCase):
self.assertEqual(0, self.sc.emptyRDD().sum())
self.assertEqual(6, self.sc.parallelize([1, 2, 3]).sum())
def test_to_localiterator(self):
from time import sleep
rdd = self.sc.parallelize([1, 2, 3])
it = rdd.toLocalIterator()
sleep(5)
self.assertEqual([1, 2, 3], sorted(it))
rdd2 = rdd.repartition(1000)
it2 = rdd2.toLocalIterator()
sleep(5)
self.assertEqual([1, 2, 3], sorted(it2))
def test_save_as_textfile_with_unicode(self):
# Regression test for SPARK-970
x = u"\u00A1Hola, mundo!"