Rename "jobs" to "applications" in the standalone cluster

Matei Zaharia committed on 2013-02-17 23:23:08 -08:00
parent 06e5e6627f
commit 7151e1e4c8
34 changed files with 299 additions and 295 deletions
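The only user-visible API change is the second constructor argument of SparkContext (and its Java and streaming counterparts), renamed from jobName to appName; positional callers are unaffected. A minimal sketch of driver code against a standalone master after this commit — the master URL, Spark home, and jar path are placeholders, not part of the commit:

```scala
import spark.SparkContext

// Placeholder cluster details; only the parameter name changed, so existing
// positional calls like new SparkContext(url, "MyApp") keep compiling.
val sc = new SparkContext(
  master    = "spark://master-host:7077",    // standalone cluster URL
  appName   = "RenameDemo",                  // shown as the application name on the web UI
  sparkHome = "/opt/spark",                  // Spark install location on the workers
  jars      = Seq("target/renamedemo.jar"))  // shipped to the executors

println(sc.parallelize(1 to 1000).count())   // 1000
sc.stop()
```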

View file

@ -53,7 +53,7 @@ import storage.{StorageStatus, StorageUtils, RDDInfo}
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param jobName A name for your job, to display on the cluster web UI.
* @param appName A name for your application, to display on the cluster web UI.
* @param sparkHome Location where Spark is installed on cluster nodes.
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
@ -61,7 +61,7 @@ import storage.{StorageStatus, StorageUtils, RDDInfo}
*/
class SparkContext(
val master: String,
val jobName: String,
val appName: String,
val sparkHome: String = null,
val jars: Seq[String] = Nil,
environment: Map[String, String] = Map())
@ -143,7 +143,7 @@ class SparkContext(
case SPARK_REGEX(sparkUrl) =>
val scheduler = new ClusterScheduler(this)
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, jobName)
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
scheduler.initialize(backend)
scheduler
@ -162,7 +162,7 @@ class SparkContext(
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, jobName)
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
@ -178,9 +178,9 @@ class SparkContext(
val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
val masterWithoutProtocol = master.replaceFirst("^mesos://", "") // Strip initial mesos://
val backend = if (coarseGrained) {
new CoarseMesosSchedulerBackend(scheduler, this, masterWithoutProtocol, jobName)
new CoarseMesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName)
} else {
new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, jobName)
new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName)
}
scheduler.initialize(backend)
scheduler
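Both standalone branches above (a spark:// URL and the in-process local-cluster[...] form) now thread appName through to SparkDeploySchedulerBackend. A hedged way to exercise that path without an external cluster is the local-cluster master string, where the three numbers are workers, cores per worker, and memory per worker in MB; this assumes SPARK_HOME is set so the deploy backend can locate a Spark home:

```scala
import spark.SparkContext

// Starts an in-process standalone master plus two workers, then registers this
// application (formerly "job") with them under the given appName.
val sc = new SparkContext("local-cluster[2,1,512]", "LocalClusterDemo")
println(sc.parallelize(1 to 100, 4).count())  // 100
sc.stop()
```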

View file

@ -23,41 +23,41 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param jobName A name for your job, to display on the cluster web UI
* @param appName A name for your application, to display on the cluster web UI
*/
def this(master: String, jobName: String) = this(new SparkContext(master, jobName))
def this(master: String, appName: String) = this(new SparkContext(master, appName))
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param jobName A name for your job, to display on the cluster web UI
* @param appName A name for your application, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
def this(master: String, jobName: String, sparkHome: String, jarFile: String) =
this(new SparkContext(master, jobName, sparkHome, Seq(jarFile)))
def this(master: String, appName: String, sparkHome: String, jarFile: String) =
this(new SparkContext(master, appName, sparkHome, Seq(jarFile)))
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param jobName A name for your job, to display on the cluster web UI
* @param appName A name for your application, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
def this(master: String, jobName: String, sparkHome: String, jars: Array[String]) =
this(new SparkContext(master, jobName, sparkHome, jars.toSeq))
def this(master: String, appName: String, sparkHome: String, jars: Array[String]) =
this(new SparkContext(master, appName, sparkHome, jars.toSeq))
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param jobName A name for your job, to display on the cluster web UI
* @param appName A name for your application, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param environment Environment variables to set on worker nodes
*/
def this(master: String, jobName: String, sparkHome: String, jars: Array[String],
def this(master: String, appName: String, sparkHome: String, jars: Array[String],
environment: JMap[String, String]) =
this(new SparkContext(master, jobName, sparkHome, jars.toSeq, environment))
this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment))
private[spark] val env = sc.env
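The Java-facing overloads change in lockstep. Shown from Scala for brevity (master and name are placeholders), the two-argument form now reads:

```scala
import spark.api.java.JavaSparkContext

// The second argument is now appName rather than jobName.
val jsc = new JavaSparkContext("local[2]", "JavaRenameDemo")
println(jsc.sc.appName)  // "JavaRenameDemo", displayed on the cluster web UI
jsc.sc.stop()            // stop the underlying SparkContext
```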

View file

@ -9,7 +9,7 @@ import java.util.Arrays
*
* Stores the unique id() of the Python-side partitioning function so that it is incorporated into
* equality comparisons. Correctness requires that the id is a unique identifier for the
* lifetime of the job (i.e. that it is not re-used as the id of a different partitioning
* lifetime of the program (i.e. that it is not re-used as the id of a different partitioning
* function). This can be ensured by using the Python id() function and maintaining a reference
* to the Python partitioning function so that its id() is not reused.
*/

View file

@ -1,6 +1,6 @@
package spark.deploy
private[spark] class JobDescription(
private[spark] class ApplicationDescription(
val name: String,
val cores: Int,
val memoryPerSlave: Int,
@ -10,5 +10,5 @@ private[spark] class JobDescription(
val user = System.getProperty("user.name", "<unknown>")
override def toString: String = "JobDescription(" + name + ")"
override def toString: String = "ApplicationDescription(" + name + ")"
}
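ApplicationDescription is the renamed payload a driver submits to the master (see RegisterApplication below). A minimal construction mirroring TestClient further down; the class is private[spark], so this only compiles from inside the spark package, and every value here is a placeholder:

```scala
import spark.deploy.{ApplicationDescription, Command}

// name, cores, memory per slave (MB), executor launch command, sparkHome on the workers
val appDesc = new ApplicationDescription(
  "MyApp",
  4,
  512,
  Command("spark.executor.StandaloneExecutorBackend", Seq(), Map()),
  "/opt/spark")
println(appDesc)  // ApplicationDescription(MyApp)
```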

View file

@ -1,7 +1,7 @@
package spark.deploy
import spark.deploy.ExecutorState.ExecutorState
import spark.deploy.master.{WorkerInfo, JobInfo}
import spark.deploy.master.{WorkerInfo, ApplicationInfo}
import spark.deploy.worker.ExecutorRunner
import scala.collection.immutable.List
@ -23,7 +23,7 @@ case class RegisterWorker(
private[spark]
case class ExecutorStateChanged(
jobId: String,
appId: String,
execId: Int,
state: ExecutorState,
message: Option[String],
@ -36,12 +36,12 @@ private[spark] case class Heartbeat(workerId: String) extends DeployMessage
private[spark] case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
private[spark] case class RegisterWorkerFailed(message: String) extends DeployMessage
private[spark] case class KillExecutor(jobId: String, execId: Int) extends DeployMessage
private[spark] case class KillExecutor(appId: String, execId: Int) extends DeployMessage
private[spark] case class LaunchExecutor(
jobId: String,
appId: String,
execId: Int,
jobDesc: JobDescription,
appDesc: ApplicationDescription,
cores: Int,
memory: Int,
sparkHome: String)
@ -49,12 +49,13 @@ private[spark] case class LaunchExecutor(
// Client to Master
private[spark] case class RegisterJob(jobDescription: JobDescription) extends DeployMessage
private[spark] case class RegisterApplication(appDescription: ApplicationDescription)
extends DeployMessage
// Master to Client
private[spark]
case class RegisteredJob(jobId: String) extends DeployMessage
case class RegisteredApplication(appId: String) extends DeployMessage
private[spark]
case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
@ -64,7 +65,7 @@ case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String
exitStatus: Option[Int])
private[spark]
case class JobKilled(message: String)
case class appKilled(message: String)
// Internal message in Client
@ -78,7 +79,7 @@ private[spark] case object RequestMasterState
private[spark]
case class MasterState(host: String, port: Int, workers: Array[WorkerInfo],
activeJobs: Array[JobInfo], completedJobs: Array[JobInfo]) {
activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
def uri = "spark://" + host + ":" + port
}

View file

@ -1,6 +1,6 @@
package spark.deploy
import master.{JobInfo, WorkerInfo}
import master.{ApplicationInfo, WorkerInfo}
import worker.ExecutorRunner
import cc.spray.json._
@ -20,8 +20,8 @@ private[spark] object JsonProtocol extends DefaultJsonProtocol {
)
}
implicit object JobInfoJsonFormat extends RootJsonWriter[JobInfo] {
def write(obj: JobInfo) = JsObject(
implicit object AppInfoJsonFormat extends RootJsonWriter[ApplicationInfo] {
def write(obj: ApplicationInfo) = JsObject(
"starttime" -> JsNumber(obj.startTime),
"id" -> JsString(obj.id),
"name" -> JsString(obj.desc.name),
@ -31,8 +31,8 @@ private[spark] object JsonProtocol extends DefaultJsonProtocol {
"submitdate" -> JsString(obj.submitDate.toString))
}
implicit object JobDescriptionJsonFormat extends RootJsonWriter[JobDescription] {
def write(obj: JobDescription) = JsObject(
implicit object AppDescriptionJsonFormat extends RootJsonWriter[ApplicationDescription] {
def write(obj: ApplicationDescription) = JsObject(
"name" -> JsString(obj.name),
"cores" -> JsNumber(obj.cores),
"memoryperslave" -> JsNumber(obj.memoryPerSlave),
@ -44,8 +44,8 @@ private[spark] object JsonProtocol extends DefaultJsonProtocol {
def write(obj: ExecutorRunner) = JsObject(
"id" -> JsNumber(obj.execId),
"memory" -> JsNumber(obj.memory),
"jobid" -> JsString(obj.jobId),
"jobdesc" -> obj.jobDesc.toJson.asJsObject
"appid" -> JsString(obj.appId),
"appdesc" -> obj.appDesc.toJson.asJsObject
)
}
@ -57,8 +57,8 @@ private[spark] object JsonProtocol extends DefaultJsonProtocol {
"coresused" -> JsNumber(obj.workers.map(_.coresUsed).sum),
"memory" -> JsNumber(obj.workers.map(_.memory).sum),
"memoryused" -> JsNumber(obj.workers.map(_.memoryUsed).sum),
"activejobs" -> JsArray(obj.activeJobs.toList.map(_.toJson)),
"completedjobs" -> JsArray(obj.completedJobs.toList.map(_.toJson))
"activeapps" -> JsArray(obj.activeApps.toList.map(_.toJson)),
"completedapps" -> JsArray(obj.completedApps.toList.map(_.toJson))
)
}

View file

@ -8,25 +8,25 @@ import akka.pattern.AskTimeoutException
import spark.{SparkException, Logging}
import akka.remote.RemoteClientLifeCycleEvent
import akka.remote.RemoteClientShutdown
import spark.deploy.RegisterJob
import spark.deploy.RegisterApplication
import spark.deploy.master.Master
import akka.remote.RemoteClientDisconnected
import akka.actor.Terminated
import akka.dispatch.Await
/**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, a job description,
* and a listener for job events, and calls back the listener when various events occur.
* The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
* and a listener for cluster events, and calls back the listener when various events occur.
*/
private[spark] class Client(
actorSystem: ActorSystem,
masterUrl: String,
jobDescription: JobDescription,
appDescription: ApplicationDescription,
listener: ClientListener)
extends Logging {
var actor: ActorRef = null
var jobId: String = null
var appId: String = null
class ClientActor extends Actor with Logging {
var master: ActorRef = null
@ -38,7 +38,7 @@ private[spark] class Client(
try {
master = context.actorFor(Master.toAkkaUrl(masterUrl))
masterAddress = master.path.address
master ! RegisterJob(jobDescription)
master ! RegisterApplication(appDescription)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
@ -50,17 +50,17 @@ private[spark] class Client(
}
override def receive = {
case RegisteredJob(jobId_) =>
jobId = jobId_
listener.connected(jobId)
case RegisteredApplication(appId_) =>
appId = appId_
listener.connected(appId)
case ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) =>
val fullId = jobId + "/" + id
val fullId = appId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
listener.executorAdded(fullId, workerId, host, cores, memory)
case ExecutorUpdated(id, state, message, exitStatus) =>
val fullId = jobId + "/" + id
val fullId = appId + "/" + id
val messageText = message.map(s => " (" + s + ")").getOrElse("")
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {

View file

@ -8,7 +8,7 @@ package spark.deploy.client
* Users of this API should *not* block inside the callback methods.
*/
private[spark] trait ClientListener {
def connected(jobId: String): Unit
def connected(appId: String): Unit
def disconnected(): Unit

View file

@ -2,13 +2,13 @@ package spark.deploy.client
import spark.util.AkkaUtils
import spark.{Logging, Utils}
import spark.deploy.{Command, JobDescription}
import spark.deploy.{Command, ApplicationDescription}
private[spark] object TestClient {
class TestListener extends ClientListener with Logging {
def connected(id: String) {
logInfo("Connected to master, got job ID " + id)
logInfo("Connected to master, got app ID " + id)
}
def disconnected() {
@ -24,7 +24,7 @@ private[spark] object TestClient {
def main(args: Array[String]) {
val url = args(0)
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0)
val desc = new JobDescription(
val desc = new ApplicationDescription(
"TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home")
val listener = new TestListener
val client = new Client(actorSystem, url, desc, listener)

View file

@ -1,18 +1,18 @@
package spark.deploy.master
import spark.deploy.JobDescription
import spark.deploy.ApplicationDescription
import java.util.Date
import akka.actor.ActorRef
import scala.collection.mutable
private[spark] class JobInfo(
private[spark] class ApplicationInfo(
val startTime: Long,
val id: String,
val desc: JobDescription,
val desc: ApplicationDescription,
val submitDate: Date,
val driver: ActorRef)
{
var state = JobState.WAITING
var state = ApplicationState.WAITING
var executors = new mutable.HashMap[Int, ExecutorInfo]
var coresGranted = 0
var endTime = -1L
@ -48,7 +48,7 @@ private[spark] class JobInfo(
_retryCount
}
def markFinished(endState: JobState.Value) {
def markFinished(endState: ApplicationState.Value) {
state = endState
endTime = System.currentTimeMillis()
}

View file

@ -0,0 +1,11 @@
package spark.deploy.master
private[spark] object ApplicationState
extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
type ApplicationState = Value
val WAITING, RUNNING, FINISHED, FAILED = Value
val MAX_NUM_RETRY = 10
}

View file

@ -4,12 +4,12 @@ import spark.deploy.ExecutorState
private[spark] class ExecutorInfo(
val id: Int,
val job: JobInfo,
val application: ApplicationInfo,
val worker: WorkerInfo,
val cores: Int,
val memory: Int) {
var state = ExecutorState.LAUNCHING
def fullId: String = job.id + "/" + id
def fullId: String = application.id + "/" + id
}

View file

@ -1,9 +0,0 @@
package spark.deploy.master
private[spark] object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
type JobState = Value
val WAITING, RUNNING, FINISHED, FAILED = Value
val MAX_NUM_RETRY = 10
}

View file

@ -16,22 +16,22 @@ import spark.util.AkkaUtils
private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
var nextJobNumber = 0
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
val actorToWorker = new HashMap[ActorRef, WorkerInfo]
val addressToWorker = new HashMap[Address, WorkerInfo]
val jobs = new HashSet[JobInfo]
val idToJob = new HashMap[String, JobInfo]
val actorToJob = new HashMap[ActorRef, JobInfo]
val addressToJob = new HashMap[Address, JobInfo]
val apps = new HashSet[ApplicationInfo]
val idToApp = new HashMap[String, ApplicationInfo]
val actorToApp = new HashMap[ActorRef, ApplicationInfo]
val addressToApp = new HashMap[Address, ApplicationInfo]
val waitingJobs = new ArrayBuffer[JobInfo]
val completedJobs = new ArrayBuffer[JobInfo]
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
@ -39,9 +39,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each job
// among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
val spreadOutJobs = System.getProperty("spark.deploy.spreadOut", "false").toBoolean
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "false").toBoolean
override def preStart() {
logInfo("Starting Spark master at spark://" + ip + ":" + port)
@ -76,41 +76,41 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
}
case RegisterJob(description) => {
logInfo("Registering job " + description.name)
val job = addJob(description, sender)
logInfo("Registered job " + description.name + " with ID " + job.id)
waitingJobs += job
case RegisterApplication(description) => {
logInfo("Registering app " + description.name)
val app = addApplication(description, sender)
logInfo("Registered app " + description.name + " with ID " + app.id)
waitingApps += app
context.watch(sender) // This doesn't work with remote actors but helps for testing
sender ! RegisteredJob(job.id)
sender ! RegisteredApplication(app.id)
schedule()
}
case ExecutorStateChanged(jobId, execId, state, message, exitStatus) => {
val execOption = idToJob.get(jobId).flatMap(job => job.executors.get(execId))
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
case Some(exec) => {
exec.state = state
exec.job.driver ! ExecutorUpdated(execId, state, message, exitStatus)
exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
if (ExecutorState.isFinished(state)) {
val jobInfo = idToJob(jobId)
// Remove this executor from the worker and job
val appInfo = idToApp(appId)
// Remove this executor from the worker and app
logInfo("Removing executor " + exec.fullId + " because it is " + state)
jobInfo.removeExecutor(exec)
appInfo.removeExecutor(exec)
exec.worker.removeExecutor(exec)
// Only retry certain number of times so we don't go into an infinite loop.
if (jobInfo.incrementRetryCount < JobState.MAX_NUM_RETRY) {
if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
schedule()
} else {
logError("Job %s with ID %s failed %d times, removing it".format(
jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
removeJob(jobInfo)
logError("Application %s with ID %s failed %d times, removing it".format(
appInfo.desc.name, appInfo.id, appInfo.retryCount))
removeApplication(appInfo)
}
}
}
case None =>
logWarning("Got status update for unknown executor " + jobId + "/" + execId)
logWarning("Got status update for unknown executor " + appId + "/" + execId)
}
}
@ -124,53 +124,53 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
case Terminated(actor) => {
// The disconnected actor could've been either a worker or a job; remove whichever of
// The disconnected actor could've been either a worker or an app; remove whichever of
// those we have an entry for in the corresponding actor hashmap
actorToWorker.get(actor).foreach(removeWorker)
actorToJob.get(actor).foreach(removeJob)
actorToApp.get(actor).foreach(removeApplication)
}
case RemoteClientDisconnected(transport, address) => {
// The disconnected client could've been either a worker or a job; remove whichever it was
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToJob.get(address).foreach(removeJob)
addressToApp.get(address).foreach(removeApplication)
}
case RemoteClientShutdown(transport, address) => {
// The disconnected client could've been either a worker or a job; remove whichever it was
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToJob.get(address).foreach(removeJob)
addressToApp.get(address).foreach(removeApplication)
}
case RequestMasterState => {
sender ! MasterState(ip, port, workers.toArray, jobs.toArray, completedJobs.toArray)
sender ! MasterState(ip, port, workers.toArray, apps.toArray, completedApps.toArray)
}
}
/**
* Can a job use the given worker? True if the worker has enough memory and we haven't already
* launched an executor for the job on it (right now the standalone backend doesn't like having
* Can an app use the given worker? True if the worker has enough memory and we haven't already
* launched an executor for the app on it (right now the standalone backend doesn't like having
* two executors on the same worker).
*/
def canUse(job: JobInfo, worker: WorkerInfo): Boolean = {
worker.memoryFree >= job.desc.memoryPerSlave && !worker.hasExecutor(job)
def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
}
/**
* Schedule the currently available resources among waiting jobs. This method will be called
* every time a new job joins or resource availability changes.
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
def schedule() {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first job
// in the queue, then the second job, etc.
if (spreadOutJobs) {
// Try to spread out each job among all the nodes, until it has all its cores
for (job <- waitingJobs if job.coresLeft > 0) {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
// Try to spread out each app among all the nodes, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canUse(job, _)).sortBy(_.coresFree).reverse
.filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum)
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
while (toAssign > 0) {
if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
@ -182,22 +182,22 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
// Now that we've decided how many cores to give on each node, let's actually give them
for (pos <- 0 until numUsable) {
if (assigned(pos) > 0) {
val exec = job.addExecutor(usableWorkers(pos), assigned(pos))
launchExecutor(usableWorkers(pos), exec, job.desc.sparkHome)
job.state = JobState.RUNNING
val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome)
app.state = ApplicationState.RUNNING
}
}
}
} else {
// Pack each job into as few nodes as possible until we've assigned all its cores
// Pack each app into as few nodes as possible until we've assigned all its cores
for (worker <- workers if worker.coresFree > 0) {
for (job <- waitingJobs if job.coresLeft > 0) {
if (canUse(job, worker)) {
val coresToUse = math.min(worker.coresFree, job.coresLeft)
for (app <- waitingApps if app.coresLeft > 0) {
if (canUse(app, worker)) {
val coresToUse = math.min(worker.coresFree, app.coresLeft)
if (coresToUse > 0) {
val exec = job.addExecutor(worker, coresToUse)
launchExecutor(worker, exec, job.desc.sparkHome)
job.state = JobState.RUNNING
val exec = app.addExecutor(worker, coresToUse)
launchExecutor(worker, exec, app.desc.sparkHome)
app.state = ApplicationState.RUNNING
}
}
}
@ -208,8 +208,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc, exec.cores, exec.memory, sparkHome)
exec.job.driver ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
worker.actor ! LaunchExecutor(exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
exec.application.driver ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
}
def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
@ -231,46 +231,46 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
actorToWorker -= worker.actor
addressToWorker -= worker.actor.path.address
for (exec <- worker.executors.values) {
logInfo("Telling job of lost executor: " + exec.id)
exec.job.driver ! ExecutorUpdated(exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.job.removeExecutor(exec)
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver ! ExecutorUpdated(exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.application.removeExecutor(exec)
}
}
def addJob(desc: JobDescription, driver: ActorRef): JobInfo = {
def addApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
val job = new JobInfo(now, newJobId(date), desc, date, driver)
jobs += job
idToJob(job.id) = job
actorToJob(driver) = job
addressToJob(driver.path.address) = job
return job
val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver)
apps += app
idToApp(app.id) = app
actorToApp(driver) = app
addressToApp(driver.path.address) = app
return app
}
def removeJob(job: JobInfo) {
if (jobs.contains(job)) {
logInfo("Removing job " + job.id)
jobs -= job
idToJob -= job.id
actorToJob -= job.driver
addressToWorker -= job.driver.path.address
completedJobs += job // Remember it in our history
waitingJobs -= job
for (exec <- job.executors.values) {
def removeApplication(app: ApplicationInfo) {
if (apps.contains(app)) {
logInfo("Removing app " + app.id)
apps -= app
idToApp -= app.id
actorToApp -= app.driver
addressToWorker -= app.driver.path.address
completedApps += app // Remember it in our history
waitingApps -= app
for (exec <- app.executors.values) {
exec.worker.removeExecutor(exec)
exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
exec.worker.actor ! KillExecutor(exec.application.id, exec.id)
}
job.markFinished(JobState.FINISHED) // TODO: Mark it as FAILED if it failed
app.markFinished(ApplicationState.FINISHED) // TODO: Mark it as FAILED if it failed
schedule()
}
}
/** Generate a new job ID given a job's submission date */
def newJobId(submitDate: Date): String = {
val jobId = "job-%s-%04d".format(DATE_FORMAT.format(submitDate), nextJobNumber)
nextJobNumber += 1
jobId
/** Generate a new app ID given an app's submission date */
def newApplicationId(submitDate: Date): String = {
val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber)
nextAppNumber += 1
appId
}
/** Check for, and remove, any timed-out workers */
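A visible side effect of the rename is the ID scheme: the IDs the master hands out, which also name executor work directories and web UI links, change from job-yyyyMMddHHmmss-NNNN to app-yyyyMMddHHmmss-NNNN. A self-contained sketch of the format newApplicationId produces, using the same format string and date pattern as above:

```scala
import java.text.SimpleDateFormat
import java.util.Date

object AppIdFormat {
  // Mirrors Master.newApplicationId: "app-" + submission timestamp + zero-padded counter
  private val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")

  def newApplicationId(submitDate: Date, appNumber: Int): String =
    "app-%s-%04d".format(DATE_FORMAT.format(submitDate), appNumber)

  def main(args: Array[String]) {
    println(newApplicationId(new Date(), 0))  // e.g. app-20130217232308-0000
  }
}
```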

View file

@ -40,27 +40,27 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
}
}
} ~
path("job") {
parameters("jobId", 'format ?) {
case (jobId, Some(js)) if (js.equalsIgnoreCase("json")) =>
path("app") {
parameters("appId", 'format ?) {
case (appId, Some(js)) if (js.equalsIgnoreCase("json")) =>
val future = master ? RequestMasterState
val jobInfo = for (masterState <- future.mapTo[MasterState]) yield {
masterState.activeJobs.find(_.id == jobId).getOrElse({
masterState.completedJobs.find(_.id == jobId).getOrElse(null)
val appInfo = for (masterState <- future.mapTo[MasterState]) yield {
masterState.activeApps.find(_.id == appId).getOrElse({
masterState.completedApps.find(_.id == appId).getOrElse(null)
})
}
respondWithMediaType(MediaTypes.`application/json`) { ctx =>
ctx.complete(jobInfo.mapTo[JobInfo])
ctx.complete(appInfo.mapTo[ApplicationInfo])
}
case (jobId, _) =>
case (appId, _) =>
completeWith {
val future = master ? RequestMasterState
future.map { state =>
val masterState = state.asInstanceOf[MasterState]
val job = masterState.activeJobs.find(_.id == jobId).getOrElse({
masterState.completedJobs.find(_.id == jobId).getOrElse(null)
val app = masterState.activeApps.find(_.id == appId).getOrElse({
masterState.completedApps.find(_.id == appId).getOrElse(null)
})
spark.deploy.master.html.job_details.render(job)
spark.deploy.master.html.app_details.render(app)
}
}
}

View file

@ -37,8 +37,8 @@ private[spark] class WorkerInfo(
}
}
def hasExecutor(job: JobInfo): Boolean = {
executors.values.exists(_.job == job)
def hasExecutor(app: ApplicationInfo): Boolean = {
executors.values.exists(_.application == app)
}
def webUiAddress : String = {

View file

@ -1,7 +1,7 @@
package spark.deploy.worker
import java.io._
import spark.deploy.{ExecutorState, ExecutorStateChanged, JobDescription}
import spark.deploy.{ExecutorState, ExecutorStateChanged, ApplicationDescription}
import akka.actor.ActorRef
import spark.{Utils, Logging}
import java.net.{URI, URL}
@ -14,9 +14,9 @@ import spark.deploy.ExecutorStateChanged
* Manages the execution of one executor process.
*/
private[spark] class ExecutorRunner(
val jobId: String,
val appId: String,
val execId: Int,
val jobDesc: JobDescription,
val appDesc: ApplicationDescription,
val cores: Int,
val memory: Int,
val worker: ActorRef,
@ -26,7 +26,7 @@ private[spark] class ExecutorRunner(
val workDir: File)
extends Logging {
val fullId = jobId + "/" + execId
val fullId = appId + "/" + execId
var workerThread: Thread = null
var process: Process = null
var shutdownHook: Thread = null
@ -60,7 +60,7 @@ private[spark] class ExecutorRunner(
process.destroy()
process.waitFor()
}
worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None, None)
worker ! ExecutorStateChanged(appId, execId, ExecutorState.KILLED, None, None)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
}
}
@ -74,10 +74,10 @@ private[spark] class ExecutorRunner(
}
def buildCommandSeq(): Seq[String] = {
val command = jobDesc.command
val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run";
val command = appDesc.command
val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run"
val runScript = new File(sparkHome, script).getCanonicalPath
Seq(runScript, command.mainClass) ++ (command.arguments ++ Seq(jobId)).map(substituteVariables)
Seq(runScript, command.mainClass) ++ (command.arguments ++ Seq(appId)).map(substituteVariables)
}
/** Spawn a thread that will redirect a given stream to a file */
@ -96,12 +96,12 @@ private[spark] class ExecutorRunner(
}
/**
* Download and run the executor described in our JobDescription
* Download and run the executor described in our ApplicationDescription
*/
def fetchAndRunExecutor() {
try {
// Create the executor's working directory
val executorDir = new File(workDir, jobId + "/" + execId)
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
@ -110,7 +110,7 @@ private[spark] class ExecutorRunner(
val command = buildCommandSeq()
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value) <- jobDesc.command.environment) {
for ((key, value) <- appDesc.command.environment) {
env.put(key, value)
}
env.put("SPARK_MEM", memory.toString + "m")
@ -128,7 +128,7 @@ private[spark] class ExecutorRunner(
// times on the same machine.
val exitCode = process.waitFor()
val message = "Command exited with code " + exitCode
worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message),
worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message),
Some(exitCode))
} catch {
case interrupted: InterruptedException =>
@ -140,7 +140,7 @@ private[spark] class ExecutorRunner(
process.destroy()
}
val message = e.getClass + ": " + e.getMessage
worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message), None)
worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
}
}
}

View file

@ -109,19 +109,19 @@ private[spark] class Worker(
logError("Worker registration failed: " + message)
System.exit(1)
case LaunchExecutor(jobId, execId, jobDesc, cores_, memory_, execSparkHome_) =>
logInfo("Asked to launch executor %s/%d for %s".format(jobId, execId, jobDesc.name))
case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
val manager = new ExecutorRunner(
jobId, execId, jobDesc, cores_, memory_, self, workerId, ip, new File(execSparkHome_), workDir)
executors(jobId + "/" + execId) = manager
appId, execId, appDesc, cores_, memory_, self, workerId, ip, new File(execSparkHome_), workDir)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
memoryUsed += memory_
master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None, None)
master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
case ExecutorStateChanged(jobId, execId, state, message, exitStatus) =>
master ! ExecutorStateChanged(jobId, execId, state, message, exitStatus)
val fullId = jobId + "/" + execId
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
val executor = executors(fullId)
logInfo("Executor " + fullId + " finished with state " + state +
@ -133,8 +133,8 @@ private[spark] class Worker(
memoryUsed -= executor.memory
}
case KillExecutor(jobId, execId) =>
val fullId = jobId + "/" + execId
case KillExecutor(appId, execId) =>
val fullId = appId + "/" + execId
executors.get(fullId) match {
case Some(executor) =>
logInfo("Asked to kill executor " + fullId)

View file

@ -92,7 +92,7 @@ private[spark] class WorkerArguments(args: Array[String]) {
"Options:\n" +
" -c CORES, --cores CORES Number of cores to use\n" +
" -m MEM, --memory MEM Amount of memory to use (e.g. 1000M, 2G)\n" +
" -d DIR, --work-dir DIR Directory to run jobs in (default: SPARK_HOME/work)\n" +
" -d DIR, --work-dir DIR Directory to run apps in (default: SPARK_HOME/work)\n" +
" -i IP, --ip IP IP address or DNS name to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: random)\n" +
" --webui-port PORT Port for web UI (default: 8081)")

View file

@ -41,9 +41,9 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Direct
}
} ~
path("log") {
parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) =>
parameters("appId", "executorId", "logType") { (appId, executorId, logType) =>
respondWithMediaType(cc.spray.http.MediaTypes.`text/plain`) {
getFromFileName("work/" + jobId + "/" + executorId + "/" + logType)
getFromFileName("work/" + appId + "/" + executorId + "/" + logType)
}
}
} ~
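Executor log links served through the worker web UI change their query parameter from jobId= to appId=. A small hypothetical helper, not part of the commit, showing the shape of the URLs that the templates in this commit emit:

```scala
// Hypothetical convenience function; the route itself is defined above.
def workerLogUrl(workerWebUiAddress: String, appId: String, executorId: Int, logType: String): String =
  "%s/log?appId=%s&executorId=%d&logType=%s".format(workerWebUiAddress, appId, executorId, logType)

workerLogUrl("http://worker-1:8081", "app-20130217232308-0000", 0, "stdout")
// => http://worker-1:8081/log?appId=app-20130217232308-0000&executorId=0&logType=stdout
```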

View file

@ -2,14 +2,14 @@ package spark.scheduler.cluster
import spark.{Utils, Logging, SparkContext}
import spark.deploy.client.{Client, ClientListener}
import spark.deploy.{Command, JobDescription}
import spark.deploy.{Command, ApplicationDescription}
import scala.collection.mutable.HashMap
private[spark] class SparkDeploySchedulerBackend(
scheduler: ClusterScheduler,
sc: SparkContext,
master: String,
jobName: String)
appName: String)
extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
with ClientListener
with Logging {
@ -29,10 +29,11 @@ private[spark] class SparkDeploySchedulerBackend(
StandaloneSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(throw new IllegalArgumentException("must supply spark home for spark standalone"))
val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command, sparkHome)
val sparkHome = sc.getSparkHome().getOrElse(
throw new IllegalArgumentException("must supply spark home for spark standalone"))
val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome)
client = new Client(sc.env.actorSystem, master, jobDesc, this)
client = new Client(sc.env.actorSystem, master, appDesc, this)
client.start()
}
@ -45,8 +46,8 @@ private[spark] class SparkDeploySchedulerBackend(
}
}
override def connected(jobId: String) {
logInfo("Connected to Spark cluster with job ID " + jobId)
override def connected(appId: String) {
logInfo("Connected to Spark cluster with app ID " + appId)
}
override def disconnected() {

View file

@ -28,7 +28,7 @@ private[spark] class CoarseMesosSchedulerBackend(
scheduler: ClusterScheduler,
sc: SparkContext,
master: String,
frameworkName: String)
appName: String)
extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
with MScheduler
with Logging {
@ -76,7 +76,7 @@ private[spark] class CoarseMesosSchedulerBackend(
setDaemon(true)
override def run() {
val scheduler = CoarseMesosSchedulerBackend.this
val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
try { {
val ret = driver.run()

View file

@ -24,7 +24,7 @@ private[spark] class MesosSchedulerBackend(
scheduler: ClusterScheduler,
sc: SparkContext,
master: String,
frameworkName: String)
appName: String)
extends SchedulerBackend
with MScheduler
with Logging {
@ -49,7 +49,7 @@ private[spark] class MesosSchedulerBackend(
setDaemon(true)
override def run() {
val scheduler = MesosSchedulerBackend.this
val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
try {
val ret = driver.run()

View file

@ -0,0 +1,40 @@
@(app: spark.deploy.master.ApplicationInfo)
@spark.common.html.layout(title = "Application Details") {
<!-- Application Details -->
<div class="row">
<div class="span12">
<ul class="unstyled">
<li><strong>ID:</strong> @app.id</li>
<li><strong>Description:</strong> @app.desc.name</li>
<li><strong>User:</strong> @app.desc.user</li>
<li><strong>Cores:</strong>
@app.desc.cores
(@app.coresGranted Granted
@if(app.desc.cores == Integer.MAX_VALUE) {
} else {
, @app.coresLeft
}
)
</li>
<li><strong>Memory per Slave:</strong> @app.desc.memoryPerSlave</li>
<li><strong>Submit Date:</strong> @app.submitDate</li>
<li><strong>State:</strong> @app.state</li>
</ul>
</div>
</div>
<hr/>
<!-- Executors -->
<div class="row">
<div class="span12">
<h3> Executor Summary </h3>
<br/>
@executors_table(app.executors.values.toList)
</div>
</div>
}

View file

@ -0,0 +1,20 @@
@(app: spark.deploy.master.ApplicationInfo)
@import spark.Utils
@import spark.deploy.WebUI.formatDate
@import spark.deploy.WebUI.formatDuration
<tr>
<td>
<a href="app?appId=@(app.id)">@app.id</a>
</td>
<td>@app.desc.name</td>
<td>
@app.coresGranted
</td>
<td>@Utils.memoryMegabytesToString(app.desc.memoryPerSlave)</td>
<td>@formatDate(app.submitDate)</td>
<td>@app.desc.user</td>
<td>@app.state.toString()</td>
<td>@formatDuration(app.duration)</td>
</tr>

View file

@ -1,9 +1,9 @@
@(jobs: Array[spark.deploy.master.JobInfo])
@(apps: Array[spark.deploy.master.ApplicationInfo])
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<tr>
<th>JobID</th>
<th>ID</th>
<th>Description</th>
<th>Cores</th>
<th>Memory per Node</th>
@ -14,8 +14,8 @@
</tr>
</thead>
<tbody>
@for(j <- jobs) {
@job_row(j)
@for(j <- apps) {
@app_row(j)
}
</tbody>
</table>

View file

@ -9,7 +9,7 @@
<td>@executor.memory</td>
<td>@executor.state</td>
<td>
<a href="@(executor.worker.webUiAddress)/log?jobId=@(executor.job.id)&executorId=@(executor.id)&logType=stdout">stdout</a>
<a href="@(executor.worker.webUiAddress)/log?jobId=@(executor.job.id)&executorId=@(executor.id)&logType=stderr">stderr</a>
<a href="@(executor.worker.webUiAddress)/log?appId=@(executor.application.id)&executorId=@(executor.id)&logType=stdout">stdout</a>
<a href="@(executor.worker.webUiAddress)/log?appId=@(executor.application.id)&executorId=@(executor.id)&logType=stderr">stderr</a>
</td>
</tr>

View file

@ -14,7 +14,7 @@
@{state.workers.map(_.coresUsed).sum} Used</li>
<li><strong>Memory:</strong> @{Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
@{Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
<li><strong>Jobs:</strong> @state.activeJobs.size Running, @state.completedJobs.size Completed </li>
<li><strong>Applications:</strong> @state.activeApps.size Running, @state.completedApps.size Completed </li>
</ul>
</div>
</div>
@ -22,7 +22,7 @@
<!-- Worker Summary -->
<div class="row">
<div class="span12">
<h3> Cluster Summary </h3>
<h3> Workers </h3>
<br/>
@worker_table(state.workers.sortBy(_.id))
</div>
@ -30,23 +30,23 @@
<hr/>
<!-- Job Summary (Running) -->
<!-- App Summary (Running) -->
<div class="row">
<div class="span12">
<h3> Running Jobs </h3>
<h3> Running Applications </h3>
<br/>
@job_table(state.activeJobs.sortBy(_.startTime).reverse)
@app_table(state.activeApps.sortBy(_.startTime).reverse)
</div>
</div>
<hr/>
<!-- Job Summary (Completed) -->
<!-- App Summary (Completed) -->
<div class="row">
<div class="span12">
<h3> Completed Jobs </h3>
<h3> Completed Applications </h3>
<br/>
@job_table(state.completedJobs.sortBy(_.endTime).reverse)
@app_table(state.completedApps.sortBy(_.endTime).reverse)
</div>
</div>

View file

@ -1,40 +0,0 @@
@(job: spark.deploy.master.JobInfo)
@spark.common.html.layout(title = "Job Details") {
<!-- Job Details -->
<div class="row">
<div class="span12">
<ul class="unstyled">
<li><strong>ID:</strong> @job.id</li>
<li><strong>Description:</strong> @job.desc.name</li>
<li><strong>User:</strong> @job.desc.user</li>
<li><strong>Cores:</strong>
@job.desc.cores
(@job.coresGranted Granted
@if(job.desc.cores == Integer.MAX_VALUE) {
} else {
, @job.coresLeft
}
)
</li>
<li><strong>Memory per Slave:</strong> @job.desc.memoryPerSlave</li>
<li><strong>Submit Date:</strong> @job.submitDate</li>
<li><strong>State:</strong> @job.state</li>
</ul>
</div>
</div>
<hr/>
<!-- Executors -->
<div class="row">
<div class="span12">
<h3> Executor Summary </h3>
<br/>
@executors_table(job.executors.values.toList)
</div>
</div>
}

View file

@ -1,20 +0,0 @@
@(job: spark.deploy.master.JobInfo)
@import spark.Utils
@import spark.deploy.WebUI.formatDate
@import spark.deploy.WebUI.formatDuration
<tr>
<td>
<a href="job?jobId=@(job.id)">@job.id</a>
</td>
<td>@job.desc.name</td>
<td>
@job.coresGranted
</td>
<td>@Utils.memoryMegabytesToString(job.desc.memoryPerSlave)</td>
<td>@formatDate(job.submitDate)</td>
<td>@job.desc.user</td>
<td>@job.state.toString()</td>
<td>@formatDuration(job.duration)</td>
</tr>

View file

@ -8,13 +8,13 @@
<td>@Utils.memoryMegabytesToString(executor.memory)</td>
<td>
<ul class="unstyled">
<li><strong>ID:</strong> @executor.jobId</li>
<li><strong>Name:</strong> @executor.jobDesc.name</li>
<li><strong>User:</strong> @executor.jobDesc.user</li>
<li><strong>ID:</strong> @executor.appId</li>
<li><strong>Name:</strong> @executor.appDesc.name</li>
<li><strong>User:</strong> @executor.appDesc.user</li>
</ul>
</td>
<td>
<a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stdout">stdout</a>
<a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stderr">stderr</a>
<a href="log?appId=@(executor.appId)&executorId=@(executor.execId)&logType=stdout">stdout</a>
<a href="log?appId=@(executor.appId)&executorId=@(executor.execId)&logType=stderr">stderr</a>
</td>
</tr>

View file

@ -12,7 +12,7 @@ private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
val framework = ssc.sc.jobName
val framework = ssc.sc.appName
val sparkHome = ssc.sc.sparkHome
val jars = ssc.sc.jars
val graph = ssc.graph

View file

@ -39,11 +39,11 @@ class StreamingContext private (
/**
* Creates a StreamingContext by providing the details necessary for creating a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param frameworkName A name for your job, to display on the cluster web UI
* @param appName A name for your application, to display on the cluster web UI
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
def this(master: String, frameworkName: String, batchDuration: Duration) =
this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
def this(master: String, appName: String, batchDuration: Duration) =
this(StreamingContext.createNewSparkContext(master, appName), null, batchDuration)
/**
* Re-creates a StreamingContext from a checkpoint file.
@ -384,14 +384,14 @@ object StreamingContext {
new PairDStreamFunctions[K, V](stream)
}
protected[streaming] def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
protected[streaming] def createNewSparkContext(master: String, appName: String): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second interval.
if (MetadataCleaner.getDelaySeconds < 0) {
MetadataCleaner.setDelaySeconds(3600)
}
new SparkContext(master, frameworkName)
new SparkContext(master, appName)
}
protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
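The streaming API follows suit: the StreamingContext constructor's second argument is now appName. A sketch in the style of the standard NetworkWordCount example — host, port, and batch interval are placeholders, and the implicit import provides reduceByKey on pair DStreams:

```scala
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

// "NetworkWordCount" is the appName shown on the cluster web UI.
val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _).print()
ssc.start()
```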

View file

@ -27,11 +27,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Creates a StreamingContext.
* @param master Name of the Spark Master
* @param frameworkName Name to be used when registering with the scheduler
* @param appName Name to be used when registering with the scheduler
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
def this(master: String, frameworkName: String, batchDuration: Duration) =
this(new StreamingContext(master, frameworkName, batchDuration))
def this(master: String, appName: String, batchDuration: Duration) =
this(new StreamingContext(master, appName, batchDuration))
/**
* Creates a StreamingContext.