Fixed bugs to ensure better cleanup of JobScheduler, JobGenerator and NetworkInputTracker upon close.

This commit is contained in:
Tathagata Das 2014-01-12 16:44:07 -08:00
parent c5921e5c61
commit 7883b8f579
9 changed files with 96 additions and 56 deletions

View file

@ -354,17 +354,17 @@ abstract class DStream[T: ClassTag] (
* this method to save custom checkpoint data. * this method to save custom checkpoint data.
*/ */
private[streaming] def updateCheckpointData(currentTime: Time) { private[streaming] def updateCheckpointData(currentTime: Time) {
logInfo("Updating checkpoint data for time " + currentTime) logDebug("Updating checkpoint data for time " + currentTime)
checkpointData.update(currentTime) checkpointData.update(currentTime)
dependencies.foreach(_.updateCheckpointData(currentTime)) dependencies.foreach(_.updateCheckpointData(currentTime))
logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData) logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
} }
private[streaming] def clearCheckpointData(time: Time) { private[streaming] def clearCheckpointData(time: Time) {
logInfo("Clearing checkpoint data") logDebug("Clearing checkpoint data")
checkpointData.cleanup(time) checkpointData.cleanup(time)
dependencies.foreach(_.clearCheckpointData(time)) dependencies.foreach(_.clearCheckpointData(time))
logInfo("Cleared checkpoint data") logDebug("Cleared checkpoint data")
} }
/** /**

View file

@ -17,9 +17,8 @@
package org.apache.spark.streaming.scheduler package org.apache.spark.streaming.scheduler
import akka.actor.{Props, Actor} import akka.actor.{ActorRef, ActorSystem, Props, Actor}
import org.apache.spark.SparkEnv import org.apache.spark.{SparkException, SparkEnv, Logging}
import org.apache.spark.Logging
import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter} import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
import scala.util.{Failure, Success, Try} import scala.util.{Failure, Success, Try}
@ -40,13 +39,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
private val ssc = jobScheduler.ssc private val ssc = jobScheduler.ssc
private val graph = ssc.graph private val graph = ssc.graph
private val eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobGeneratorEvent =>
logDebug("Got event of type " + event.getClass.getName)
processEvent(event)
}
}))
val clock = { val clock = {
val clockClass = ssc.sc.conf.get( val clockClass = ssc.sc.conf.get(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
@ -60,7 +52,23 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
null null
} }
// eventActor is created when generator starts.
// This not being null means the generator has been started and not stopped
private var eventActor: ActorRef = null
/** Start generation of jobs */
def start() = synchronized { def start() = synchronized {
if (eventActor != null) {
throw new SparkException("JobGenerator already started")
}
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobGeneratorEvent =>
logDebug("Got event of type " + event.getClass.getName)
processEvent(event)
}
}), "JobGenerator")
if (ssc.isCheckpointPresent) { if (ssc.isCheckpointPresent) {
restart() restart()
} else { } else {
@ -68,12 +76,16 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
} }
} }
def stop() { /** Stop generation of jobs */
def stop() = synchronized {
if (eventActor != null) {
timer.stop() timer.stop()
ssc.env.actorSystem.stop(eventActor)
if (checkpointWriter != null) checkpointWriter.stop() if (checkpointWriter != null) checkpointWriter.stop()
ssc.graph.stop() ssc.graph.stop()
logInfo("JobGenerator stopped") logInfo("JobGenerator stopped")
} }
}
/** /**
* On batch completion, clear old metadata and checkpoint computation. * On batch completion, clear old metadata and checkpoint computation.
@ -172,4 +184,3 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
} }
} }
} }

View file

@ -41,20 +41,26 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val executor = Executors.newFixedThreadPool(numConcurrentJobs) private val executor = Executors.newFixedThreadPool(numConcurrentJobs)
private val jobGenerator = new JobGenerator(this) private val jobGenerator = new JobGenerator(this)
private val eventActor = ssc.env.actorSystem.actorOf(Props(new Actor { val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus()
// These two are created only when scheduler starts.
// eventActor not being null means the scheduler has been started and not stopped
var networkInputTracker: NetworkInputTracker = null
private var eventActor: ActorRef = null
def start() = synchronized {
if (eventActor != null) {
throw new SparkException("JobScheduler already started")
}
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = { def receive = {
case event: JobSchedulerEvent => processEvent(event) case event: JobSchedulerEvent => processEvent(event)
} }
})) }), "JobScheduler")
val clock = jobGenerator.clock // used by testsuites listenerBus.start()
val listenerBus = new StreamingListenerBus()
var networkInputTracker: NetworkInputTracker = null
def start() = synchronized {
if (networkInputTracker != null) {
throw new SparkException("StreamingContext already started")
}
networkInputTracker = new NetworkInputTracker(ssc) networkInputTracker = new NetworkInputTracker(ssc)
networkInputTracker.start() networkInputTracker.start()
Thread.sleep(1000) Thread.sleep(1000)
@ -63,13 +69,15 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
} }
def stop() = synchronized { def stop() = synchronized {
if (networkInputTracker != null) { if (eventActor != null) {
jobGenerator.stop() jobGenerator.stop()
networkInputTracker.stop() networkInputTracker.stop()
executor.shutdown() executor.shutdown()
if (!executor.awaitTermination(2, TimeUnit.SECONDS)) { if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
executor.shutdownNow() executor.shutdownNow()
} }
listenerBus.stop()
ssc.env.actorSystem.stop(eventActor)
logInfo("JobScheduler stopped") logInfo("JobScheduler stopped")
} }
} }
@ -104,7 +112,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
case e: Throwable => case e: Throwable =>
reportError("Error in job scheduler", e) reportError("Error in job scheduler", e)
} }
} }
private def handleJobStart(job: Job) { private def handleJobStart(job: Job) {

View file

@ -19,8 +19,7 @@ package org.apache.spark.streaming.scheduler
import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver} import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError} import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
import org.apache.spark.Logging import org.apache.spark.{SparkException, Logging, SparkEnv}
import org.apache.spark.SparkEnv
import org.apache.spark.SparkContext._ import org.apache.spark.SparkContext._
import scala.collection.mutable.HashMap import scala.collection.mutable.HashMap
@ -32,6 +31,7 @@ import akka.pattern.ask
import akka.dispatch._ import akka.dispatch._
import org.apache.spark.storage.BlockId import org.apache.spark.storage.BlockId
import org.apache.spark.streaming.{Time, StreamingContext} import org.apache.spark.streaming.{Time, StreamingContext}
import org.apache.spark.util.AkkaUtils
private[streaming] sealed trait NetworkInputTrackerMessage private[streaming] sealed trait NetworkInputTrackerMessage
private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
@ -39,7 +39,9 @@ private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], m
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
/** /**
* This class manages the execution of the receivers of NetworkInputDStreams. * This class manages the execution of the receivers of NetworkInputDStreams. An instance of
* this class must be created after all input streams have been added and StreamingContext.start()
* has been called because it needs the final set of input streams at the time of instantiation.
*/ */
private[streaming] private[streaming]
class NetworkInputTracker(ssc: StreamingContext) extends Logging { class NetworkInputTracker(ssc: StreamingContext) extends Logging {
@ -49,23 +51,33 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
val receiverExecutor = new ReceiverExecutor() val receiverExecutor = new ReceiverExecutor()
val receiverInfo = new HashMap[Int, ActorRef] val receiverInfo = new HashMap[Int, ActorRef]
val receivedBlockIds = new HashMap[Int, Queue[BlockId]] val receivedBlockIds = new HashMap[Int, Queue[BlockId]]
val timeout = 5000.milliseconds val timeout = AkkaUtils.askTimeout(ssc.conf)
// actor is created when the tracker starts.
// This not being null means the tracker has been started and not stopped
var actor: ActorRef = null
var currentTime: Time = null var currentTime: Time = null
/** Start the actor and receiver execution thread. */ /** Start the actor and receiver execution thread. */
def start() { def start() {
if (actor != null) {
throw new SparkException("NetworkInputTracker already started")
}
if (!networkInputStreams.isEmpty) { if (!networkInputStreams.isEmpty) {
ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker") actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
receiverExecutor.start() receiverExecutor.start()
logInfo("NetworkInputTracker started")
} }
} }
/** Stop the receiver execution thread. */ /** Stop the receiver execution thread. */
def stop() { def stop() {
if (!networkInputStreams.isEmpty) { if (!networkInputStreams.isEmpty && actor != null) {
receiverExecutor.interrupt() receiverExecutor.interrupt()
receiverExecutor.stopReceivers() receiverExecutor.stopReceivers()
ssc.env.actorSystem.stop(actor)
logInfo("NetworkInputTracker stopped") logInfo("NetworkInputTracker stopped")
} }
} }

View file

@ -24,9 +24,10 @@ import org.apache.spark.util.Distribution
sealed trait StreamingListenerEvent sealed trait StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
/** /**
* A listener interface for receiving information about an ongoing streaming * A listener interface for receiving information about an ongoing streaming

View file

@ -31,7 +31,7 @@ private[spark] class StreamingListenerBus() extends Logging {
private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY) private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false private var queueFullErrorMessageLogged = false
new Thread("StreamingListenerBus") { val listenerThread = new Thread("StreamingListenerBus") {
setDaemon(true) setDaemon(true)
override def run() { override def run() {
while (true) { while (true) {
@ -41,11 +41,18 @@ private[spark] class StreamingListenerBus() extends Logging {
listeners.foreach(_.onBatchStarted(batchStarted)) listeners.foreach(_.onBatchStarted(batchStarted))
case batchCompleted: StreamingListenerBatchCompleted => case batchCompleted: StreamingListenerBatchCompleted =>
listeners.foreach(_.onBatchCompleted(batchCompleted)) listeners.foreach(_.onBatchCompleted(batchCompleted))
case StreamingListenerShutdown =>
// Get out of the while loop and shutdown the daemon thread
return
case _ => case _ =>
} }
} }
} }
}.start() }
def start() {
listenerThread.start()
}
def addListener(listener: StreamingListener) { def addListener(listener: StreamingListener) {
listeners += listener listeners += listener
@ -54,9 +61,9 @@ private[spark] class StreamingListenerBus() extends Logging {
def post(event: StreamingListenerEvent) { def post(event: StreamingListenerEvent) {
val eventAdded = eventQueue.offer(event) val eventAdded = eventQueue.offer(event)
if (!eventAdded && !queueFullErrorMessageLogged) { if (!eventAdded && !queueFullErrorMessageLogged) {
logError("Dropping SparkListenerEvent because no remaining room in event queue. " + logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with the " + "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
"rate at which tasks are being started by the scheduler.") "rate at which events are being started by the scheduler.")
queueFullErrorMessageLogged = true queueFullErrorMessageLogged = true
} }
} }
@ -68,7 +75,7 @@ private[spark] class StreamingListenerBus() extends Logging {
*/ */
def waitUntilEmpty(timeoutMillis: Int): Boolean = { def waitUntilEmpty(timeoutMillis: Int): Boolean = {
val finishTime = System.currentTimeMillis + timeoutMillis val finishTime = System.currentTimeMillis + timeoutMillis
while (!eventQueue.isEmpty()) { while (!eventQueue.isEmpty) {
if (System.currentTimeMillis > finishTime) { if (System.currentTimeMillis > finishTime) {
return false return false
} }
@ -78,4 +85,6 @@ private[spark] class StreamingListenerBus() extends Logging {
} }
return true return true
} }
def stop(): Unit = post(StreamingListenerShutdown)
} }

View file

@ -20,17 +20,7 @@ package org.apache.spark.streaming.util
private[streaming] private[streaming]
class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) { class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
private val minPollTime = 25L private val thread = new Thread("RecurringTimer") {
private val pollTime = {
if (period / 10.0 > minPollTime) {
(period / 10.0).toLong
} else {
minPollTime
}
}
private val thread = new Thread() {
override def run() { loop } override def run() { loop }
} }
@ -66,7 +56,6 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
callback(nextTime) callback(nextTime)
nextTime += period nextTime += period
} }
} catch { } catch {
case e: InterruptedException => case e: InterruptedException =>
} }

View file

@ -45,6 +45,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
ssc.stop() ssc.stop()
ssc = null ssc = null
} }
if (sc != null) {
sc.stop()
sc = null
}
} }
test("from no conf constructor") { test("from no conf constructor") {
@ -124,6 +128,8 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
test("stop multiple times") { test("stop multiple times") {
ssc = new StreamingContext(master, appName, batchDuration) ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register
ssc.start()
ssc.stop() ssc.stop()
ssc.stop() ssc.stop()
ssc = null ssc = null
@ -131,9 +137,13 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
test("stop only streaming context") { test("stop only streaming context") {
ssc = new StreamingContext(master, appName, batchDuration) ssc = new StreamingContext(master, appName, batchDuration)
sc = ssc.sparkContext
addInputStream(ssc).register
ssc.start()
ssc.stop(false) ssc.stop(false)
ssc = null ssc = null
assert(sc.makeRDD(1 to 100).collect().size === 100) assert(sc.makeRDD(1 to 100).collect().size === 100)
ssc = new StreamingContext(sc, batchDuration)
} }
test("waitForStop") { test("waitForStop") {

View file

@ -273,10 +273,11 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val startTime = System.currentTimeMillis() val startTime = System.currentTimeMillis()
while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput) logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
Thread.sleep(10) ssc.waitForStop(50)
} }
val timeTaken = System.currentTimeMillis() - startTime val timeTaken = System.currentTimeMillis() - startTime
logInfo("Output generated in " + timeTaken + " milliseconds")
output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
assert(output.size === numExpectedOutput, "Unexpected number of outputs generated") assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")