[SPARK-17679] [PYSPARK] remove unnecessary Py4J ListConverter patch
## What changes were proposed in this pull request?
This PR removes a patch on ListConverter from https://github.com/apache/spark/pull/5570, as it is no longer necessary. The underlying issue in Py4J, https://github.com/bartdag/py4j/issues/160, was fixed in Py4J commit 224b94b666, and that fix is present in Py4J 0.10.3, the version Spark currently uses.
## How was this patch tested?
The original test added in https://github.com/apache/spark/pull/5570 remains.
Author: Jason White <jason.white@shopify.com>
Closes #15254 from JasonMWhite/remove_listconverter_patch.
This commit is contained in:
parent
1dd68d3827
commit
1f31bdaef6
|
@@ -29,18 +29,9 @@ if sys.version >= '3':
|
|||
xrange = range
|
||||
|
||||
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
|
||||
from py4j.java_collections import ListConverter
|
||||
|
||||
from pyspark.serializers import read_int
|
||||
|
||||
|
||||
# patching ListConverter, or it will convert bytearray into Java ArrayList
|
||||
def can_convert_list(self, obj):
    """Return True when ``obj`` should be converted to a Java list.

    Monkey-patched in as ``ListConverter.can_convert`` so that only real
    sequence types are converted; notably, ``bytearray`` is excluded
    (the stock converter would turn it into a Java ArrayList).
    """
    # Explicit per-type checks; equivalent to a single isinstance with a
    # tuple of types, just spelled out one membership test at a time.
    return (isinstance(obj, list)
            or isinstance(obj, tuple)
            or isinstance(obj, xrange))
|
||||
|
||||
ListConverter.can_convert = can_convert_list
|
||||
|
||||
|
||||
def launch_gateway():
|
||||
if "PYSPARK_GATEWAY_PORT" in os.environ:
|
||||
gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
|
||||
|
|
|
@@ -23,7 +23,7 @@ if sys.version >= '3':
|
|||
import py4j.protocol
|
||||
from py4j.protocol import Py4JJavaError
|
||||
from py4j.java_gateway import JavaObject
|
||||
from py4j.java_collections import ListConverter, JavaArray, JavaList
|
||||
from py4j.java_collections import JavaArray, JavaList
|
||||
|
||||
from pyspark import RDD, SparkContext
|
||||
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
|
||||
|
@@ -76,7 +76,7 @@ def _py2java(sc, obj):
|
|||
elif isinstance(obj, SparkContext):
|
||||
obj = obj._jsc
|
||||
elif isinstance(obj, list):
|
||||
obj = ListConverter().convert([_py2java(sc, x) for x in obj], sc._gateway._gateway_client)
|
||||
obj = [_py2java(sc, x) for x in obj]
|
||||
elif isinstance(obj, JavaObject):
|
||||
pass
|
||||
elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
|
||||
|
|
|
@@ -23,7 +23,7 @@ if sys.version >= '3':
|
|||
import py4j.protocol
|
||||
from py4j.protocol import Py4JJavaError
|
||||
from py4j.java_gateway import JavaObject
|
||||
from py4j.java_collections import ListConverter, JavaArray, JavaList
|
||||
from py4j.java_collections import JavaArray, JavaList
|
||||
|
||||
from pyspark import RDD, SparkContext
|
||||
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
|
||||
|
@@ -78,7 +78,7 @@ def _py2java(sc, obj):
|
|||
elif isinstance(obj, SparkContext):
|
||||
obj = obj._jsc
|
||||
elif isinstance(obj, list):
|
||||
obj = ListConverter().convert([_py2java(sc, x) for x in obj], sc._gateway._gateway_client)
|
||||
obj = [_py2java(sc, x) for x in obj]
|
||||
elif isinstance(obj, JavaObject):
|
||||
pass
|
||||
elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
|
||||
|
|
|
@@ -52,8 +52,6 @@ from pyspark.shuffle import Aggregator, ExternalMerger, \
|
|||
get_used_memory, ExternalSorter, ExternalGroupBy
|
||||
from pyspark.traceback_utils import SCCallSiteSync
|
||||
|
||||
from py4j.java_collections import ListConverter, MapConverter
|
||||
|
||||
|
||||
__all__ = ["RDD"]
|
||||
|
||||
|
@@ -2317,16 +2315,9 @@ def _prepare_for_python_RDD(sc, command):
|
|||
# The broadcast will have same life cycle as created PythonRDD
|
||||
broadcast = sc.broadcast(pickled_command)
|
||||
pickled_command = ser.dumps(broadcast)
|
||||
# There is a bug in py4j.java_gateway.JavaClass with auto_convert
|
||||
# https://github.com/bartdag/py4j/issues/161
|
||||
# TODO: use auto_convert once py4j fix the bug
|
||||
broadcast_vars = ListConverter().convert(
|
||||
[x._jbroadcast for x in sc._pickled_broadcast_vars],
|
||||
sc._gateway._gateway_client)
|
||||
broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars]
|
||||
sc._pickled_broadcast_vars.clear()
|
||||
env = MapConverter().convert(sc.environment, sc._gateway._gateway_client)
|
||||
includes = ListConverter().convert(sc._python_includes, sc._gateway._gateway_client)
|
||||
return pickled_command, broadcast_vars, env, includes
|
||||
return pickled_command, broadcast_vars, sc.environment, sc._python_includes
|
||||
|
||||
|
||||
def _wrap_function(sc, func, deserializer, serializer, profiler=None):
|
||||
|
|
Loading…
Reference in a new issue