[SPARK-18838][CORE] Add separate listener queues to LiveListenerBus.

This change modifies the live listener bus so that all listeners are
added to queues; each queue has its own thread to dispatch events,
making it possible to isolate slow listeners from other, more
performance-sensitive ones.
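
As a rough illustration of the model (a toy, self-contained sketch only,
not the actual AsyncEventQueue implementation added below; all names here
are made up):

```scala
import java.util.concurrent.LinkedBlockingQueue

// One bounded queue plus one daemon dispatch thread per listener group:
// a slow listener only delays the other listeners in its own group.
class ToyEventQueue[E](name: String, capacity: Int, listeners: Seq[E => Unit]) {
  private val queue = new LinkedBlockingQueue[Option[E]](capacity)

  private val dispatcher = new Thread(s"toy-listener-group-$name") {
    setDaemon(true)
    override def run(): Unit = {
      var next = queue.take()
      while (next.isDefined) {          // None acts as the poison pill
        listeners.foreach(l => l(next.get))
        next = queue.take()
      }
    }
  }
  dispatcher.start()

  // Non-blocking: returns false (the event is dropped) when the queue is full.
  def post(event: E): Boolean = queue.offer(Some(event))

  def stop(): Unit = { queue.put(None); dispatcher.join() }
}
```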

The public API has not changed: all listeners added with the existing
"addListener" method (which, after this change, mostly means
user-defined listeners) end up in a default, shared queue. Internally,
a new API allows listeners to be added to specific queues, and that API
is used to separate the internal Spark listeners into three categories:
application status listeners (e.g. the UI), executor management
(e.g. dynamic allocation), and the event log.

The queueing logic is abstracted into a separate class (AsyncEventQueue)
but kept hidden from consumers as much as possible: aside from choosing
a queue, no code changes are needed to take advantage of the queues.
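
For example, a user-defined listener is still registered exactly as
before and simply lands on the shared queue (a minimal sketch; the
listener and object names are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// A trivial user-defined listener.
class JobEndCounter extends SparkListener {
  @volatile var jobsEnded = 0
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { jobsEnded += 1 }
}

object SharedQueueExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("listener-queues"))
    // Unchanged public API; internally this now calls listenerBus.addToSharedQueue.
    sc.addSparkListener(new JobEndCounter)
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}
```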

Test coverage relies on existing tests; a few tests had to be tweaked
because they relied on `LiveListenerBus.postToAll` being synchronous,
and with this change event delivery is asynchronous. Other tests were
simplified so they no longer use the asynchronous LiveListenerBus.
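
The tweaked suites follow the pattern below (a sketch of the helper added
to ExecutorAllocationManagerSuite in this diff; it only compiles from
inside the org.apache.spark packages, where LiveListenerBus is visible):

```scala
import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerEvent}

// Post the event asynchronously, then block until every queue has drained it.
private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = {
  bus.post(event)
  bus.waitUntilEmpty(1000) // throws TimeoutException if events are still pending after 1000 ms
}
```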

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19211 from vanzin/SPARK-18838.
Authored by Marcelo Vanzin on 2017-09-20 13:41:29 +08:00; committed by Wenchen Fan
parent 718bbc9390
commit c6ff59a230
16 changed files with 470 additions and 287 deletions


@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager(
* the scheduling task.
*/
def start(): Unit = {
listenerBus.addListener(listener)
listenerBus.addToManagementQueue(listener)
val scheduleTask = new Runnable() {
override def run(): Unit = {


@ -63,7 +63,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
this(sc, new SystemClock)
}
sc.addSparkListener(this)
sc.listenerBus.addToManagementQueue(this)
override val rpcEnv: RpcEnv = sc.env.rpcEnv


@ -419,7 +419,7 @@ class SparkContext(config: SparkConf) extends Logging {
// "_jobProgressListener" should be set up before creating SparkEnv because when creating
// "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
_jobProgressListener = new JobProgressListener(_conf)
listenerBus.addListener(jobProgressListener)
listenerBus.addToStatusQueue(jobProgressListener)
// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
@ -442,7 +442,7 @@ class SparkContext(config: SparkConf) extends Logging {
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
Some(SparkUI.createLiveUI(this, _conf, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
@ -522,7 +522,7 @@ class SparkContext(config: SparkConf) extends Logging {
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
listenerBus.addToEventLogQueue(logger)
Some(logger)
} else {
None
@ -1563,7 +1563,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
@DeveloperApi
def addSparkListener(listener: SparkListenerInterface) {
listenerBus.addListener(listener)
listenerBus.addToSharedQueue(listener)
}
/**
@ -1879,8 +1879,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def stop(): Unit = {
if (LiveListenerBus.withinListenerThread.value) {
throw new SparkException(
s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")
throw new SparkException(s"Cannot stop SparkContext within listener bus thread.")
}
// Use the stopping variable to ensure no contention for the stop scenario.
// Still track the stopped variable for use elsewhere in the code.
@ -2378,7 +2377,7 @@ class SparkContext(config: SparkConf) extends Logging {
" parameter from breaking Spark's ability to find a valid constructor.")
}
}
listenerBus.addListener(listener)
listenerBus.addToSharedQueue(listener)
logInfo(s"Registered listener $className")
}
} catch {


@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import com.codahale.metrics.{Gauge, Timer}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
/**
* An asynchronous queue for events. All events posted to this queue will be delivered to the child
* listeners in a separate thread.
*
* Delivery will only begin when the `start()` method is called. The `stop()` method should be
* called when no more events need to be delivered.
*/
private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
extends SparkListenerBus
with Logging {
import AsyncEventQueue._
// Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
// Keep the event count separately, so that waitUntilEmpty() can be implemented properly;
// this allows that method to return only when the events in the queue have been fully
// processed (instead of just dequeued).
private val eventCount = new AtomicLong()
/** A counter for dropped events. It will be reset every time we log it. */
private val droppedEventsCounter = new AtomicLong(0L)
/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
private val logDroppedEvent = new AtomicBoolean(false)
private var sc: SparkContext = null
private val started = new AtomicBoolean(false)
private val stopped = new AtomicBoolean(false)
private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")
// Remove the queue size gauge first, in case it was created by a previous incarnation of
// this queue that was removed from the listener bus.
metrics.metricRegistry.remove(s"queue.$name.size")
metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] {
override def getValue: Int = eventQueue.size()
})
private val dispatchThread = new Thread(s"spark-listener-group-$name") {
setDaemon(true)
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
dispatch()
}
}
private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
try {
var next: SparkListenerEvent = eventQueue.take()
while (next != POISON_PILL) {
val ctx = processingTime.time()
try {
super.postToAll(next)
} finally {
ctx.stop()
}
eventCount.decrementAndGet()
next = eventQueue.take()
}
eventCount.decrementAndGet()
} catch {
case ie: InterruptedException =>
logInfo(s"Stopping listener queue $name.", ie)
}
}
override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
}
/**
* Start an asynchronous thread to dispatch events to the underlying listeners.
*
* @param sc Used to stop the SparkContext in case the async dispatcher fails.
*/
private[scheduler] def start(sc: SparkContext): Unit = {
if (started.compareAndSet(false, true)) {
this.sc = sc
dispatchThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
}
}
/**
* Stop the listener bus. It will wait until the queued events have been processed, but new
* events will be dropped.
*/
private[scheduler] def stop(): Unit = {
if (!started.get()) {
throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
}
if (stopped.compareAndSet(false, true)) {
eventQueue.put(POISON_PILL)
eventCount.incrementAndGet()
}
dispatchThread.join()
}
def post(event: SparkListenerEvent): Unit = {
if (stopped.get()) {
return
}
eventCount.incrementAndGet()
if (eventQueue.offer(event)) {
return
}
eventCount.decrementAndGet()
droppedEvents.inc()
droppedEventsCounter.incrementAndGet()
if (logDroppedEvent.compareAndSet(false, true)) {
// Only log the following message once to avoid duplicated annoying logs.
logError(s"Dropping event from queue $name. " +
"This likely means one of the listeners is too slow and cannot keep up with " +
"the rate at which tasks are being started by the scheduler.")
}
logTrace(s"Dropping event $event")
val droppedCount = droppedEventsCounter.get
if (droppedCount > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
val previous = new java.util.Date(prevLastReportTimestamp)
logWarning(s"Dropped $droppedEvents events from $name since $previous.")
}
}
}
}
/**
* For testing only. Wait until there are no more events in the queue.
*
* @return true if the queue is empty.
*/
def waitUntilEmpty(deadline: Long): Boolean = {
while (eventCount.get() != 0) {
if (System.currentTimeMillis > deadline) {
return false
}
Thread.sleep(10)
}
true
}
}
private object AsyncEventQueue {
val POISON_PILL = new SparkListenerEvent() { }
}


@ -17,20 +17,22 @@
package org.apache.spark.scheduler
import java.util.{List => JList}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.DynamicVariable
import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
import com.codahale.metrics.{Counter, MetricRegistry, Timer}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source
import org.apache.spark.util.Utils
/**
* Asynchronously passes SparkListenerEvents to registered SparkListeners.
@ -39,20 +41,13 @@ import org.apache.spark.util.Utils
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when `stop()` is called, and it will drop further events after stopping.
*/
private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
self =>
private[spark] class LiveListenerBus(conf: SparkConf) {
import LiveListenerBus._
private var sparkContext: SparkContext = _
// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private val eventQueue =
new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
private[spark] val metrics = new LiveListenerBusMetrics(conf, eventQueue)
private[spark] val metrics = new LiveListenerBusMetrics(conf)
// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
// Indicate if we are processing some event
// Guarded by `self`
private var processingEvent = false
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
private val logDroppedEvent = new AtomicBoolean(false)
/** Add a listener to queue shared by all non-internal listeners. */
def addToSharedQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, SHARED_QUEUE)
}
// A counter that represents the number of events produced and consumed in the queue
private val eventLock = new Semaphore(0)
/** Add a listener to the executor management queue. */
def addToManagementQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
}
private val listenerThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
LiveListenerBus.withinListenerThread.withValue(true) {
val timer = metrics.eventProcessingTime
while (true) {
eventLock.acquire()
self.synchronized {
processingEvent = true
}
try {
val event = eventQueue.poll
if (event == null) {
// Get out of the while loop and shutdown the daemon thread
if (!stopped.get) {
throw new IllegalStateException("Polling `null` from eventQueue means" +
" the listener bus has been stopped. So `stopped` must be true")
}
return
}
val timerContext = timer.time()
try {
postToAll(event)
} finally {
timerContext.stop()
}
} finally {
self.synchronized {
processingEvent = false
}
}
/** Add a listener to the application status queue. */
def addToStatusQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, APP_STATUS_QUEUE)
}
/** Add a listener to the event log queue. */
def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, EVENT_LOG_QUEUE)
}
/**
* Add a listener to a specific queue, creating a new queue if needed. Queues are independent
* of each other (each one uses a separate thread for delivering events), allowing slower
* listeners to be somewhat isolated from others.
*/
private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized {
if (stopped.get()) {
throw new IllegalStateException("LiveListenerBus is stopped.")
}
queues.asScala.find(_.name == queue) match {
case Some(queue) =>
queue.addListener(listener)
case None =>
val newQueue = new AsyncEventQueue(queue, conf, metrics)
newQueue.addListener(listener)
if (started.get()) {
newQueue.start(sparkContext)
}
}
queues.add(newQueue)
}
}
override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
def removeListener(listener: SparkListenerInterface): Unit = synchronized {
// Remove listener from all queues it was added to, and stop queues that have become empty.
queues.asScala
.filter { queue =>
queue.removeListener(listener)
queue.listeners.isEmpty()
}
.foreach { toRemove =>
if (started.get() && !stopped.get()) {
toRemove.stop()
}
queues.remove(toRemove)
}
}
/** Post an event to all queues. */
def post(event: SparkListenerEvent): Unit = {
if (!stopped.get()) {
metrics.numEventsPosted.inc()
val it = queues.iterator()
while (it.hasNext()) {
it.next().post(event)
}
}
}
/**
@ -123,46 +141,14 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
*
* @param sc Used to stop the SparkContext in case the listener thread dies.
*/
def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = {
if (started.compareAndSet(false, true)) {
sparkContext = sc
metricsSystem.registerSource(metrics)
listenerThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
}
}
def post(event: SparkListenerEvent): Unit = {
if (stopped.get) {
// Drop further events to make `listenerThread` exit ASAP
logDebug(s"$name has already stopped! Dropping event $event")
return
}
metrics.numEventsPosted.inc()
val eventAdded = eventQueue.offer(event)
if (eventAdded) {
eventLock.release()
} else {
onDropEvent(event)
def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("LiveListenerBus already started.")
}
val droppedEvents = droppedEventsCounter.get
if (droppedEvents > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
}
}
}
this.sparkContext = sc
queues.asScala.foreach(_.start(sc))
metricsSystem.registerSource(metrics)
}
/**
@ -173,80 +159,64 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
*/
@throws(classOf[TimeoutException])
def waitUntilEmpty(timeoutMillis: Long): Unit = {
val finishTime = System.currentTimeMillis + timeoutMillis
while (!queueIsEmpty) {
if (System.currentTimeMillis > finishTime) {
throw new TimeoutException(
s"The event queue is not empty after $timeoutMillis milliseconds")
val deadline = System.currentTimeMillis + timeoutMillis
queues.asScala.foreach { queue =>
if (!queue.waitUntilEmpty(deadline)) {
throw new TimeoutException(s"The event queue is not empty after $timeoutMillis ms.")
}
/* Sleep rather than using wait/notify, because this is used only for testing and
* wait/notify add overhead in the general case. */
Thread.sleep(10)
}
}
/**
* For testing only. Return whether the listener daemon thread is still alive.
* Exposed for testing.
*/
def listenerThreadIsAlive: Boolean = listenerThread.isAlive
/**
* Return whether the event queue is empty.
*
* The use of synchronized here guarantees that all events that once belonged to this queue
* have already been processed by all attached listeners, if this returns true.
*/
private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent }
/**
* Stop the listener bus. It will wait until the queued events have been processed, but drop the
* new events after stopping.
*/
def stop(): Unit = {
if (!started.get()) {
throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
throw new IllegalStateException(s"Attempted to stop bus that has not yet started!")
}
if (stopped.compareAndSet(false, true)) {
// Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
// `stop` is called.
eventLock.release()
listenerThread.join()
} else {
// Keep quiet
if (!stopped.compareAndSet(false, true)) {
return
}
synchronized {
queues.asScala.foreach(_.stop())
queues.clear()
}
}
/**
* If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be
* notified with the dropped events.
*
* Note: `onDropEvent` can be called in any thread.
*/
def onDropEvent(event: SparkListenerEvent): Unit = {
metrics.numDroppedEvents.inc()
droppedEventsCounter.incrementAndGet()
if (logDroppedEvent.compareAndSet(false, true)) {
// Only log the following message once to avoid duplicated annoying logs.
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with " +
"the rate at which tasks are being started by the scheduler.")
}
logTrace(s"Dropping event $event")
// For testing only.
private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = {
queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }
}
// For testing only.
private[spark] def listeners: JList[SparkListenerInterface] = {
queues.asScala.flatMap(_.listeners.asScala).asJava
}
// For testing only.
private[scheduler] def activeQueues(): Set[String] = {
queues.asScala.map(_.name).toSet
}
}
private[spark] object LiveListenerBus {
// Allows for Context to check whether stop() call is made within listener thread
val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
/** The thread name of Spark listener bus */
val name = "SparkListenerBus"
private[scheduler] val SHARED_QUEUE = "shared"
private[scheduler] val APP_STATUS_QUEUE = "appStatus"
private[scheduler] val EXECUTOR_MANAGEMENT_QUEUE = "executorManagement"
private[scheduler] val EVENT_LOG_QUEUE = "eventLog"
}
private[spark] class LiveListenerBusMetrics(
conf: SparkConf,
queue: LinkedBlockingQueue[_])
private[spark] class LiveListenerBusMetrics(conf: SparkConf)
extends Source with Logging {
override val sourceName: String = "LiveListenerBus"
@ -260,25 +230,6 @@ private[spark] class LiveListenerBusMetrics(
*/
val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))
/**
* The total number of events that were dropped without being delivered to listeners.
*/
val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
/**
* The amount of time taken to post a single event to all listeners.
*/
val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
/**
* The number of messages waiting in the queue.
*/
val queueSize: Gauge[Int] = {
metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{
override def getValue: Int = queue.size()
})
}
// Guarded by synchronization.
private val perListenerClassTimers = mutable.Map[String, Timer]()
@ -303,5 +254,5 @@ private[spark] class LiveListenerBusMetrics(
}
}
}
}
}


@ -162,13 +162,14 @@ private[spark] object SparkUI {
def createLiveUI(
sc: SparkContext,
conf: SparkConf,
listenerBus: SparkListenerBus,
jobProgressListener: JobProgressListener,
securityManager: SecurityManager,
appName: String,
startTime: Long): SparkUI = {
create(Some(sc), conf, listenerBus, securityManager, appName,
jobProgressListener = Some(jobProgressListener), startTime = startTime)
create(Some(sc), conf,
sc.listenerBus.addToStatusQueue,
securityManager, appName, jobProgressListener = Some(jobProgressListener),
startTime = startTime)
}
def createHistoryUI(
@ -179,8 +180,7 @@ private[spark] object SparkUI {
basePath: String,
lastUpdateTime: Option[Long],
startTime: Long): SparkUI = {
val sparkUI = create(
None, conf, listenerBus, securityManager, appName, basePath,
val sparkUI = create(None, conf, listenerBus.addListener, securityManager, appName, basePath,
lastUpdateTime = lastUpdateTime, startTime = startTime)
val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
@ -202,7 +202,7 @@ private[spark] object SparkUI {
private def create(
sc: Option[SparkContext],
conf: SparkConf,
listenerBus: SparkListenerBus,
addListenerFn: SparkListenerInterface => Unit,
securityManager: SecurityManager,
appName: String,
basePath: String = "",
@ -212,7 +212,7 @@ private[spark] object SparkUI {
val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
val listener = new JobProgressListener(conf)
listenerBus.addListener(listener)
addListenerFn(listener)
listener
}
@ -222,11 +222,11 @@ private[spark] object SparkUI {
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)
listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
listenerBus.addListener(operationGraphListener)
addListenerFn(environmentListener)
addListenerFn(storageStatusListener)
addListenerFn(executorsListener)
addListenerFn(storageListener)
addListenerFn(operationGraphListener)
new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
executorsListener, _jobProgressListener, storageListener, operationGraphListener,


@ -49,6 +49,11 @@ class ExecutorAllocationManagerSuite
contexts.foreach(_.stop())
}
private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = {
bus.post(event)
bus.waitUntilEmpty(1000)
}
test("verify min/max executors") {
val conf = new SparkConf()
.setMaster("myDummyLocalExternalClusterManager")
@ -95,7 +100,7 @@ class ExecutorAllocationManagerSuite
test("add executors") {
sc = createSparkContext(1, 10, 1)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Keep adding until the limit is reached
assert(numExecutorsTarget(manager) === 1)
@ -140,7 +145,7 @@ class ExecutorAllocationManagerSuite
test("add executors capped by num pending tasks") {
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 5)))
// Verify that we're capped at number of tasks in the stage
assert(numExecutorsTarget(manager) === 0)
@ -156,10 +161,10 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsToAdd(manager) === 1)
// Verify that running a task doesn't affect the target
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 3)))
post(sc.listenerBus, SparkListenerExecutorAdded(
0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
assert(numExecutorsTarget(manager) === 5)
assert(addExecutors(manager) === 1)
assert(numExecutorsTarget(manager) === 6)
@ -172,9 +177,9 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsToAdd(manager) === 1)
// Verify that re-running a task doesn't blow things up
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3)))
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1")))
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1")))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 3)))
post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1")))
post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1")))
assert(addExecutors(manager) === 1)
assert(numExecutorsTarget(manager) === 9)
assert(numExecutorsToAdd(manager) === 2)
@ -183,7 +188,7 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsToAdd(manager) === 1)
// Verify that running a task once we're at our limit doesn't blow things up
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1")))
post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1")))
assert(addExecutors(manager) === 0)
assert(numExecutorsTarget(manager) === 10)
}
@ -193,13 +198,13 @@ class ExecutorAllocationManagerSuite
val manager = sc.executorAllocationManager.get
// Verify that we're capped at number of tasks including the speculative ones in the stage
sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1))
assert(numExecutorsTarget(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 1)
sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1))
post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 2)))
assert(numExecutorsTarget(manager) === 1)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
@ -210,13 +215,13 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsToAdd(manager) === 1)
// Verify that running a task doesn't affect the target
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
assert(numExecutorsTarget(manager) === 5)
assert(addExecutors(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
// Verify that running a speculative task doesn't affect the target
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true)))
post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true)))
assert(numExecutorsTarget(manager) === 5)
assert(addExecutors(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
@ -225,7 +230,7 @@ class ExecutorAllocationManagerSuite
test("cancel pending executors when no longer needed") {
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5)))
assert(numExecutorsTarget(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
@ -236,15 +241,15 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsTarget(manager) === 3)
val task1Info = createTaskInfo(0, 0, "executor-1")
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info))
post(sc.listenerBus, SparkListenerTaskStart(2, 0, task1Info))
assert(numExecutorsToAdd(manager) === 4)
assert(addExecutors(manager) === 2)
val task2Info = createTaskInfo(1, 0, "executor-1")
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info))
sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task1Info, null))
sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task2Info, null))
post(sc.listenerBus, SparkListenerTaskStart(2, 0, task2Info))
post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task1Info, null))
post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task2Info, null))
assert(adjustRequestedExecutors(manager) === -1)
}
@ -352,21 +357,22 @@ class ExecutorAllocationManagerSuite
sc = createSparkContext(5, 12, 5)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 8)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 8)))
// Remove when numExecutorsTarget is the same as the current number of executors
assert(addExecutors(manager) === 1)
assert(addExecutors(manager) === 2)
(1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach {
info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) }
info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) }
assert(executorIds(manager).size === 8)
assert(numExecutorsTarget(manager) === 8)
assert(maxNumExecutorsNeeded(manager) == 8)
assert(!removeExecutor(manager, "1")) // won't work since numExecutorsTarget == numExecutors
// Remove executors when numExecutorsTarget is lower than current number of executors
(1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach {
info => sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, Success, info, null)) }
(1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach { info =>
post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, Success, info, null))
}
adjustRequestedExecutors(manager)
assert(executorIds(manager).size === 8)
assert(numExecutorsTarget(manager) === 5)
@ -378,7 +384,7 @@ class ExecutorAllocationManagerSuite
onExecutorRemoved(manager, "3")
// numExecutorsTarget is lower than minNumExecutors
sc.listenerBus.postToAll(
post(sc.listenerBus,
SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"), null))
assert(executorIds(manager).size === 5)
assert(numExecutorsTarget(manager) === 5)
@ -390,7 +396,7 @@ class ExecutorAllocationManagerSuite
test ("interleaving add and remove") {
sc = createSparkContext(5, 12, 5)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Add a few executors
assert(addExecutors(manager) === 1)
@ -569,7 +575,7 @@ class ExecutorAllocationManagerSuite
val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Scheduler queue backlogged
onSchedulerBacklogged(manager)
@ -682,26 +688,26 @@ class ExecutorAllocationManagerSuite
// Starting a stage should start the add timer
val numTasks = 10
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, numTasks)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, numTasks)))
assert(addTime(manager) !== NOT_SET)
// Starting a subset of the tasks should not cancel the add timer
val taskInfos = (0 to numTasks - 1).map { i => createTaskInfo(i, i, "executor-1") }
taskInfos.tail.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) }
taskInfos.tail.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) }
assert(addTime(manager) !== NOT_SET)
// Starting all remaining tasks should cancel the add timer
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfos.head))
post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfos.head))
assert(addTime(manager) === NOT_SET)
// Start two different stages
// The add timer should be canceled only if all tasks in both stages start running
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, numTasks)))
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, numTasks)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, numTasks)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, numTasks)))
assert(addTime(manager) !== NOT_SET)
taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, info)) }
taskInfos.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(1, 0, info)) }
assert(addTime(manager) !== NOT_SET)
taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, info)) }
taskInfos.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(2, 0, info)) }
assert(addTime(manager) === NOT_SET)
}
@ -715,22 +721,22 @@ class ExecutorAllocationManagerSuite
assert(removeTimes(manager).size === 5)
// Starting a task cancel the remove timer for that executor
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1")))
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2")))
post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1")))
post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2")))
assert(removeTimes(manager).size === 3)
assert(!removeTimes(manager).contains("executor-1"))
assert(!removeTimes(manager).contains("executor-2"))
// Finishing all tasks running on an executor should start the remove timer for that executor
sc.listenerBus.postToAll(SparkListenerTaskEnd(
post(sc.listenerBus, SparkListenerTaskEnd(
0, 0, "task-type", Success, createTaskInfo(0, 0, "executor-1"), new TaskMetrics))
sc.listenerBus.postToAll(SparkListenerTaskEnd(
post(sc.listenerBus, SparkListenerTaskEnd(
0, 0, "task-type", Success, createTaskInfo(2, 2, "executor-2"), new TaskMetrics))
assert(removeTimes(manager).size === 4)
assert(!removeTimes(manager).contains("executor-1")) // executor-1 has not finished yet
assert(removeTimes(manager).contains("executor-2"))
sc.listenerBus.postToAll(SparkListenerTaskEnd(
post(sc.listenerBus, SparkListenerTaskEnd(
0, 0, "task-type", Success, createTaskInfo(1, 1, "executor-1"), new TaskMetrics))
assert(removeTimes(manager).size === 5)
assert(removeTimes(manager).contains("executor-1")) // executor-1 has now finished
@ -743,13 +749,13 @@ class ExecutorAllocationManagerSuite
assert(removeTimes(manager).isEmpty)
// New executors have registered
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
post(sc.listenerBus, SparkListenerExecutorAdded(
0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
assert(executorIds(manager).size === 1)
assert(executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 1)
assert(removeTimes(manager).contains("executor-1"))
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
post(sc.listenerBus, SparkListenerExecutorAdded(
0L, "executor-2", new ExecutorInfo("host2", 1, Map.empty)))
assert(executorIds(manager).size === 2)
assert(executorIds(manager).contains("executor-2"))
@ -757,14 +763,14 @@ class ExecutorAllocationManagerSuite
assert(removeTimes(manager).contains("executor-2"))
// Existing executors have disconnected
sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", ""))
post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "executor-1", ""))
assert(executorIds(manager).size === 1)
assert(!executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 1)
assert(!removeTimes(manager).contains("executor-1"))
// Unknown executor has disconnected
sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", ""))
post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "executor-3", ""))
assert(executorIds(manager).size === 1)
assert(removeTimes(manager).size === 1)
}
@ -775,8 +781,8 @@ class ExecutorAllocationManagerSuite
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
post(sc.listenerBus, SparkListenerExecutorAdded(
0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
assert(executorIds(manager).size === 1)
assert(executorIds(manager).contains("executor-1"))
@ -788,15 +794,15 @@ class ExecutorAllocationManagerSuite
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
post(sc.listenerBus, SparkListenerExecutorAdded(
0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
assert(executorIds(manager).size === 1)
assert(executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 0)
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
post(sc.listenerBus, SparkListenerExecutorAdded(
0L, "executor-2", new ExecutorInfo("host1", 1, Map.empty)))
assert(executorIds(manager).size === 2)
assert(executorIds(manager).contains("executor-2"))
@ -809,7 +815,7 @@ class ExecutorAllocationManagerSuite
sc = createSparkContext(0, 100000, 0)
val manager = sc.executorAllocationManager.get
val stage1 = createStageInfo(0, 1000)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
post(sc.listenerBus, SparkListenerStageSubmitted(stage1))
assert(addExecutors(manager) === 1)
assert(addExecutors(manager) === 2)
@ -820,12 +826,12 @@ class ExecutorAllocationManagerSuite
onExecutorAdded(manager, s"executor-$i")
}
assert(executorIds(manager).size === 15)
sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1))
post(sc.listenerBus, SparkListenerStageCompleted(stage1))
adjustRequestedExecutors(manager)
assert(numExecutorsTarget(manager) === 0)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 1000)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 1000)))
addExecutors(manager)
assert(numExecutorsTarget(manager) === 16)
}
@ -842,7 +848,7 @@ class ExecutorAllocationManagerSuite
// Verify whether the initial number of executors is kept with no pending tasks
assert(numExecutorsTarget(manager) === 3)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 2)))
clock.advance(100L)
assert(maxNumExecutorsNeeded(manager) === 2)
@ -892,7 +898,7 @@ class ExecutorAllocationManagerSuite
Seq.empty
)
val stageInfo1 = createStageInfo(1, 5, localityPreferences1)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo1))
post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo1))
assert(localityAwareTasks(manager) === 3)
assert(hostToLocalTaskCount(manager) ===
@ -904,13 +910,13 @@ class ExecutorAllocationManagerSuite
Seq.empty
)
val stageInfo2 = createStageInfo(2, 3, localityPreferences2)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo2))
post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo2))
assert(localityAwareTasks(manager) === 5)
assert(hostToLocalTaskCount(manager) ===
Map("host1" -> 2, "host2" -> 4, "host3" -> 4, "host4" -> 3, "host5" -> 2))
sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfo1))
post(sc.listenerBus, SparkListenerStageCompleted(stageInfo1))
assert(localityAwareTasks(manager) === 2)
assert(hostToLocalTaskCount(manager) ===
Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2))
@ -921,16 +927,16 @@ class ExecutorAllocationManagerSuite
val manager = sc.executorAllocationManager.get
assert(maxNumExecutorsNeeded(manager) === 0)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1)))
assert(maxNumExecutorsNeeded(manager) === 1)
val taskInfo = createTaskInfo(1, 1, "executor-1")
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfo))
post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo))
assert(maxNumExecutorsNeeded(manager) === 1)
// If the task is failed, we expect it to be resubmitted later.
val taskEndReason = ExceptionFailure(null, null, null, null, None)
sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null))
post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null))
assert(maxNumExecutorsNeeded(manager) === 1)
}
@ -942,7 +948,7 @@ class ExecutorAllocationManagerSuite
// Allocation manager is reset when adding executor requests are sent without reporting back
// executor added.
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 10)))
assert(addExecutors(manager) === 1)
assert(numExecutorsTarget(manager) === 2)
@ -957,7 +963,7 @@ class ExecutorAllocationManagerSuite
assert(executorIds(manager) === Set.empty)
// Allocation manager is reset when executors are added.
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10)))
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 10)))
addExecutors(manager)
addExecutors(manager)


@ -751,7 +751,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
// Helper functions to extract commonly used code in Fetch Failure test cases
private def setupStageAbortTest(sc: SparkContext) {
sc.listenerBus.addListener(new EndListener())
sc.listenerBus.addToSharedQueue(new EndListener())
ended = false
jobResult = null
}


@ -164,9 +164,9 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
eventLogger.start()
listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem]))
listenerBus.addListener(eventLogger)
listenerBus.postToAll(applicationStart)
listenerBus.postToAll(applicationEnd)
listenerBus.addToEventLogQueue(eventLogger)
listenerBus.post(applicationStart)
listenerBus.post(applicationEnd)
listenerBus.stop()
eventLogger.stop()


@ -34,6 +34,8 @@ import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
with ResetSystemProperties {
import LiveListenerBus._
/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
@ -42,18 +44,28 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
private val mockSparkContext: SparkContext = Mockito.mock(classOf[SparkContext])
private val mockMetricsSystem: MetricsSystem = Mockito.mock(classOf[MetricsSystem])
private def numDroppedEvents(bus: LiveListenerBus): Long = {
bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount
}
private def queueSize(bus: LiveListenerBus): Int = {
bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue()
.asInstanceOf[Int]
}
private def eventProcessingTimeCount(bus: LiveListenerBus): Long = {
bus.metrics.metricRegistry.timer(s"queue.$SHARED_QUEUE.listenerProcessingTime").getCount()
}
test("don't call sc.stop in listener") {
sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
val listener = new SparkContextStoppingListener(sc)
val bus = new LiveListenerBus(sc.conf)
bus.addListener(listener)
// Starting listener bus should flush all buffered events
bus.start(sc, sc.env.metricsSystem)
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.listenerBus.addToSharedQueue(listener)
sc.listenerBus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
sc.stop()
bus.stop()
assert(listener.sparkExSeen)
}
@ -61,13 +73,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val conf = new SparkConf()
val counter = new BasicJobCounter
val bus = new LiveListenerBus(conf)
bus.addListener(counter)
bus.addToSharedQueue(counter)
// Metrics are initially empty.
assert(bus.metrics.numEventsPosted.getCount === 0)
assert(bus.metrics.numDroppedEvents.getCount === 0)
assert(bus.metrics.queueSize.getValue === 0)
assert(bus.metrics.eventProcessingTime.getCount === 0)
assert(numDroppedEvents(bus) === 0)
assert(queueSize(bus) === 0)
assert(eventProcessingTimeCount(bus) === 0)
// Post five events:
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
@ -75,7 +87,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Five messages should be marked as received and queued, but no messages should be posted to
// listeners yet because the listener bus hasn't been started.
assert(bus.metrics.numEventsPosted.getCount === 5)
assert(bus.metrics.queueSize.getValue === 5)
assert(queueSize(bus) === 5)
assert(counter.count === 0)
// Starting listener bus should flush all buffered events
@ -83,18 +95,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter.count === 5)
assert(bus.metrics.queueSize.getValue === 0)
assert(bus.metrics.eventProcessingTime.getCount === 5)
assert(queueSize(bus) === 0)
assert(eventProcessingTimeCount(bus) === 5)
// After listener bus has stopped, posting events should not increment counter
bus.stop()
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
assert(counter.count === 5)
assert(bus.metrics.numEventsPosted.getCount === 5)
// Make sure per-listener-class timers were created:
assert(bus.metrics.getTimerForListenerClass(
classOf[BasicJobCounter].asSubclass(classOf[SparkListenerInterface])).get.getCount == 5)
assert(eventProcessingTimeCount(bus) === 5)
// Listener bus must not be started twice
intercept[IllegalStateException] {
@ -135,7 +143,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val bus = new LiveListenerBus(new SparkConf())
val blockingListener = new BlockingListener
bus.addListener(blockingListener)
bus.addToSharedQueue(blockingListener)
bus.start(mockSparkContext, mockMetricsSystem)
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
@ -168,7 +176,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val listenerStarted = new Semaphore(0)
val listenerWait = new Semaphore(0)
bus.addListener(new SparkListener {
bus.addToSharedQueue(new SparkListener {
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
listenerStarted.release()
listenerWait.acquire()
@ -180,20 +188,19 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Post a message to the listener bus and wait for processing to begin:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
listenerStarted.acquire()
assert(bus.metrics.queueSize.getValue === 0)
assert(bus.metrics.numDroppedEvents.getCount === 0)
assert(queueSize(bus) === 0)
assert(numDroppedEvents(bus) === 0)
// If we post an additional message then it should remain in the queue because the listener is
// busy processing the first event:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
assert(bus.metrics.queueSize.getValue === 1)
assert(bus.metrics.numDroppedEvents.getCount === 0)
assert(queueSize(bus) === 1)
assert(numDroppedEvents(bus) === 0)
// The queue is now full, so any additional events posted to the listener will be dropped:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
assert(bus.metrics.queueSize.getValue === 1)
assert(bus.metrics.numDroppedEvents.getCount === 1)
assert(queueSize(bus) === 1)
assert(numDroppedEvents(bus) === 1)
// Allow the remaining events to be processed so we can stop the listener bus:
listenerWait.release(2)
@ -419,9 +426,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val bus = new LiveListenerBus(new SparkConf())
// Propagate events to bad listener first
bus.addListener(badListener)
bus.addListener(jobCounter1)
bus.addListener(jobCounter2)
bus.addToSharedQueue(badListener)
bus.addToSharedQueue(jobCounter1)
bus.addToSharedQueue(jobCounter2)
bus.start(mockSparkContext, mockMetricsSystem)
// Post events to all listeners, and wait until the queue is drained
@ -429,7 +436,6 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
// The exception should be caught, and the event should be propagated to other listeners
assert(bus.listenerThreadIsAlive)
assert(jobCounter1.count === 5)
assert(jobCounter2.count === 5)
}
@ -449,6 +455,31 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
.count(_.isInstanceOf[FirehoseListenerThatAcceptsSparkConf]) should be (1)
}
test("add and remove listeners to/from LiveListenerBus queues") {
val bus = new LiveListenerBus(new SparkConf(false))
val counter1 = new BasicJobCounter()
val counter2 = new BasicJobCounter()
val counter3 = new BasicJobCounter()
bus.addToSharedQueue(counter1)
bus.addToStatusQueue(counter2)
bus.addToStatusQueue(counter3)
assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
assert(bus.findListenersByClass[BasicJobCounter]().size === 3)
bus.removeListener(counter1)
assert(bus.activeQueues() === Set(APP_STATUS_QUEUE))
assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
bus.removeListener(counter2)
assert(bus.activeQueues() === Set(APP_STATUS_QUEUE))
assert(bus.findListenersByClass[BasicJobCounter]().size === 1)
bus.removeListener(counter3)
assert(bus.activeQueues().isEmpty)
assert(bus.findListenersByClass[BasicJobCounter]().isEmpty)
}
/**
* Assert that the given list of numbers has an average that is greater than zero.
*/


@ -27,7 +27,7 @@ import org.apache.spark.storage._
* Test various functionality in the StorageListener that supports the StorageTab.
*/
class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
private var bus: LiveListenerBus = _
private var bus: SparkListenerBus = _
private var storageStatusListener: StorageStatusListener = _
private var storageListener: StorageListener = _
private val memAndDisk = StorageLevel.MEMORY_AND_DISK
@ -43,7 +43,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
before {
val conf = new SparkConf()
bus = new LiveListenerBus(conf)
bus = new ReplayListenerBus()
storageStatusListener = new StorageStatusListener(conf)
storageListener = new StorageListener(storageStatusListener)
bus.addListener(storageStatusListener)


@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
import StreamingQueryListener._
sparkListenerBus.addListener(this)
sparkListenerBus.addToSharedQueue(this)
/**
* RunIds of active queries whose events are supposed to be forwarded by this ListenerBus


@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FsUrlStreamHandlerFactory
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.CacheManager
@ -148,7 +149,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
if (SparkSession.sqlListener.get() == null) {
val listener = new SQLListener(sc.conf)
if (SparkSession.sqlListener.compareAndSet(null, listener)) {
sc.addSparkListener(listener)
sc.listenerBus.addToStatusQueue(listener)
sc.ui.foreach(new SQLTab(listener, _))
}
}


@ -659,8 +659,7 @@ class StreamingContext private[streaming] (
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
var shutdownHookRefToRemove: AnyRef = null
if (LiveListenerBus.withinListenerThread.value) {
throw new SparkException(
s"Cannot stop StreamingContext within listener thread of ${LiveListenerBus.name}")
throw new SparkException(s"Cannot stop StreamingContext within listener bus thread.")
}
synchronized {
// The state should always be Stopped after calling `stop()`, even if we haven't started yet


@ -76,7 +76,7 @@ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
* forward them to StreamingListeners.
*/
def start(): Unit = {
sparkListenerBus.addListener(this) // for getting callbacks on spark events
sparkListenerBus.addToStatusQueue(this)
}
/**


@ -575,8 +575,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
test("getActive and getActiveOrCreate") {
require(StreamingContext.getActive().isEmpty, "context exists from before")
sc = new SparkContext(conf)
var newContextCreated = false
def creatingFunc(): StreamingContext = {
@ -603,6 +601,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
// getActiveOrCreate should create new context and getActive should return it only
// after starting the context
testGetActiveOrCreate {
sc = new SparkContext(conf)
ssc = StreamingContext.getActiveOrCreate(creatingFunc _)
assert(ssc != null, "no context created")
assert(newContextCreated === true, "new context not created")
@ -622,6 +621,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
// getActiveOrCreate and getActive should return independently created context after activating
testGetActiveOrCreate {
sc = new SparkContext(conf)
ssc = creatingFunc() // Create
assert(StreamingContext.getActive().isEmpty,
"new initialized context returned before starting")