[SPARK-10748][MESOS] Log error instead of crashing Spark Mesos dispatcher when a job is misconfigured

## What changes were proposed in this pull request?

Now handling the spark exception which gets thrown for invalid job configuration, marking that job as failed and continuing to launch the other drivers instead of throwing the exception.
## How was this patch tested?

I verified manually, now the misconfigured jobs move to Finished Drivers section in UI and continue to launch the other jobs.

Author: Devaraj K <devaraj@apache.org>

Closes #13077 from devaraj-kavali/SPARK-10748.
This commit is contained in:
Devaraj K 2017-02-10 14:11:56 +00:00 committed by Sean Owen
parent 8e8afb3a34
commit 8640dc0823
No known key found for this signature in database
GPG key ID: BEB3956D6717BDDC

View file

@ -559,15 +559,25 @@ private[spark] class MesosClusterScheduler(
} else {
val offer = offerOption.get
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
val task = createTaskInfo(submission, offer)
queuedTasks += task
logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
submission.submissionId)
val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
None, new Date(), None, getDriverFrameworkID(submission))
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
afterLaunchCallback(submission.submissionId)
try {
val task = createTaskInfo(submission, offer)
queuedTasks += task
logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
submission.submissionId)
val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
None, new Date(), None, getDriverFrameworkID(submission))
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
afterLaunchCallback(submission.submissionId)
} catch {
case e: SparkException =>
afterLaunchCallback(submission.submissionId)
finishedDrivers += new MesosClusterSubmissionState(submission, TaskID.newBuilder().
setValue(submission.submissionId).build(), SlaveID.newBuilder().setValue("").
build(), None, null, None, getDriverFrameworkID(submission))
logError(s"Failed to launch the driver with id: ${submission.submissionId}, " +
s"cpu: $driverCpu, mem: $driverMem, reason: ${e.getMessage}")
}
}
}
}