[SPARK-4949] shutdownCallback in SparkDeploySchedulerBackend should be enclosed by a synchronized block.

The variable `shutdownCallback` in SparkDeploySchedulerBackend can be accessed from multiple threads, so it should be enclosed by a synchronized block.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #3781 from sarutak/SPARK-4949 and squashes the following commits:

c146c93 [Kousuke Saruta] Removed "setShutdownCallback" method
c7265dc [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-4949
42ca528 [Kousuke Saruta] Changed the declaration of the variable "shutdownCallback" as a volatile reference instead of AtomicReference
552df7c [Kousuke Saruta] Changed the declaration of the variable "shutdownCallback" as a volatile reference instead of AtomicReference
f556819 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-4949
1b60fd1 [Kousuke Saruta] Improved the locking logic
5942765 [Kousuke Saruta] Enclosed shutdownCallback in SparkDeploySchedulerBackend by synchronized block
This commit is contained in:
Kousuke Saruta 2015-02-18 12:20:11 +00:00 committed by Sean Owen
parent e79a7a626d
commit 82197ed3bd

View file

@@ -17,6 +17,8 @@
package org.apache.spark.scheduler.cluster
import java.util.concurrent.Semaphore
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
@@ -31,16 +33,16 @@ private[spark] class SparkDeploySchedulerBackend(
with AppClientListener
with Logging {
// NOTE(review): this span is a diff rendering — it interleaves the REMOVED
// field declarations with the ADDED ones; it is not a single valid class body.
// Removed: public, mutable, non-volatile fields.
var client: AppClient = null
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
@volatile var appId: String = _
// Added: same fields narrowed to private; shutdownCallback made @volatile so
// cross-thread reads/writes are visible without external locking.
private var client: AppClient = null
private var stopping = false
// Removed: hand-rolled lock + boolean flag used for registration signaling.
val registrationLock = new Object()
var registrationDone = false
@volatile var shutdownCallback: SparkDeploySchedulerBackend => Unit = _
@volatile private var appId: String = _
// Removed: public config-derived vals.
val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
val totalExpectedCores = maxCores.getOrElse(0)
// Added: a Semaphore(0) replaces the lock/flag pair — acquire() blocks until
// release() is called, giving the same one-shot barrier with less code.
private val registrationBarrier = new Semaphore(0)
private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
private val totalExpectedCores = maxCores.getOrElse(0)
override def start() {
super.start()
@@ -95,8 +97,10 @@ private[spark] class SparkDeploySchedulerBackend(
// NOTE(review): diff-rendered fragment of stop(); the method header is outside
// this view. Removed and added lines are interleaved below.
stopping = true
super.stop()
client.stop()
// Removed: reads the @volatile shutdownCallback twice — the field could be
// cleared between the null check and the invocation (check-then-act race).
if (shutdownCallback != null) {
shutdownCallback(this)
// Added: snapshot the volatile field into a local once, so the null check and
// the call observe the same value.
val callback = shutdownCallback
if (callback != null) {
callback(this)
}
}
@@ -149,18 +153,11 @@ private[spark] class SparkDeploySchedulerBackend(
}
// NOTE(review): diff rendering — the old and new bodies of this method appear
// together. Blocks the caller until notifyContext() signals registration.
private def waitForRegistration() = {
// Removed: classic monitor wait loop on registrationLock/registrationDone.
registrationLock.synchronized {
while (!registrationDone) {
registrationLock.wait()
}
}
// Added: equivalent blocking via the registrationBarrier semaphore (initial
// permits = 0, so acquire() parks until a release()).
registrationBarrier.acquire()
}
// NOTE(review): diff rendering — old and new bodies interleaved. Wakes any
// thread blocked in waitForRegistration().
private def notifyContext() = {
// Removed: set the flag and notify all waiters under the lock.
registrationLock.synchronized {
registrationDone = true
registrationLock.notifyAll()
}
// Added: releasing one permit unblocks the acquire() in waitForRegistration().
registrationBarrier.release()
}
}