Merge branch 'master' of github.com:mesos/spark into scala-2.10

This commit is contained in:
Prashant Sharma 2013-07-12 15:12:46 +05:30
commit a220e11a07

View file

@ -43,8 +43,12 @@ private[spark] class MesosSchedulerBackend(
// An ExecutorInfo for our tasks // An ExecutorInfo for our tasks
var execArgs: Array[Byte] = null var execArgs: Array[Byte] = null
var classLoader: ClassLoader = null
override def start() { override def start() {
synchronized { synchronized {
classLoader = Thread.currentThread.getContextClassLoader
new Thread("MesosSchedulerBackend driver") { new Thread("MesosSchedulerBackend driver") {
setDaemon(true) setDaemon(true)
override def run() { override def run() {
@ -114,13 +118,28 @@ private[spark] class MesosSchedulerBackend(
return execArgs return execArgs
} }
/**
 * Installs the class loader held in `classLoader` as the context class loader
 * of the calling thread.
 *
 * @return the context class loader that was active on the calling thread before
 *         the swap, so the caller can pass it to `restoreClassLoader` afterwards
 */
private def setClassLoader(): ClassLoader = {
  val current = Thread.currentThread
  // Capture the previous loader before overwriting it.
  val previous = current.getContextClassLoader
  current.setContextClassLoader(classLoader)
  previous
}
/**
 * Sets the calling thread's context class loader to `oldClassLoader`.
 *
 * @param oldClassLoader the loader to install on the calling thread; callers in
 *                       this class pass the value returned by `setClassLoader`
 */
private def restoreClassLoader(oldClassLoader: ClassLoader): Unit = {
  val current = Thread.currentThread
  current.setContextClassLoader(oldClassLoader)
}
override def offerRescinded(d: SchedulerDriver, o: OfferID) {} override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
logInfo("Registered as framework ID " + frameworkId.getValue) val oldClassLoader = setClassLoader()
registeredLock.synchronized { try {
isRegistered = true logInfo("Registered as framework ID " + frameworkId.getValue)
registeredLock.notifyAll() registeredLock.synchronized {
isRegistered = true
registeredLock.notifyAll()
}
} finally {
restoreClassLoader(oldClassLoader)
} }
} }
@ -142,49 +161,54 @@ private[spark] class MesosSchedulerBackend(
* tasks are balanced across the cluster. * tasks are balanced across the cluster.
*/ */
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
synchronized { val oldClassLoader = setClassLoader()
// Build a big list of the offerable workers, and remember their indices so that we can try {
// figure out which Offer to reply to for each worker synchronized {
val offerableIndices = new ArrayBuffer[Int] // Build a big list of the offerable workers, and remember their indices so that we can
val offerableWorkers = new ArrayBuffer[WorkerOffer] // figure out which Offer to reply to for each worker
val offerableIndices = new ArrayBuffer[Int]
val offerableWorkers = new ArrayBuffer[WorkerOffer]
def enoughMemory(o: Offer) = { def enoughMemory(o: Offer) = {
val mem = getResource(o.getResourcesList, "mem") val mem = getResource(o.getResourcesList, "mem")
val slaveId = o.getSlaveId.getValue val slaveId = o.getSlaveId.getValue
mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId) mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
} }
for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) { for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
offerableIndices += index offerableIndices += index
offerableWorkers += new WorkerOffer( offerableWorkers += new WorkerOffer(
offer.getSlaveId.getValue, offer.getSlaveId.getValue,
offer.getHostname, offer.getHostname,
getResource(offer.getResourcesList, "cpus").toInt) getResource(offer.getResourcesList, "cpus").toInt)
} }
// Call into the ClusterScheduler // Call into the ClusterScheduler
val taskLists = scheduler.resourceOffers(offerableWorkers) val taskLists = scheduler.resourceOffers(offerableWorkers)
// Build a list of Mesos tasks for each slave // Build a list of Mesos tasks for each slave
val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]()) val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
for ((taskList, index) <- taskLists.zipWithIndex) { for ((taskList, index) <- taskLists.zipWithIndex) {
if (!taskList.isEmpty) { if (!taskList.isEmpty) {
val offerNum = offerableIndices(index) val offerNum = offerableIndices(index)
val slaveId = offers(offerNum).getSlaveId.getValue val slaveId = offers(offerNum).getSlaveId.getValue
slaveIdsWithExecutors += slaveId slaveIdsWithExecutors += slaveId
mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size) mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
for (taskDesc <- taskList) { for (taskDesc <- taskList) {
taskIdToSlaveId(taskDesc.taskId) = slaveId taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId)) mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
}
} }
} }
}
// Reply to the offers // Reply to the offers
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
for (i <- 0 until offers.size) { for (i <- 0 until offers.size) {
d.launchTasks(offers(i).getId, mesosTasks(i), filters) d.launchTasks(offers(i).getId, mesosTasks(i), filters)
}
} }
} finally {
restoreClassLoader(oldClassLoader)
} }
} }
@ -224,23 +248,33 @@ private[spark] class MesosSchedulerBackend(
} }
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val tid = status.getTaskId.getValue.toLong val oldClassLoader = setClassLoader()
val state = TaskState.fromMesos(status.getState) try {
synchronized { val tid = status.getTaskId.getValue.toLong
if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) { val state = TaskState.fromMesos(status.getState)
// We lost the executor on this slave, so remember that it's gone synchronized {
slaveIdsWithExecutors -= taskIdToSlaveId(tid) if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
} // We lost the executor on this slave, so remember that it's gone
if (isFinished(status.getState)) { slaveIdsWithExecutors -= taskIdToSlaveId(tid)
taskIdToSlaveId.remove(tid) }
if (isFinished(status.getState)) {
taskIdToSlaveId.remove(tid)
}
} }
scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
} finally {
restoreClassLoader(oldClassLoader)
} }
scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
} }
override def error(d: SchedulerDriver, message: String) { override def error(d: SchedulerDriver, message: String) {
logError("Mesos error: " + message) val oldClassLoader = setClassLoader()
scheduler.error(message) try {
logError("Mesos error: " + message)
scheduler.error(message)
} finally {
restoreClassLoader(oldClassLoader)
}
} }
override def stop() { override def stop() {
@ -256,11 +290,16 @@ private[spark] class MesosSchedulerBackend(
override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
logInfo("Mesos slave lost: " + slaveId.getValue) val oldClassLoader = setClassLoader()
synchronized { try {
slaveIdsWithExecutors -= slaveId.getValue logInfo("Mesos slave lost: " + slaveId.getValue)
synchronized {
slaveIdsWithExecutors -= slaveId.getValue
}
scheduler.executorLost(slaveId.getValue, reason)
} finally {
restoreClassLoader(oldClassLoader)
} }
scheduler.executorLost(slaveId.getValue, reason)
} }
override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {