[SPARK-18975][CORE] Add an API to remove SparkListener
## What changes were proposed in this pull request? In current Spark we could add customized SparkListener through `SparkContext#addListener` API, but there's no equivalent API to remove the registered one. In our scenario SparkListener will be added repeatedly accordingly to the changed environment. If lacks the ability to remove listeners, there might be many registered listeners finally, this is unnecessary and potentially affects the performance. So here propose to add an API to remove registered listener. ## How was this patch tested? Add an unit test to verify it. Author: jerryshao <sshao@hortonworks.com> Closes #16382 from jerryshao/SPARK-18975.
This commit is contained in:
parent
2615100055
commit
31da755c80
|
@ -1571,6 +1571,15 @@ class SparkContext(config: SparkConf) extends Logging {
|
||||||
listenerBus.addListener(listener)
|
listenerBus.addListener(listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* :: DeveloperApi ::
|
||||||
|
* Deregister the listener from Spark's listener bus.
|
||||||
|
*/
|
||||||
|
@DeveloperApi
|
||||||
|
def removeSparkListener(listener: SparkListenerInterface): Unit = {
|
||||||
|
listenerBus.removeListener(listener)
|
||||||
|
}
|
||||||
|
|
||||||
private[spark] def getExecutorIds(): Seq[String] = {
|
private[spark] def getExecutorIds(): Seq[String] = {
|
||||||
schedulerBackend match {
|
schedulerBackend match {
|
||||||
case b: CoarseGrainedSchedulerBackend =>
|
case b: CoarseGrainedSchedulerBackend =>
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.TextInputFormat
|
||||||
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
|
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
|
||||||
import org.scalatest.Matchers._
|
import org.scalatest.Matchers._
|
||||||
|
|
||||||
|
import org.apache.spark.scheduler.SparkListener
|
||||||
import org.apache.spark.util.Utils
|
import org.apache.spark.util.Utils
|
||||||
|
|
||||||
class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
|
class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
|
||||||
|
@ -451,4 +452,19 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
|
||||||
sc.stop()
|
sc.stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("register and deregister Spark listener from SparkContext") {
|
||||||
|
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
|
||||||
|
try {
|
||||||
|
val sparkListener1 = new SparkListener { }
|
||||||
|
val sparkListener2 = new SparkListener { }
|
||||||
|
sc.addSparkListener(sparkListener1)
|
||||||
|
sc.addSparkListener(sparkListener2)
|
||||||
|
assert(sc.listenerBus.listeners.contains(sparkListener1))
|
||||||
|
assert(sc.listenerBus.listeners.contains(sparkListener2))
|
||||||
|
sc.removeSparkListener(sparkListener1)
|
||||||
|
assert(!sc.listenerBus.listeners.contains(sparkListener1))
|
||||||
|
assert(sc.listenerBus.listeners.contains(sparkListener2))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue