diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala index 2836574ecb..22319a96ca 100644 --- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -18,35 +18,23 @@ import scala.collection.mutable.ArrayBuffer private[spark] class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging { - val localIpAddress = Utils.localIpAddress + private val localIpAddress = Utils.localIpAddress + private val masterActorSystems = ArrayBuffer[ActorSystem]() + private val workerActorSystems = ArrayBuffer[ActorSystem]() - var masterActor : ActorRef = _ - var masterActorSystem : ActorSystem = _ - var masterPort : Int = _ - var masterUrl : String = _ - - val workerActorSystems = ArrayBuffer[ActorSystem]() - val workerActors = ArrayBuffer[ActorRef]() - - def start() : String = { + def start(): String = { logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") /* Start the Master */ - val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0) - masterActorSystem = actorSystem - masterUrl = "spark://" + localIpAddress + ":" + masterPort - masterActor = masterActorSystem.actorOf( - Props(new Master(localIpAddress, masterPort, 0)), name = "Master") + val (masterSystem, masterPort) = Master.startSystemAndActor(localIpAddress, 0, 0) + masterActorSystems += masterSystem + val masterUrl = "spark://" + localIpAddress + ":" + masterPort - /* Start the Slaves */ + /* Start the Workers */ for (workerNum <- 1 to numWorkers) { - val (actorSystem, boundPort) = - AkkaUtils.createActorSystem("sparkWorker" + workerNum, localIpAddress, 0) - workerActorSystems += actorSystem - val actor = actorSystem.actorOf( - Props(new Worker(localIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)), - name = "Worker") - workerActors += actor + val (workerSystem, _) = Worker.startSystemAndActor(localIpAddress, 0, 0, coresPerWorker, + memoryPerWorker, masterUrl, null, Some(workerNum)) + workerActorSystems += workerSystem } return masterUrl @@ -57,7 +45,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I // Stop the workers before the master so they don't get upset that it disconnected workerActorSystems.foreach(_.shutdown()) workerActorSystems.foreach(_.awaitTermination()) - masterActorSystem.shutdown() - masterActorSystem.awaitTermination() + masterActorSystems.foreach(_.shutdown()) + masterActorSystems.foreach(_.awaitTermination()) } } diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index 90fe9508cd..a63eee1233 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -9,6 +9,7 @@ import spark.{SparkException, Logging} import akka.remote.RemoteClientLifeCycleEvent import akka.remote.RemoteClientShutdown import spark.deploy.RegisterJob +import spark.deploy.master.Master import akka.remote.RemoteClientDisconnected import akka.actor.Terminated import akka.dispatch.Await @@ -24,26 +25,18 @@ private[spark] class Client( listener: ClientListener) extends Logging { - val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r - var actor: ActorRef = null var jobId: String = null - if (MASTER_REGEX.unapplySeq(masterUrl) == None) { - throw new SparkException("Invalid master URL: " + masterUrl) - } - class ClientActor extends Actor with Logging { var master: ActorRef = null var masterAddress: Address = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times override def preStart() { - val Seq(masterHost, masterPort) = MASTER_REGEX.unapplySeq(masterUrl).get - logInfo("Connecting to master spark://" + masterHost + ":" + masterPort) - val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort) + logInfo("Connecting to master " + masterUrl) try { - master = context.actorFor(akkaUrl) + master = context.actorFor(Master.toAkkaUrl(masterUrl)) masterAddress = master.path.address master ! RegisterJob(jobDescription) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index c618e87cdd..92e7914b1b 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -262,11 +262,29 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } private[spark] object Master { + private val systemName = "sparkMaster" + private val actorName = "Master" + private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r + def main(argStrings: Array[String]) { val args = new MasterArguments(argStrings) - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port) - val actor = actorSystem.actorOf( - Props(new Master(args.ip, boundPort, args.webUiPort)), name = "Master") + val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort) actorSystem.awaitTermination() } + + /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */ + def toAkkaUrl(sparkUrl: String): String = { + sparkUrl match { + case sparkUrlRegex(host, port) => + "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName) + case _ => + throw new SparkException("Invalid master URL: " + sparkUrl) + } + } + + def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = { + val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) + val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName) + (actorSystem, boundPort) + } } diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 8b41620d98..2219dd6262 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -1,7 +1,7 @@ package spark.deploy.worker import scala.collection.mutable.{ArrayBuffer, HashMap} -import akka.actor.{ActorRef, Props, Actor} +import akka.actor.{ActorRef, Props, Actor, ActorSystem} import spark.{Logging, Utils} import spark.util.AkkaUtils import spark.deploy._ @@ -13,6 +13,7 @@ import akka.remote.RemoteClientDisconnected import spark.deploy.RegisterWorker import spark.deploy.LaunchExecutor import spark.deploy.RegisterWorkerFailed +import spark.deploy.master.Master import akka.actor.Terminated import java.io.File @@ -27,7 +28,6 @@ private[spark] class Worker( extends Actor with Logging { val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs - val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r var master: ActorRef = null var masterWebUiUrl : String = "" @@ -48,11 +48,7 @@ private[spark] class Worker( def memoryFree: Int = memory - memoryUsed def createWorkDir() { - workDir = if (workDirPath != null) { - new File(workDirPath) - } else { - new File(sparkHome, "work") - } + workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work")) try { if (!workDir.exists() && !workDir.mkdirs()) { logError("Failed to create work directory " + workDir) @@ -68,8 +64,7 @@ private[spark] class Worker( override def preStart() { logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( ip, port, cores, Utils.memoryMegabytesToString(memory))) - val envVar = System.getenv("SPARK_HOME") - sparkHome = new File(if (envVar == null) "." else envVar) + sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse(".")) logInfo("Spark home: " + sparkHome) createWorkDir() connectToMaster() @@ -77,24 +72,15 @@ private[spark] class Worker( } def connectToMaster() { - masterUrl match { - case MASTER_REGEX(masterHost, masterPort) => { - logInfo("Connecting to master spark://" + masterHost + ":" + masterPort) - val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort) - try { - master = context.actorFor(akkaUrl) - master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(master) // Doesn't work with remote actors, but useful for testing - } catch { - case e: Exception => - logError("Failed to connect to master", e) - System.exit(1) - } - } - - case _ => - logError("Invalid master URL: " + masterUrl) + logInfo("Connecting to master " + masterUrl) + try { + master = context.actorFor(Master.toAkkaUrl(masterUrl)) + master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress) + context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.watch(master) // Doesn't work with remote actors, but useful for testing + } catch { + case e: Exception => + logError("Failed to connect to master", e) System.exit(1) } } @@ -183,11 +169,19 @@ private[spark] class Worker( private[spark] object Worker { def main(argStrings: Array[String]) { val args = new WorkerArguments(argStrings) - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port) - val actor = actorSystem.actorOf( - Props(new Worker(args.ip, boundPort, args.webUiPort, args.cores, args.memory, - args.master, args.workDir)), - name = "Worker") + val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort, args.cores, + args.memory, args.master, args.workDir) actorSystem.awaitTermination() } + + def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, + masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = { + // The LocalSparkCluster runs multiple local sparkWorkerX actor systems + val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") + val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) + val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory, + masterUrl, workDir)), name = "Worker") + (actorSystem, boundPort) + } + } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 36398095a2..7be6b9fa87 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -27,8 +27,6 @@ private[spark] class BlockManagerMaster( val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt val DRIVER_AKKA_ACTOR_NAME = "BlockMasterManager" - val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager" - val DEFAULT_MANAGER_IP: String = Utils.localHostName() val timeout = 10.seconds var driverActor: ActorRef = { diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index e0fdeffbc4..3a3626e8a0 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -18,9 +18,13 @@ import java.util.concurrent.TimeoutException * Various utility classes for working with Akka. */ private[spark] object AkkaUtils { + /** * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the * ActorSystem itself and its port (which is hard to get from Akka). + * + * Note: the `name` parameter is important, as even if a client sends a message to right + * host + port, if the system name is incorrect, Akka will drop the message. */ def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt @@ -41,7 +45,7 @@ private[spark] object AkkaUtils { akka.actor.default-dispatcher.throughput = %d """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize)) - val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader) + val actorSystem = ActorSystem(name, akkaConf, getClass.getClassLoader) // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a // hack because Akka doesn't let you figure out the port through the public API yet.