[Spark-1382] Fix NPE in DStream.slice (updated version of #365)
@zsxwing I cherry-picked your changes and merged the master. #365 had some conflicts once again! Author: zsxwing <zsxwing@gmail.com> Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #562 from tdas/SPARK-1382 and squashes the following commits: e2962c1 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-1382 20968d9 [zsxwing] Replace Exception with SparkException in DStream e476651 [zsxwing] Merge remote-tracking branch 'origin/master' into SPARK-1382 35ba56a [zsxwing] SPARK-1382: Fix NPE in DStream.slice
This commit is contained in:
parent
87cf35c2d6
commit
058797c172
|
@ -18,20 +18,19 @@
|
|||
package org.apache.spark.streaming.dstream
|
||||
|
||||
|
||||
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
|
||||
|
||||
import scala.deprecated
|
||||
import scala.collection.mutable.HashMap
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
|
||||
|
||||
import org.apache.spark.Logging
|
||||
import org.apache.spark.{Logging, SparkException}
|
||||
import org.apache.spark.rdd.{BlockRDD, RDD}
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
import org.apache.spark.util.MetadataCleaner
|
||||
import org.apache.spark.streaming._
|
||||
import org.apache.spark.streaming.StreamingContext._
|
||||
import org.apache.spark.streaming.scheduler.Job
|
||||
import org.apache.spark.streaming.Duration
|
||||
import org.apache.spark.util.MetadataCleaner
|
||||
|
||||
/**
|
||||
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
|
||||
|
@ -144,7 +143,7 @@ abstract class DStream[T: ClassTag] (
|
|||
*/
|
||||
private[streaming] def initialize(time: Time) {
|
||||
if (zeroTime != null && zeroTime != time) {
|
||||
throw new Exception("ZeroTime is already initialized to " + zeroTime
|
||||
throw new SparkException("ZeroTime is already initialized to " + zeroTime
|
||||
+ ", cannot initialize it again to " + time)
|
||||
}
|
||||
zeroTime = time
|
||||
|
@ -220,7 +219,7 @@ abstract class DStream[T: ClassTag] (
|
|||
"which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
|
||||
"than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" +
|
||||
"delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
|
||||
"set the Java property 'spark.cleaner.delay' to more than " +
|
||||
"set the Java cleaner delay to more than " +
|
||||
math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
|
||||
)
|
||||
|
||||
|
@ -235,7 +234,7 @@ abstract class DStream[T: ClassTag] (
|
|||
|
||||
private[streaming] def setContext(s: StreamingContext) {
|
||||
if (ssc != null && ssc != s) {
|
||||
throw new Exception("Context is already set in " + this + ", cannot set it again")
|
||||
throw new SparkException("Context is already set in " + this + ", cannot set it again")
|
||||
}
|
||||
ssc = s
|
||||
logInfo("Set context for " + this)
|
||||
|
@ -244,7 +243,7 @@ abstract class DStream[T: ClassTag] (
|
|||
|
||||
private[streaming] def setGraph(g: DStreamGraph) {
|
||||
if (graph != null && graph != g) {
|
||||
throw new Exception("Graph is already set in " + this + ", cannot set it again")
|
||||
throw new SparkException("Graph is already set in " + this + ", cannot set it again")
|
||||
}
|
||||
graph = g
|
||||
dependencies.foreach(_.setGraph(graph))
|
||||
|
@ -261,7 +260,7 @@ abstract class DStream[T: ClassTag] (
|
|||
/** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
|
||||
private[streaming] def isTimeValid(time: Time): Boolean = {
|
||||
if (!isInitialized) {
|
||||
throw new Exception (this + " has not been initialized")
|
||||
throw new SparkException (this + " has not been initialized")
|
||||
} else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
|
||||
logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
|
||||
" and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
|
||||
|
@ -728,6 +727,9 @@ abstract class DStream[T: ClassTag] (
|
|||
* Return all the RDDs between 'fromTime' to 'toTime' (both included)
|
||||
*/
|
||||
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
|
||||
if (!isInitialized) {
|
||||
throw new SparkException(this + " has not been initialized")
|
||||
}
|
||||
if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
|
||||
logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
|
||||
+ slideDuration + ")")
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.apache.spark.rdd.{BlockRDD, RDD}
|
|||
import org.apache.spark.SparkContext._
|
||||
|
||||
import util.ManualClock
|
||||
import org.apache.spark.{SparkContext, SparkConf}
|
||||
import org.apache.spark.{SparkException, SparkConf}
|
||||
import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
|
||||
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
|
||||
import scala.reflect.ClassTag
|
||||
|
@ -398,6 +398,16 @@ class BasicOperationsSuite extends TestSuiteBase {
|
|||
Thread.sleep(1000)
|
||||
}
|
||||
|
||||
test("slice - has not been initialized") {
|
||||
val ssc = new StreamingContext(conf, Seconds(1))
|
||||
val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
|
||||
val stream = new TestInputStream[Int](ssc, input, 2)
|
||||
val thrown = intercept[SparkException] {
|
||||
stream.slice(new Time(0), new Time(1000))
|
||||
}
|
||||
assert(thrown.getMessage.contains("has not been initialized"))
|
||||
}
|
||||
|
||||
val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq
|
||||
|
||||
test("rdd cleanup - map and window") {
|
||||
|
|
Loading…
Reference in a new issue