Split Time to Time (absolute instant of time) and Duration (duration of time).
parent 6c502e3793
commit 156e8b47ef
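The split gives instants and lengths of time distinct types: Time keeps its comparison operators, while arithmetic now mixes the two (Time + Duration yields a Time, Time - Time yields a Duration). A minimal sketch of the resulting algebra, using only operators visible in this diff:

  import spark.streaming.{Time, Duration, Seconds}

  val t1 = new Time(10000)            // absolute instant, ms since epoch
  val d: Duration = Seconds(3)        // length of time, 3000 ms
  val t2: Time = t1 + d               // Time + Duration => Time
  val elapsed: Duration = t2 - t1     // Time - Time => Duration
  assert(elapsed.milliseconds == 3000)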
streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -17,7 +17,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val jars = ssc.sc.jars
   val graph = ssc.graph
   val checkpointDir = ssc.checkpointDir
-  val checkpointInterval = ssc.checkpointInterval
+  val checkpointInterval: Duration = ssc.checkpointInterval
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")
streaming/src/main/scala/spark/streaming/DStream.scala
@@ -2,7 +2,7 @@ package spark.streaming
 
 import spark.streaming.dstream._
 import StreamingContext._
-import Time._
+//import Time._
 
 import spark.{RDD, Logging}
 import spark.storage.StorageLevel
@@ -47,7 +47,7 @@ abstract class DStream[T: ClassManifest] (
   // =======================================================================
 
   /** Time interval after which the DStream generates a RDD */
-  def slideTime: Time
+  def slideTime: Duration
 
   /** List of parent DStreams on which this DStream depends on */
   def dependencies: List[DStream[_]]
@@ -67,14 +67,14 @@ abstract class DStream[T: ClassManifest] (
   protected[streaming] var zeroTime: Time = null
 
   // Duration for which the DStream will remember each RDD created
-  protected[streaming] var rememberDuration: Time = null
+  protected[streaming] var rememberDuration: Duration = null
 
   // Storage level of the RDDs in the stream
   protected[streaming] var storageLevel: StorageLevel = StorageLevel.NONE
 
   // Checkpoint details
   protected[streaming] val mustCheckpoint = false
-  protected[streaming] var checkpointInterval: Time = null
+  protected[streaming] var checkpointInterval: Duration = null
   protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]())
 
   // Reference to whole DStream graph
@@ -108,7 +108,7 @@ abstract class DStream[T: ClassManifest] (
    * Enable periodic checkpointing of RDDs of this DStream
    * @param interval Time interval after which generated RDD will be checkpointed
    */
-  def checkpoint(interval: Time): DStream[T] = {
+  def checkpoint(interval: Duration): DStream[T] = {
     if (isInitialized) {
       throw new UnsupportedOperationException(
         "Cannot change checkpoint interval of an DStream after streaming context has started")
@@ -224,7 +224,7 @@ abstract class DStream[T: ClassManifest] (
     dependencies.foreach(_.setGraph(graph))
   }
 
-  protected[streaming] def remember(duration: Time) {
+  protected[streaming] def remember(duration: Duration) {
     if (duration != null && duration > rememberDuration) {
       rememberDuration = duration
       logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this)
@@ -531,7 +531,7 @@ abstract class DStream[T: ClassManifest] (
   * @param windowTime width of the window; must be a multiple of this DStream's interval.
   * @return
   */
-  def window(windowTime: Time): DStream[T] = window(windowTime, this.slideTime)
+  def window(windowTime: Duration): DStream[T] = window(windowTime, this.slideTime)
 
  /**
   * Return a new DStream which is computed based on windowed batches of this DStream.
@@ -541,7 +541,7 @@ abstract class DStream[T: ClassManifest] (
   * the new DStream will generate RDDs); must be a multiple of this
   * DStream's interval
   */
-  def window(windowTime: Time, slideTime: Time): DStream[T] = {
+  def window(windowTime: Duration, slideTime: Duration): DStream[T] = {
     new WindowedDStream(this, windowTime, slideTime)
   }
 
@@ -550,22 +550,22 @@ abstract class DStream[T: ClassManifest] (
   * This is equivalent to window(batchTime, batchTime).
   * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
   */
-  def tumble(batchTime: Time): DStream[T] = window(batchTime, batchTime)
+  def tumble(batchTime: Duration): DStream[T] = window(batchTime, batchTime)
 
  /**
   * Returns a new DStream in which each RDD has a single element generated by reducing all
  * elements in a window over this DStream. windowTime and slideTime are as defined in the
  * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc)
  */
-  def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Time, slideTime: Time): DStream[T] = {
+  def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Duration, slideTime: Duration): DStream[T] = {
     this.window(windowTime, slideTime).reduce(reduceFunc)
   }
 
   def reduceByWindow(
       reduceFunc: (T, T) => T,
       invReduceFunc: (T, T) => T,
-      windowTime: Time,
-      slideTime: Time
+      windowTime: Duration,
+      slideTime: Duration
     ): DStream[T] = {
     this.map(x => (1, x))
         .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, 1)
@@ -577,7 +577,7 @@ abstract class DStream[T: ClassManifest] (
   * of elements in a window over this DStream. windowTime and slideTime are as defined in the
   * window() operation. This is equivalent to window(windowTime, slideTime).count()
   */
-  def countByWindow(windowTime: Time, slideTime: Time): DStream[Int] = {
+  def countByWindow(windowTime: Duration, slideTime: Duration): DStream[Int] = {
     this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowTime, slideTime)
   }
 
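With these signatures, window parameters are Durations while compute() still receives a Time. A sketch of calling the retyped window operations; the input stream is a placeholder for any DStream obtained from a StreamingContext:

  import spark.streaming.{DStream, Seconds}

  // `events` stands in for an existing DStream[String]
  def windowed(events: DStream[String]): DStream[String] =
    events.window(Seconds(30), Seconds(10))      // both arguments are now Durations

  def windowedCount(events: DStream[String]): DStream[Int] =
    events.countByWindow(Seconds(30), Seconds(10))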
streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -12,8 +12,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   private val outputStreams = new ArrayBuffer[DStream[_]]()
 
   private[streaming] var zeroTime: Time = null
-  private[streaming] var batchDuration: Time = null
-  private[streaming] var rememberDuration: Time = null
+  private[streaming] var batchDuration: Duration = null
+  private[streaming] var rememberDuration: Duration = null
   private[streaming] var checkpointInProgress = false
 
   private[streaming] def start(time: Time) {
@@ -41,7 +41,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
     }
   }
 
-  private[streaming] def setBatchDuration(duration: Time) {
+  private[streaming] def setBatchDuration(duration: Duration) {
     this.synchronized {
       if (batchDuration != null) {
         throw new Exception("Batch duration already set as " + batchDuration +
@@ -51,7 +51,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       batchDuration = duration
     }
 
-  private[streaming] def remember(duration: Time) {
+  private[streaming] def remember(duration: Duration) {
     this.synchronized {
       if (rememberDuration != null) {
         throw new Exception("Batch duration already set as " + batchDuration +
streaming/src/main/scala/spark/streaming/Duration.scala (new file, 62 lines)
@@ -0,0 +1,62 @@
+package spark.streaming
+
+class Duration (private val millis: Long) {
+
+  def < (that: Duration): Boolean = (this.millis < that.millis)
+
+  def <= (that: Duration): Boolean = (this.millis <= that.millis)
+
+  def > (that: Duration): Boolean = (this.millis > that.millis)
+
+  def >= (that: Duration): Boolean = (this.millis >= that.millis)
+
+  def + (that: Duration): Duration = new Duration(millis + that.millis)
+
+  def - (that: Duration): Duration = new Duration(millis - that.millis)
+
+  def * (times: Int): Duration = new Duration(millis * times)
+
+  def / (that: Duration): Long = millis / that.millis
+
+  def isMultipleOf(that: Duration): Boolean =
+    (this.millis % that.millis == 0)
+
+  def min(that: Duration): Duration = if (this < that) this else that
+
+  def max(that: Duration): Duration = if (this > that) this else that
+
+  def isZero: Boolean = (this.millis == 0)
+
+  override def toString: String = (millis.toString + " ms")
+
+  def toFormattedString: String = millis.toString
+
+  def milliseconds: Long = millis
+}
+
+/**
+ * Helper object that creates instance of [[spark.streaming.Duration]] representing
+ * a given number of milliseconds.
+ */
+object Milliseconds {
+  def apply(milliseconds: Long) = new Duration(milliseconds)
+}
+
+/**
+ * Helper object that creates instance of [[spark.streaming.Duration]] representing
+ * a given number of seconds.
+ */
+object Seconds {
+  def apply(seconds: Long) = new Duration(seconds * 1000)
+}
+
+/**
+ * Helper object that creates instance of [[spark.streaming.Duration]] representing
+ * a given number of minutes.
+ */
+object Minutes {
+  def apply(minutes: Long) = new Duration(minutes * 60000)
+}
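Since Duration is a plain class rather than a case class, == is reference equality; the sketch below therefore compares via milliseconds. Assuming the file above compiles as shown:

  import spark.streaming.{Milliseconds, Seconds, Minutes}

  val slide  = Seconds(2)                                   // 2000 ms
  val window = Minutes(1)                                   // 60000 ms
  assert(window.isMultipleOf(slide))                        // 60000 % 2000 == 0
  assert(window / slide == 30L)                             // Duration / Duration => Long
  assert((slide * 30).milliseconds == window.milliseconds)  // Duration * Int => Duration
  assert(Milliseconds(500).min(slide).milliseconds == 500)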
streaming/src/main/scala/spark/streaming/Interval.scala
@@ -1,16 +1,16 @@
 package spark.streaming
 
 private[streaming]
-case class Interval(beginTime: Time, endTime: Time) {
-  def this(beginMs: Long, endMs: Long) = this(Time(beginMs), new Time(endMs))
+class Interval(val beginTime: Time, val endTime: Time) {
+  def this(beginMs: Long, endMs: Long) = this(new Time(beginMs), new Time(endMs))
 
-  def duration(): Time = endTime - beginTime
+  def duration(): Duration = endTime - beginTime
 
-  def + (time: Time): Interval = {
+  def + (time: Duration): Interval = {
     new Interval(beginTime + time, endTime + time)
   }
 
-  def - (time: Time): Interval = {
+  def - (time: Duration): Interval = {
     new Interval(beginTime - time, endTime - time)
   }
 
@@ -27,24 +27,14 @@ case class Interval(beginTime: Time, endTime: Time) {
 
   def >= (that: Interval) = !(this < that)
 
   def next(): Interval = {
     this + (endTime - beginTime)
   }
 
-  def isZero = (beginTime.isZero && endTime.isZero)
-
-  def toFormattedString = beginTime.toFormattedString + "-" + endTime.toFormattedString
-
-  override def toString = "[" + beginTime + ", " + endTime + "]"
+  override def toString = "[" + beginTime + ", " + endTime + "]"
 }
 
 object Interval {
-  def zero() = new Interval (Time.zero, Time.zero)
-
-  def currentInterval(intervalDuration: Time): Interval = {
-    val time = Time(System.currentTimeMillis)
-    val intervalBegin = time.floor(intervalDuration)
-    Interval(intervalBegin, intervalBegin + intervalDuration)
+  def currentInterval(duration: Duration): Interval = {
+    val time = new Time(System.currentTimeMillis)
+    val intervalBegin = time.floor(duration)
+    new Interval(intervalBegin, intervalBegin + duration)
   }
 }
 
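Interval is private[streaming], so the following only illustrates its algebra from inside that package: duration() now returns a Duration, and shifting an Interval takes one:

  // inside package spark.streaming
  val i = new Interval(new Time(0), new Time(10000))
  assert(i.duration().milliseconds == Seconds(10).milliseconds)
  val next = i + Seconds(10)                   // shift by a Duration
  assert(next.beginTime.milliseconds == 10000)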
streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -69,21 +69,21 @@ extends Serializable {
     self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
   }
 
-  def groupByKeyAndWindow(windowTime: Time, slideTime: Time): DStream[(K, Seq[V])] = {
+  def groupByKeyAndWindow(windowTime: Duration, slideTime: Duration): DStream[(K, Seq[V])] = {
     groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner())
   }
 
   def groupByKeyAndWindow(
-      windowTime: Time,
-      slideTime: Time,
+      windowTime: Duration,
+      slideTime: Duration,
       numPartitions: Int
     ): DStream[(K, Seq[V])] = {
     groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner(numPartitions))
   }
 
   def groupByKeyAndWindow(
-      windowTime: Time,
-      slideTime: Time,
+      windowTime: Duration,
+      slideTime: Duration,
       partitioner: Partitioner
     ): DStream[(K, Seq[V])] = {
     self.window(windowTime, slideTime).groupByKey(partitioner)
@@ -91,23 +91,23 @@ extends Serializable {
 
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
-      windowTime: Time
+      windowTime: Duration
     ): DStream[(K, V)] = {
     reduceByKeyAndWindow(reduceFunc, windowTime, self.slideTime, defaultPartitioner())
   }
 
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
-      windowTime: Time,
-      slideTime: Time
+      windowTime: Duration,
+      slideTime: Duration
     ): DStream[(K, V)] = {
     reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner())
   }
 
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
-      windowTime: Time,
-      slideTime: Time,
+      windowTime: Duration,
+      slideTime: Duration,
       numPartitions: Int
     ): DStream[(K, V)] = {
     reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions))
@@ -115,8 +115,8 @@ extends Serializable {
 
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
-      windowTime: Time,
-      slideTime: Time,
+      windowTime: Duration,
+      slideTime: Duration,
       partitioner: Partitioner
     ): DStream[(K, V)] = {
     val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
@@ -134,8 +134,8 @@ extends Serializable {
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
       invReduceFunc: (V, V) => V,
-      windowTime: Time,
-      slideTime: Time
+      windowTime: Duration,
+      slideTime: Duration
     ): DStream[(K, V)] = {
 
     reduceByKeyAndWindow(
@@ -145,8 +145,8 @@ extends Serializable {
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
       invReduceFunc: (V, V) => V,
-      windowTime: Time,
-      slideTime: Time,
+      windowTime: Duration,
+      slideTime: Duration,
       numPartitions: Int
     ): DStream[(K, V)] = {
 
@@ -157,8 +157,8 @@ extends Serializable {
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
       invReduceFunc: (V, V) => V,
-      windowTime: Time,
-      slideTime: Time,
+      windowTime: Duration,
+      slideTime: Duration,
       partitioner: Partitioner
     ): DStream[(K, V)] = {
 
@@ -169,8 +169,8 @@ extends Serializable {
   }
 
   def countByKeyAndWindow(
-      windowTime: Time,
-      slideTime: Time,
+      windowTime: Duration,
+      slideTime: Duration,
       numPartitions: Int = self.ssc.sc.defaultParallelism
     ): DStream[(K, Long)] = {
 
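All the windowed pair operations take the same Duration pair. A sketch of incremental word counting over a sliding window, with explicit function types to keep overload resolution unambiguous (the input stream is hypothetical, and the implicit conversion to PairDStreamFunctions is assumed to come from StreamingContext._ as in DStream.scala above):

  import spark.streaming.{DStream, Seconds}
  import spark.streaming.StreamingContext._

  def slidingCounts(words: DStream[String]): DStream[(String, Long)] =
    words.map(w => (w, 1L)).reduceByKeyAndWindow(
      (a: Long, b: Long) => a + b,   // fold in counts entering the window
      (a: Long, b: Long) => a - b,   // subtract counts leaving it
      Seconds(60), Seconds(10))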
streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -22,7 +22,8 @@ class Scheduler(ssc: StreamingContext) extends Logging {
 
   val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
   val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
-  val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, generateRDDs(_))
+  val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
+    longTime => generateRDDs(new Time(longTime)))
 
   def start() {
     // If context was started from checkpoint, then restart timer such that
@@ -41,7 +42,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
       timer.restart(graph.zeroTime.milliseconds)
       logInfo("Scheduler's timer restarted")
     } else {
-      val firstTime = Time(timer.start())
+      val firstTime = new Time(timer.start())
       graph.start(firstTime - ssc.graph.batchDuration)
       logInfo("Scheduler's timer started")
     }
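The timer change follows from Time.scala below: with the implicit Long => Time conversion in object Time commented out, RecurringTimer's Long timestamp can no longer be passed as generateRDDs(_), so the boxing becomes explicit. The callback shape, shown only for illustration (generateRDDs is Scheduler-internal):

  // RecurringTimer hands back a Long timestamp; the Time wrapping is now explicit
  val callback: Long => Unit = longTime => generateRDDs(new Time(longTime))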
streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -26,7 +26,7 @@ import java.util.UUID
 class StreamingContext private (
     sc_ : SparkContext,
     cp_ : Checkpoint,
-    batchDur_ : Time
+    batchDur_ : Duration
   ) extends Logging {
 
   /**
@@ -34,7 +34,7 @@ class StreamingContext private (
   * @param sparkContext Existing SparkContext
   * @param batchDuration The time interval at which streaming data will be divided into batches
   */
-  def this(sparkContext: SparkContext, batchDuration: Time) = this(sparkContext, null, batchDuration)
+  def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration)
 
  /**
   * Creates a StreamingContext by providing the details necessary for creating a new SparkContext.
@@ -42,7 +42,7 @@ class StreamingContext private (
   * @param frameworkName A name for your job, to display on the cluster web UI
   * @param batchDuration The time interval at which streaming data will be divided into batches
   */
-  def this(master: String, frameworkName: String, batchDuration: Time) =
+  def this(master: String, frameworkName: String, batchDuration: Duration) =
     this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
 
  /**
@@ -96,7 +96,7 @@ class StreamingContext private (
     }
   }
 
-  protected[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null
+  protected[streaming] var checkpointInterval: Duration = if (isCheckpointPresent) cp_.checkpointInterval else null
   protected[streaming] var receiverJobThread: Thread = null
   protected[streaming] var scheduler: Scheduler = null
 
@@ -107,7 +107,7 @@ class StreamingContext private (
   * if the developer wishes to query old data outside the DStream computation).
   * @param duration Minimum duration that each DStream should remember its RDDs
   */
-  def remember(duration: Time) {
+  def remember(duration: Duration) {
     graph.remember(duration)
   }
 
@@ -117,7 +117,7 @@ class StreamingContext private (
   * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
   * @param interval checkpoint interval
   */
-  def checkpoint(directory: String, interval: Time = null) {
+  def checkpoint(directory: String, interval: Duration = null) {
     if (directory != null) {
       sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
       checkpointDir = directory
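A sketch of driver code against the retyped API; the master string, job name, and checkpoint directory are placeholders:

  import spark.streaming.{StreamingContext, Seconds, Minutes}

  val ssc = new StreamingContext("local", "TimeSplitExample", Seconds(1))
  ssc.remember(Minutes(1))                    // minimum remember duration, now a Duration
  ssc.checkpoint("checkpoints", Seconds(10))  // checkpoint interval, now a Duration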
streaming/src/main/scala/spark/streaming/Time.scala
@@ -7,7 +7,7 @@ package spark.streaming
  * @param millis Time in UTC.
  */
 
-case class Time(private val millis: Long) {
+class Time(private val millis: Long) {
 
   def < (that: Time): Boolean = (this.millis < that.millis)
 
@@ -17,63 +17,32 @@ case class Time(private val millis: Long) {
 
   def >= (that: Time): Boolean = (this.millis >= that.millis)
 
-  def + (that: Time): Time = Time(millis + that.millis)
-
-  def - (that: Time): Time = Time(millis - that.millis)
-
-  def * (times: Int): Time = Time(millis * times)
+  def + (that: Duration): Time = new Time(millis + that.milliseconds)
 
-  def / (that: Time): Long = millis / that.millis
+  def - (that: Time): Duration = new Duration(millis - that.millis)
 
-  def floor(that: Time): Time = {
-    val t = that.millis
+  def - (that: Duration): Time = new Time(millis - that.milliseconds)
+
+  def floor(that: Duration): Time = {
+    val t = that.milliseconds
     val m = math.floor(this.millis / t).toLong
-    Time(m * t)
+    new Time(m * t)
   }
 
-  def isMultipleOf(that: Time): Boolean =
-    (this.millis % that.millis == 0)
+  def isMultipleOf(that: Duration): Boolean =
+    (this.millis % that.milliseconds == 0)
 
   def min(that: Time): Time = if (this < that) this else that
 
   def max(that: Time): Time = if (this > that) this else that
 
   def isZero: Boolean = (this.millis == 0)
 
   override def toString: String = (millis.toString + " ms")
 
   def toFormattedString: String = millis.toString
 
   def milliseconds: Long = millis
 }
 
-private[streaming] object Time {
-  val zero = Time(0)
-
+/*private[streaming] object Time {
   implicit def toTime(long: Long) = Time(long)
 }
 
-/**
- * Helper object that creates instance of [[spark.streaming.Time]] representing
- * a given number of milliseconds.
- */
-object Milliseconds {
-  def apply(milliseconds: Long) = Time(milliseconds)
-}
-
-/**
- * Helper object that creates instance of [[spark.streaming.Time]] representing
- * a given number of seconds.
- */
-object Seconds {
-  def apply(seconds: Long) = Time(seconds * 1000)
-}
-
-/**
- * Helper object that creates instance of [[spark.streaming.Time]] representing
- * a given number of minutes.
- */
-object Minutes {
-  def apply(minutes: Long) = Time(minutes * 60000)
-}
+*/
 
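With the case modifier gone, the Time(...) factory and structural == disappear as well; instances are built with new and compared via milliseconds. A small sketch of the reworked operators:

  import spark.streaming.{Time, Duration, Seconds}

  val t = new Time(60000)
  val d: Duration = t - new Time(50000)              // Time - Time => Duration
  assert(d.milliseconds == 10000)
  val earlier: Time = t - Seconds(5)                 // Time - Duration => Time
  assert(earlier.milliseconds == 55000)
  assert(t.floor(Seconds(7)).milliseconds == 56000)  // 60000 - (60000 % 7000)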
streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
@@ -2,7 +2,7 @@ package spark.streaming.dstream
 
 import spark.{RDD, Partitioner}
 import spark.rdd.CoGroupedRDD
-import spark.streaming.{Time, DStream}
+import spark.streaming.{Time, DStream, Duration}
 
 private[streaming]
 class CoGroupedDStream[K : ClassManifest](
@@ -24,7 +24,7 @@ class CoGroupedDStream[K : ClassManifest](
 
   override def dependencies = parents.toList
 
-  override def slideTime = parents.head.slideTime
+  override def slideTime: Duration = parents.head.slideTime
 
   override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
     val part = partitioner
streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala
@@ -1,6 +1,6 @@
 package spark.streaming.dstream
 
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
 import spark.RDD
 
 private[streaming]
@@ -11,7 +11,7 @@ class FilteredDStream[T: ClassManifest](
 
   override def dependencies = List(parent)
 
-  override def slideTime: Time = parent.slideTime
+  override def slideTime: Duration = parent.slideTime
 
   override def compute(validTime: Time): Option[RDD[T]] = {
     parent.getOrCompute(validTime).map(_.filter(filterFunc))
streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -1,6 +1,6 @@
 package spark.streaming.dstream
 
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
 import spark.RDD
 import spark.SparkContext._
 
@@ -12,7 +12,7 @@ class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]
 
   override def dependencies = List(parent)
 
-  override def slideTime: Time = parent.slideTime
+  override def slideTime: Duration = parent.slideTime
 
   override def compute(validTime: Time): Option[RDD[(K, U)]] = {
     parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc))
streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala
@@ -1,6 +1,6 @@
 package spark.streaming.dstream
 
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
 import spark.RDD
 
 private[streaming]
@@ -11,7 +11,7 @@ class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
 
   override def dependencies = List(parent)
 
-  override def slideTime: Time = parent.slideTime
+  override def slideTime: Duration = parent.slideTime
 
   override def compute(validTime: Time): Option[RDD[U]] = {
     parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala
@@ -1,7 +1,7 @@
 package spark.streaming.dstream
 
 import spark.RDD
-import spark.streaming.{DStream, Job, Time}
+import spark.streaming.{Duration, DStream, Job, Time}
 
 private[streaming]
 class ForEachDStream[T: ClassManifest] (
@@ -11,7 +11,7 @@ class ForEachDStream[T: ClassManifest] (
 
   override def dependencies = List(parent)
 
-  override def slideTime: Time = parent.slideTime
+  override def slideTime: Duration = parent.slideTime
 
   override def compute(validTime: Time): Option[RDD[Unit]] = None
 
streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala
@@ -1,6 +1,6 @@
 package spark.streaming.dstream
 
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
 import spark.RDD
 
 private[streaming]
@@ -9,7 +9,7 @@ class GlommedDStream[T: ClassManifest](parent: DStream[T])
 
   override def dependencies = List(parent)
 
-  override def slideTime: Time = parent.slideTime
+  override def slideTime: Duration = parent.slideTime
 
   override def compute(validTime: Time): Option[RDD[Array[T]]] = {
     parent.getOrCompute(validTime).map(_.glom())
streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
@@ -1,13 +1,13 @@
 package spark.streaming.dstream
 
-import spark.streaming.{StreamingContext, DStream}
+import spark.streaming.{Duration, StreamingContext, DStream}
 
 abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
   extends DStream[T](ssc_) {
 
   override def dependencies = List()
 
-  override def slideTime = {
+  override def slideTime: Duration = {
     if (ssc == null) throw new Exception("ssc is null")
     if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
     ssc.graph.batchDuration
streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -1,6 +1,6 @@
 package spark.streaming.dstream
 
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
 import spark.RDD
 
 private[streaming]
@@ -12,7 +12,7 @@ class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
 
   override def dependencies = List(parent)
 
-  override def slideTime: Time = parent.slideTime
+  override def slideTime: Duration = parent.slideTime
 
   override def compute(validTime: Time): Option[RDD[U]] = {
     parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala
@@ -1,6 +1,6 @@
 package spark.streaming.dstream
 
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
 import spark.RDD
 import spark.SparkContext._
 
@@ -12,7 +12,7 @@ class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
 
   override def dependencies = List(parent)
 
-  override def slideTime: Time = parent.slideTime
+  override def slideTime: Duration = parent.slideTime
 
   override def compute(validTime: Time): Option[RDD[(K, U)]] = {
     parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala
@@ -1,6 +1,6 @@
 package spark.streaming.dstream
 
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
 import spark.RDD
 
 private[streaming]
@@ -11,7 +11,7 @@ class MappedDStream[T: ClassManifest, U: ClassManifest] (
 
   override def dependencies = List(parent)
 
-  override def slideTime: Time = parent.slideTime
+  override def slideTime: Duration = parent.slideTime
 
   override def compute(validTime: Time): Option[RDD[U]] = {
     parent.getOrCompute(validTime).map(_.map[U](mapFunc))
streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -9,15 +9,15 @@ import spark.SparkContext._
 import spark.storage.StorageLevel
 
 import scala.collection.mutable.ArrayBuffer
-import spark.streaming.{Interval, Time, DStream}
+import spark.streaming.{Duration, Interval, Time, DStream}
 
 private[streaming]
 class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     parent: DStream[(K, V)],
     reduceFunc: (V, V) => V,
     invReduceFunc: (V, V) => V,
-    _windowTime: Time,
-    _slideTime: Time,
+    _windowTime: Duration,
+    _slideTime: Duration,
     partitioner: Partitioner
   ) extends DStream[(K,V)](parent.ssc) {
 
@@ -39,15 +39,15 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
   super.persist(StorageLevel.MEMORY_ONLY_SER)
   reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
 
-  def windowTime: Time = _windowTime
+  def windowTime: Duration = _windowTime
 
   override def dependencies = List(reducedStream)
 
-  override def slideTime: Time = _slideTime
+  override def slideTime: Duration = _slideTime
 
   override val mustCheckpoint = true
 
-  override def parentRememberDuration: Time = rememberDuration + windowTime
+  override def parentRememberDuration: Duration = rememberDuration + windowTime
 
   override def persist(storageLevel: StorageLevel): DStream[(K,V)] = {
     super.persist(storageLevel)
@@ -55,7 +55,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     this
   }
 
-  override def checkpoint(interval: Time): DStream[(K, V)] = {
+  override def checkpoint(interval: Duration): DStream[(K, V)] = {
     super.checkpoint(interval)
     //reducedStream.checkpoint(interval)
     this
@@ -66,7 +66,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     val invReduceF = invReduceFunc
 
     val currentTime = validTime
-    val currentWindow = Interval(currentTime - windowTime + parent.slideTime, currentTime)
+    val currentWindow = new Interval(currentTime - windowTime + parent.slideTime, currentTime)
     val previousWindow = currentWindow - slideTime
 
     logDebug("Window time = " + windowTime)
streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala
@@ -2,7 +2,7 @@ package spark.streaming.dstream
 
 import spark.{RDD, Partitioner}
 import spark.SparkContext._
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
 
 private[streaming]
 class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
@@ -15,7 +15,7 @@ class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
 
   override def dependencies = List(parent)
 
-  override def slideTime: Time = parent.slideTime
+  override def slideTime: Duration = parent.slideTime
 
   override def compute(validTime: Time): Option[RDD[(K,C)]] = {
     parent.getOrCompute(validTime) match {
streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -4,7 +4,7 @@ import spark.RDD
 import spark.Partitioner
 import spark.SparkContext._
 import spark.storage.StorageLevel
-import spark.streaming.{Time, DStream}
+import spark.streaming.{Duration, Time, DStream}
 
 private[streaming]
 class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
@@ -18,7 +18,7 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
 
   override def dependencies = List(parent)
 
-  override def slideTime = parent.slideTime
+  override def slideTime: Duration = parent.slideTime
 
   override val mustCheckpoint = true
 
streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala
@@ -1,7 +1,7 @@
 package spark.streaming.dstream
 
 import spark.RDD
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
 
 private[streaming]
 class TransformedDStream[T: ClassManifest, U: ClassManifest] (
@@ -11,7 +11,7 @@ class TransformedDStream[T: ClassManifest, U: ClassManifest] (
 
   override def dependencies = List(parent)
 
-  override def slideTime: Time = parent.slideTime
+  override def slideTime: Duration = parent.slideTime
 
   override def compute(validTime: Time): Option[RDD[U]] = {
     parent.getOrCompute(validTime).map(transformFunc(_, validTime))
streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
@@ -1,6 +1,6 @@
 package spark.streaming.dstream
 
-import spark.streaming.{DStream, Time}
+import spark.streaming.{Duration, DStream, Time}
 import spark.RDD
 import collection.mutable.ArrayBuffer
 import spark.rdd.UnionRDD
@@ -23,7 +23,7 @@ class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
 
   override def dependencies = parents.toList
 
-  override def slideTime: Time = parents.head.slideTime
+  override def slideTime: Duration = parents.head.slideTime
 
   override def compute(validTime: Time): Option[RDD[T]] = {
     val rdds = new ArrayBuffer[RDD[T]]()
streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
@@ -3,13 +3,13 @@ package spark.streaming.dstream
 import spark.RDD
 import spark.rdd.UnionRDD
 import spark.storage.StorageLevel
-import spark.streaming.{Interval, Time, DStream}
+import spark.streaming.{Duration, Interval, Time, DStream}
 
 private[streaming]
 class WindowedDStream[T: ClassManifest](
     parent: DStream[T],
-    _windowTime: Time,
-    _slideTime: Time)
+    _windowTime: Duration,
+    _slideTime: Duration)
   extends DStream[T](parent.ssc) {
 
   if (!_windowTime.isMultipleOf(parent.slideTime))
@@ -22,16 +22,16 @@ class WindowedDStream[T: ClassManifest](
 
   parent.persist(StorageLevel.MEMORY_ONLY_SER)
 
-  def windowTime: Time = _windowTime
+  def windowTime: Duration = _windowTime
 
   override def dependencies = List(parent)
 
-  override def slideTime: Time = _slideTime
+  override def slideTime: Duration = _slideTime
 
-  override def parentRememberDuration: Time = rememberDuration + windowTime
+  override def parentRememberDuration: Duration = rememberDuration + windowTime
 
   override def compute(validTime: Time): Option[RDD[T]] = {
-    val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime)
+    val currentWindow = new Interval(validTime - windowTime + parent.slideTime, validTime)
     Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
   }
 }
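The constructor guard above rejects window and slide durations that are not multiples of the parent's slide duration; that divisibility check now lives on Duration:

  import spark.streaming.Seconds

  val parentSlide = Seconds(10)
  assert(Seconds(30).isMultipleOf(parentSlide))    // a valid window width
  assert(!Seconds(25).isMultipleOf(parentSlide))   // would be rejected by WindowedDStream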