spark-instrumented-optimizer/python/pyspark/streaming
hyukjinkwon b070ded284 [SPARK-17756][PYTHON][STREAMING] Workaround to avoid return type mismatch in PythonTransformFunction
## What changes were proposed in this pull request?

This PR proposes to wrap the transformed RDD within `TransformFunction`. `PythonTransformFunction` appears to require `_jrdd` to return a `JavaRDD`.

39e2bad6a8/python/pyspark/streaming/util.py (L67)

6ee28423ad/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala (L43)

However, some APIs, for example `zip` in PySpark's RDD API, can produce a `JavaPairRDD` instead. The type behind `_jrdd` can be checked as below:

```python
>>> rdd.zip(rdd)._jrdd.getClass().toString()
u'class org.apache.spark.api.java.JavaPairRDD'
```

So, here, I wrapped it with an identity `map` so that it is guaranteed to return a `JavaRDD`.

```python
>>> rdd.zip(rdd).map(lambda x: x)._jrdd.getClass().toString()
u'class org.apache.spark.api.java.JavaRDD'
```
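
In other words, the workaround amounts to something like the following inside `TransformFunction` in `python/pyspark/streaming/util.py`. This is a simplified sketch, not the exact patch, and the helper name `_ensure_java_rdd` is hypothetical:

```python
# Simplified sketch of the idea behind the workaround (hypothetical helper;
# the real change in TransformFunction may differ in detail).
def _ensure_java_rdd(rdd):
    # Transformations such as zip or cartesian can leave a JavaPairRDD
    # behind _jrdd; an identity map re-wraps the result so that _jrdd
    # is a plain JavaRDD, which is what PythonTransformFunction expects.
    return rdd.map(lambda x: x)._jrdd
```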

Below are some failure cases I was able to reproduce:

```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]) \
    .transform(lambda rdd: rdd.cartesian(rdd)) \
    .pprint()
ssc.start()
```

```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.cartesian(rdd))
ssc.start()
```

```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.zip(rdd))
ssc.start()
```

```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.zip(rdd).union(rdd.zip(rdd)))
ssc.start()
```

```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.zip(rdd).coalesce(1))
ssc.start()
```

## How was this patch tested?

Unit tests were added in `python/pyspark/streaming/tests.py`, and the change was also verified manually.
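
For illustration, a standalone script along the following lines exercises the same code path (this is a sketch, not the actual test added, which uses the harness in `tests.py`):

```python
# A minimal standalone check of the same code path (hypothetical; the real
# unit test lives in python/pyspark/streaming/tests.py and uses its harness).
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "SPARK-17756-check")
ssc = StreamingContext(sc, 1)

collected = []
ssc.queueStream([sc.range(10)]) \
    .transform(lambda rdd: rdd.zip(rdd)) \
    .foreachRDD(lambda rdd: collected.extend(rdd.collect()))

ssc.start()
ssc.awaitTerminationOrTimeout(10)
ssc.stop(True, True)

# Before the fix this failed with a Py4J return type mismatch in
# PythonTransformFunction; after the fix it yields pairs (0, 0), (1, 1), ...
print(collected)
```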

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19498 from HyukjinKwon/SPARK-17756.
2018-06-09 01:27:51 +07:00
__init__.py [SPARK-6328][PYTHON] Python API for StreamingListener 2015-11-16 11:29:27 -08:00
context.py [SPARK-17756][PYTHON][STREAMING] Workaround to avoid return type mismatch in PythonTransformFunction 2018-06-09 01:27:51 +07:00
dstream.py [MINOR] Fix Typos 'an -> a' 2016-06-06 09:35:47 +01:00
flume.py [SPARK-22313][PYTHON][FOLLOWUP] Explicitly import warnings namespace in flume.py 2017-12-29 14:46:03 +09:00
kafka.py [SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener 2018-04-19 10:00:57 +08:00
kinesis.py [SPARK-19405][STREAMING] Support for cross-account Kinesis reads via STS 2017-02-22 11:32:36 -05:00
listener.py [SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener 2018-04-19 10:00:57 +08:00
tests.py [SPARK-17756][PYTHON][STREAMING] Workaround to avoid return type mismatch in PythonTransformFunction 2018-06-09 01:27:51 +07:00
util.py [SPARK-17756][PYTHON][STREAMING] Workaround to avoid return type mismatch in PythonTransformFunction 2018-06-09 01:27:51 +07:00