From 1f31bdaef670dd43999613deae3620f4ddcd1fbf Mon Sep 17 00:00:00 2001 From: Jason White Date: Mon, 3 Oct 2016 14:12:03 -0700 Subject: [PATCH] [SPARK-17679] [PYSPARK] remove unnecessary Py4J ListConverter patch ## What changes were proposed in this pull request? This PR removes a patch on ListConverter from https://github.com/apache/spark/pull/5570, as it is no longer necessary. The underlying issue in Py4J https://github.com/bartdag/py4j/issues/160 was patched in https://github.com/bartdag/py4j/commit/224b94b6665e56a93a064073886e1d803a4969d2 and is present in 0.10.3, the version currently in use in Spark. ## How was this patch tested? The original test added in https://github.com/apache/spark/pull/5570 remains. Author: Jason White Closes #15254 from JasonMWhite/remove_listconverter_patch. --- python/pyspark/java_gateway.py | 9 --------- python/pyspark/ml/common.py | 4 ++-- python/pyspark/mllib/common.py | 4 ++-- python/pyspark/rdd.py | 13 ++----------- 4 files changed, 6 insertions(+), 24 deletions(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 527ca82d31..f76cadcf62 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -29,18 +29,9 @@ if sys.version >= '3': xrange = range from py4j.java_gateway import java_import, JavaGateway, GatewayClient -from py4j.java_collections import ListConverter - from pyspark.serializers import read_int -# patching ListConverter, or it will convert bytearray into Java ArrayList -def can_convert_list(self, obj): - return isinstance(obj, (list, tuple, xrange)) - -ListConverter.can_convert = can_convert_list - - def launch_gateway(): if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) diff --git a/python/pyspark/ml/common.py b/python/pyspark/ml/common.py index aec860fca7..387c5d7309 100644 --- a/python/pyspark/ml/common.py +++ b/python/pyspark/ml/common.py @@ -23,7 +23,7 @@ if sys.version >= '3': import py4j.protocol from py4j.protocol import Py4JJavaError from py4j.java_gateway import JavaObject -from py4j.java_collections import ListConverter, JavaArray, JavaList +from py4j.java_collections import JavaArray, JavaList from pyspark import RDD, SparkContext from pyspark.serializers import PickleSerializer, AutoBatchedSerializer @@ -76,7 +76,7 @@ def _py2java(sc, obj): elif isinstance(obj, SparkContext): obj = obj._jsc elif isinstance(obj, list): - obj = ListConverter().convert([_py2java(sc, x) for x in obj], sc._gateway._gateway_client) + obj = [_py2java(sc, x) for x in obj] elif isinstance(obj, JavaObject): pass elif isinstance(obj, (int, long, float, bool, bytes, unicode)): diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py index 21f0e09ea7..bac8f35056 100644 --- a/python/pyspark/mllib/common.py +++ b/python/pyspark/mllib/common.py @@ -23,7 +23,7 @@ if sys.version >= '3': import py4j.protocol from py4j.protocol import Py4JJavaError from py4j.java_gateway import JavaObject -from py4j.java_collections import ListConverter, JavaArray, JavaList +from py4j.java_collections import JavaArray, JavaList from pyspark import RDD, SparkContext from pyspark.serializers import PickleSerializer, AutoBatchedSerializer @@ -78,7 +78,7 @@ def _py2java(sc, obj): elif isinstance(obj, SparkContext): obj = obj._jsc elif isinstance(obj, list): - obj = ListConverter().convert([_py2java(sc, x) for x in obj], sc._gateway._gateway_client) + obj = [_py2java(sc, x) for x in obj] elif isinstance(obj, JavaObject): pass elif isinstance(obj, (int, long, float, bool, bytes, unicode)): diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 5fb10f86f4..ed81eb16df 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -52,8 +52,6 @@ from pyspark.shuffle import Aggregator, ExternalMerger, \ get_used_memory, ExternalSorter, ExternalGroupBy from pyspark.traceback_utils import SCCallSiteSync -from py4j.java_collections import ListConverter, MapConverter - __all__ = ["RDD"] @@ -2317,16 +2315,9 @@ def _prepare_for_python_RDD(sc, command): # The broadcast will have same life cycle as created PythonRDD broadcast = sc.broadcast(pickled_command) pickled_command = ser.dumps(broadcast) - # There is a bug in py4j.java_gateway.JavaClass with auto_convert - # https://github.com/bartdag/py4j/issues/161 - # TODO: use auto_convert once py4j fix the bug - broadcast_vars = ListConverter().convert( - [x._jbroadcast for x in sc._pickled_broadcast_vars], - sc._gateway._gateway_client) + broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars] sc._pickled_broadcast_vars.clear() - env = MapConverter().convert(sc.environment, sc._gateway._gateway_client) - includes = ListConverter().convert(sc._python_includes, sc._gateway._gateway_client) - return pickled_command, broadcast_vars, env, includes + return pickled_command, broadcast_vars, sc.environment, sc._python_includes def _wrap_function(sc, func, deserializer, serializer, profiler=None):