Starvation check in Standlone scheduler

This commit is contained in:
Patrick Wendell 2013-02-03 12:17:20 -08:00
parent 667860448a
commit b14322956c

View file

@ -31,6 +31,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
val waitingJobs = new ArrayBuffer[JobInfo]
val completedJobs = new ArrayBuffer[JobInfo]
var firstJob: Option[JobInfo] = None
val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else ip
@ -191,6 +193,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
}
}
if (workers.toArray.filter(_.state == WorkerState.ALIVE).size > 0 &&
firstJob.isDefined &&
firstJob.get.executors.size == 0) {
logWarning("Could not find any machines with enough memory. Ensure that SPARK_WORKER_MEM > SPARK_MEM.")
}
}
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
@ -232,6 +239,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
idToJob(job.id) = job
actorToJob(driver) = job
addressToJob(driver.path.address) = job
if (!firstJob.isDefined) firstJob = Some(job)
return job
}