Added a new Clock interface that is used by RecurringTimer to schedule events based on system time or manually-configured time.

Tathagata Das 2012-08-06 14:52:46 -07:00
parent 43b81eb271
commit cae894ee7a
6 changed files with 130 additions and 30 deletions
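The heart of the change is the two-method Clock contract: scheduling code asks a Clock for the current time and blocks on it until a target time, without knowing whether time is real or simulated. A minimal sketch of that polymorphism, assuming only the trait and classes added in this commit (the demo object itself is illustrative, not part of the change):

import spark.streaming.util.{Clock, SystemClock}

object ClockContractDemo {
  // Clock-agnostic wait: with SystemClock this sleeps in poll intervals;
  // with ManualClock it would block until addToTime() passes the target.
  def waitTwoSeconds(clock: Clock): Long = {
    val target = clock.currentTime() + 2000
    clock.waitTillTime(target)
  }

  def main(args: Array[String]) {
    println("reached " + waitTwoSeconds(new SystemClock()))
  }
}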


@@ -197,7 +197,6 @@ extends Logging with Serializable {
   private[streaming] def toQueue = {
     val queue = new ArrayBlockingQueue[RDD[T]](10000)
     this.foreachRDD(rdd => {
-      println("Added RDD " + rdd.id)
       queue.add(rdd)
     })
     queue


@@ -1,5 +1,7 @@
 package spark.streaming

+import java.util.concurrent.atomic.AtomicLong
+
 class Job(val time: Time, func: () => _) {
   val id = Job.getNewId()
   def run(): Long = {
@@ -13,11 +15,8 @@ class Job(val time: Time, func: () => _) {
 }

 object Job {
-  var lastId = 1
+  val id = new AtomicLong(0)

-  def getNewId() = synchronized {
-    lastId += 1
-    lastId
-  }
+  def getNewId() = id.getAndIncrement()
 }


@@ -1,6 +1,7 @@
 package spark.streaming

 import spark.streaming.util.RecurringTimer
+import spark.streaming.util.Clock
 import spark.SparkEnv
 import spark.Logging
@@ -20,8 +21,10 @@ extends Logging {
   val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt
   val jobManager = new JobManager(ssc, concurrentJobs)
-  val timer = new RecurringTimer(ssc.batchDuration, generateRDDs(_))
+  val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
+  val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+  val timer = new RecurringTimer(clock, ssc.batchDuration, generateRDDs(_))

   def start() {
     val zeroTime = Time(timer.start())
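Because the Scheduler loads its clock by reflection from the spark.streaming.clock property, a test can substitute ManualClock without touching scheduler code. A minimal standalone sketch of that selection step, assuming only the Clock classes added in this commit (the demo object is illustrative, not part of the change):

import spark.streaming.util.{Clock, ManualClock}

object ClockSelectionDemo {
  def main(args: Array[String]) {
    // A test sets this property before the context starts; production
    // code leaves it unset and gets the SystemClock default.
    System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")

    // Same reflective instantiation the Scheduler uses: the named class
    // must have a no-arg constructor and implement Clock.
    val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
    val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
    println(clock.getClass.getSimpleName + " at time " + clock.currentTime())
  }
}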


@@ -0,0 +1,77 @@
+package spark.streaming.util
+
+import spark.streaming._
+
+trait Clock {
+  def currentTime(): Long
+  def waitTillTime(targetTime: Long): Long
+}
+
+class SystemClock() extends Clock {
+
+  val minPollTime = 25L
+
+  def currentTime(): Long = {
+    System.currentTimeMillis()
+  }
+
+  def waitTillTime(targetTime: Long): Long = {
+    var currentTime = 0L
+    currentTime = System.currentTimeMillis()
+
+    var waitTime = targetTime - currentTime
+    if (waitTime <= 0) {
+      return currentTime
+    }
+
+    val pollTime = {
+      if (waitTime / 10.0 > minPollTime) {
+        (waitTime / 10.0).toLong
+      } else {
+        minPollTime
+      }
+    }
+
+    while (true) {
+      currentTime = System.currentTimeMillis()
+      waitTime = targetTime - currentTime
+      if (waitTime <= 0) {
+        return currentTime
+      }
+      val sleepTime =
+        if (waitTime < pollTime) {
+          waitTime
+        } else {
+          pollTime
+        }
+      Thread.sleep(sleepTime)
+    }
+    return -1
+  }
+}
+
+class ManualClock() extends Clock {
+
+  var time = 0L
+
+  def currentTime() = time
+
+  def addToTime(timeToAdd: Long) = {
+    this.synchronized {
+      time += timeToAdd
+      this.notifyAll()
+    }
+  }
+
+  def waitTillTime(targetTime: Long): Long = {
+    this.synchronized {
+      while (time < targetTime) {
+        this.wait(100)
+      }
+    }
+    return currentTime()
+  }
+}
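ManualClock blocks in waitTillTime() until another thread advances the clock past the target, using wait/notifyAll on the clock's own monitor. A small sketch of that handshake against the API above (the thread setup is illustrative):

import spark.streaming.util.ManualClock

object ManualClockDemo {
  def main(args: Array[String]) {
    val clock = new ManualClock()

    // This thread parks inside waitTillTime() until time >= 1000.
    val waiter = new Thread() {
      override def run() {
        println("woke at manual time " + clock.waitTillTime(1000))
      }
    }
    waiter.start()

    // addToTime() bumps the clock and calls notifyAll(), releasing the waiter.
    clock.addToTime(1000)
    waiter.join()
  }
}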


@@ -1,6 +1,6 @@
 package spark.streaming.util

-class RecurringTimer(period: Long, callback: (Long) => Unit) {
+class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {

   val minPollTime = 25L
@@ -19,7 +19,7 @@ class RecurringTimer(period: Long, callback: (Long) => Unit) {
   var nextTime = 0L

   def start(): Long = {
-    nextTime = (math.floor(System.currentTimeMillis() / period) + 1).toLong * period
+    nextTime = (math.floor(clock.currentTime / period) + 1).toLong * period
     thread.start()
     nextTime
   }
@@ -31,22 +31,32 @@ class RecurringTimer(period: Long, callback: (Long) => Unit) {
   def loop() {
     try {
       while (true) {
-        val beforeSleepTime = System.currentTimeMillis()
-        while (beforeSleepTime >= nextTime) {
-          callback(nextTime)
-          nextTime += period
-        }
-        val sleepTime = if (nextTime - beforeSleepTime < 2 * pollTime) {
-          nextTime - beforeSleepTime
-        } else {
-          pollTime
-        }
-        Thread.sleep(sleepTime)
-        val afterSleepTime = System.currentTimeMillis()
+        clock.waitTillTime(nextTime)
+        callback(nextTime)
+        nextTime += period
       }
     } catch {
       case e: InterruptedException =>
     }
   }
 }
+
+object RecurringTimer {
+
+  def main(args: Array[String]) {
+    var lastRecurTime = 0L
+    val period = 1000
+
+    def onRecur(time: Long) {
+      val currentTime = System.currentTimeMillis()
+      println("" + currentTime + ": " + (currentTime - lastRecurTime))
+      lastRecurTime = currentTime
+    }
+
+    val timer = new RecurringTimer(new SystemClock(), period, onRecur)
+    timer.start()
+    Thread.sleep(30 * 1000)
+    timer.stop()
+  }
+}
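The main() above exercises the timer against real time via SystemClock; handing it a ManualClock instead makes the schedule fully deterministic. A hedged sketch of that, with an illustrative demo object and a crude sleep to let the timer thread catch up:

import java.util.concurrent.atomic.AtomicLong
import spark.streaming.util.{ManualClock, RecurringTimer}

object ManualTimerDemo {
  def main(args: Array[String]) {
    val ticks = new AtomicLong(0)
    val clock = new ManualClock()
    val timer = new RecurringTimer(clock, 1000, (_: Long) => ticks.incrementAndGet())

    // start() aligns nextTime to the next period boundary, here t = 1000.
    timer.start()

    // Five periods of manual time should yield exactly five callbacks,
    // regardless of how long the surrounding code really takes.
    clock.addToTime(5 * 1000)
    Thread.sleep(100) // crude: give the timer thread a moment to fire (illustrative)
    println("ticks = " + ticks.get())
    timer.stop()
  }
}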


@@ -1,6 +1,8 @@
 package spark.streaming

-import spark.{Logging, RDD}
+import spark.Logging
+import spark.RDD
+import spark.streaming.util.ManualClock

 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
@@ -13,11 +15,13 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
   var ssc: SparkStreamContext = null
   val batchDurationMillis = 1000

+  System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
   def testOp[U: ClassManifest, V: ClassManifest](
       input: Seq[Seq[U]],
       operation: DStream[U] => DStream[V],
       expectedOutput: Seq[Seq[V]]) {
     try {
       ssc = new SparkStreamContext("local", "test")
       ssc.setBatchDuration(Milliseconds(batchDurationMillis))
@@ -26,12 +30,14 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
       val outputQueue = outputStream.toQueue

       ssc.start()
-      Thread.sleep(batchDurationMillis * input.size)
+      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+      clock.addToTime(input.size * batchDurationMillis)
+
+      Thread.sleep(100)

       val output = new ArrayBuffer[Seq[V]]()
       while (outputQueue.size > 0) {
         val rdd = outputQueue.take()
-        logInfo("Collecting RDD " + rdd.id + ", " + rdd.getClass.getSimpleName + ", " + rdd.splits.size)
         output += (rdd.collect())
       }
       assert(output.size === expectedOutput.size)
@@ -58,8 +64,14 @@ class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {

 object DStreamSuite {
   def main(args: Array[String]) {
-    val r = new DStreamSuite()
-    val inputData = Array(1 to 4, 5 to 8, 9 to 12)
-    r.testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))
+    try {
+      val r = new DStreamSuite()
+      val inputData = Array(1 to 4, 5 to 8, 9 to 12)
+      r.testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))
+    } catch {
+      case e: Exception => e.printStackTrace()
+    }
+    System.exit(0)
   }
 }