[SPARK-2601] [PySpark] Fix Py4J error when transforming pickleFiles
Similar to SPARK-1034, the problem was that Py4J didn’t cope well with the fake ClassTags used in the Java API. It doesn’t look like there’s any reason why PythonRDD needs to take a ClassTag, since it just ignores the type of the previous RDD, so I removed the type parameter and we no longer pass ClassTags from Python.

Author: Josh Rosen <joshrosen@apache.org>

Closes #1605 from JoshRosen/spark-2601 and squashes the following commits:

b68e118 [Josh Rosen] Fix Py4J error when transforming pickleFiles [SPARK-2601]
This commit is contained in:
parent
12901643b7
commit
ba46bbed5d
|
@ -37,8 +37,8 @@ import org.apache.spark.broadcast.Broadcast
|
|||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
private[spark] class PythonRDD[T: ClassTag](
|
||||
parent: RDD[T],
|
||||
private[spark] class PythonRDD(
|
||||
parent: RDD[_],
|
||||
command: Array[Byte],
|
||||
envVars: JMap[String, String],
|
||||
pythonIncludes: JList[String],
|
||||
|
|
|
@ -1687,7 +1687,6 @@ class PipelinedRDD(RDD):
|
|||
[x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
|
||||
self.ctx._gateway._gateway_client)
|
||||
self.ctx._pickled_broadcast_vars.clear()
|
||||
class_tag = self._prev_jrdd.classTag()
|
||||
env = MapConverter().convert(self.ctx.environment,
|
||||
self.ctx._gateway._gateway_client)
|
||||
includes = ListConverter().convert(self.ctx._python_includes,
|
||||
|
@ -1696,8 +1695,7 @@ class PipelinedRDD(RDD):
|
|||
bytearray(pickled_command),
|
||||
env, includes, self.preservesPartitioning,
|
||||
self.ctx.pythonExec,
|
||||
broadcast_vars, self.ctx._javaAccumulator,
|
||||
class_tag)
|
||||
broadcast_vars, self.ctx._javaAccumulator)
|
||||
self._jrdd_val = python_rdd.asJavaRDD()
|
||||
return self._jrdd_val
|
||||
|
||||
|
|
|
@ -226,6 +226,15 @@ class TestRDDFunctions(PySparkTestCase):
|
|||
cart = rdd1.cartesian(rdd2)
|
||||
result = cart.map(lambda (x, y): x + y).collect()
|
||||
|
||||
def test_transforming_pickle_file(self):
|
||||
# Regression test for SPARK-2601
|
||||
data = self.sc.parallelize(["Hello", "World!"])
|
||||
tempFile = tempfile.NamedTemporaryFile(delete=True)
|
||||
tempFile.close()
|
||||
data.saveAsPickleFile(tempFile.name)
|
||||
pickled_file = self.sc.pickleFile(tempFile.name)
|
||||
pickled_file.map(lambda x: x).collect()
|
||||
|
||||
def test_cartesian_on_textfile(self):
|
||||
# Regression test for
|
||||
path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
|
||||
|
|
Loading…
Reference in a new issue