[SPARK-10772] [STREAMING] [SCALA] NullPointerException when transform function in DStream returns NULL
Currently, ```TransformedDStream``` uses ```Some(transformFunc(parentRDDs, validTime))``` as the return value of ```compute```. If ```transformFunc``` returns ```null```, the result is ```Some(null)```, and the downstream operators fail with a NullPointerException.
The initial version of this fix used ```Option()``` instead of ```Some()``` to deal with a possible null value: when ```transformFunc``` returns ```null```, ```Option()``` converts it to ```None```, which downstream operators handle correctly.
NOTE (2015-09-25): The latest version of the fix instead checks the return value of the transform function; if it is ```null```, a ```SparkException``` is thrown.
Author: Jacker Hu <gt.hu.chang@gmail.com>
Author: jhu-chang <gt.hu.chang@gmail.com>
Closes #8881 from jhu-chang/Fix_Transform.
This commit is contained in:
parent
864de3bf40
commit
a16396df76
|
@ -17,9 +17,11 @@
|
|||
|
||||
package org.apache.spark.streaming.dstream
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
import org.apache.spark.SparkException
|
||||
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
|
||||
import org.apache.spark.streaming.{Duration, Time}
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
private[streaming]
|
||||
class TransformedDStream[U: ClassTag] (
|
||||
|
@ -38,6 +40,12 @@ class TransformedDStream[U: ClassTag] (
|
|||
|
||||
override def compute(validTime: Time): Option[RDD[U]] = {
|
||||
val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
|
||||
Some(transformFunc(parentRDDs, validTime))
|
||||
val transformedRDD = transformFunc(parentRDDs, validTime)
|
||||
if (transformedRDD == null) {
|
||||
throw new SparkException("Transform function must not return null. " +
|
||||
"Return SparkContext.emptyRDD() instead to represent no element " +
|
||||
"as the result of transformation.")
|
||||
}
|
||||
Some(transformedRDD)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -211,6 +211,19 @@ class BasicOperationsSuite extends TestSuiteBase {
|
|||
)
|
||||
}
|
||||
|
||||
test("transform with NULL") {
|
||||
val input = Seq(1 to 4)
|
||||
intercept[SparkException] {
|
||||
testOperation(
|
||||
input,
|
||||
(r: DStream[Int]) => r.transform(rdd => null.asInstanceOf[RDD[Int]]),
|
||||
Seq(Seq()),
|
||||
1,
|
||||
false
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
test("transformWith") {
|
||||
val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
|
||||
val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
|
||||
|
|
Loading…
Reference in a new issue