Fix createActorSystem not actually using the systemName parameter.
This meant all system names were "spark", which worked, but didn't lead to the most intuitive log output. This fixes createActorSystem to use the passed system name, and refactors Master/Worker to encapsulate their system/actor names instead of having the clients guess at them. Note that the driver system name, "spark", is left as is, and is still repeated a few times, but that seems like a separate issue.
This commit is contained in:
parent
8b3041c723
commit
28e0cb9f31
|
@ -18,35 +18,23 @@ import scala.collection.mutable.ArrayBuffer
|
|||
private[spark]
|
||||
class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
|
||||
|
||||
val localIpAddress = Utils.localIpAddress
|
||||
private val localIpAddress = Utils.localIpAddress
|
||||
private val masterActorSystems = ArrayBuffer[ActorSystem]()
|
||||
private val workerActorSystems = ArrayBuffer[ActorSystem]()
|
||||
|
||||
var masterActor : ActorRef = _
|
||||
var masterActorSystem : ActorSystem = _
|
||||
var masterPort : Int = _
|
||||
var masterUrl : String = _
|
||||
|
||||
val workerActorSystems = ArrayBuffer[ActorSystem]()
|
||||
val workerActors = ArrayBuffer[ActorRef]()
|
||||
|
||||
def start() : String = {
|
||||
def start(): String = {
|
||||
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
|
||||
|
||||
/* Start the Master */
|
||||
val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
|
||||
masterActorSystem = actorSystem
|
||||
masterUrl = "spark://" + localIpAddress + ":" + masterPort
|
||||
masterActor = masterActorSystem.actorOf(
|
||||
Props(new Master(localIpAddress, masterPort, 0)), name = "Master")
|
||||
val (masterSystem, masterPort) = Master.startSystemAndActor(localIpAddress, 0, 0)
|
||||
masterActorSystems += masterSystem
|
||||
val masterUrl = "spark://" + localIpAddress + ":" + masterPort
|
||||
|
||||
/* Start the Slaves */
|
||||
/* Start the Workers */
|
||||
for (workerNum <- 1 to numWorkers) {
|
||||
val (actorSystem, boundPort) =
|
||||
AkkaUtils.createActorSystem("sparkWorker" + workerNum, localIpAddress, 0)
|
||||
workerActorSystems += actorSystem
|
||||
val actor = actorSystem.actorOf(
|
||||
Props(new Worker(localIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)),
|
||||
name = "Worker")
|
||||
workerActors += actor
|
||||
val (workerSystem, _) = Worker.startSystemAndActor(localIpAddress, 0, 0, coresPerWorker,
|
||||
memoryPerWorker, masterUrl, null, Some(workerNum))
|
||||
workerActorSystems += workerSystem
|
||||
}
|
||||
|
||||
return masterUrl
|
||||
|
@ -57,7 +45,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
|
|||
// Stop the workers before the master so they don't get upset that it disconnected
|
||||
workerActorSystems.foreach(_.shutdown())
|
||||
workerActorSystems.foreach(_.awaitTermination())
|
||||
masterActorSystem.shutdown()
|
||||
masterActorSystem.awaitTermination()
|
||||
masterActorSystems.foreach(_.shutdown())
|
||||
masterActorSystems.foreach(_.awaitTermination())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import spark.{SparkException, Logging}
|
|||
import akka.remote.RemoteClientLifeCycleEvent
|
||||
import akka.remote.RemoteClientShutdown
|
||||
import spark.deploy.RegisterJob
|
||||
import spark.deploy.master.Master
|
||||
import akka.remote.RemoteClientDisconnected
|
||||
import akka.actor.Terminated
|
||||
import akka.dispatch.Await
|
||||
|
@ -24,26 +25,18 @@ private[spark] class Client(
|
|||
listener: ClientListener)
|
||||
extends Logging {
|
||||
|
||||
val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
|
||||
|
||||
var actor: ActorRef = null
|
||||
var jobId: String = null
|
||||
|
||||
if (MASTER_REGEX.unapplySeq(masterUrl) == None) {
|
||||
throw new SparkException("Invalid master URL: " + masterUrl)
|
||||
}
|
||||
|
||||
class ClientActor extends Actor with Logging {
|
||||
var master: ActorRef = null
|
||||
var masterAddress: Address = null
|
||||
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
|
||||
|
||||
override def preStart() {
|
||||
val Seq(masterHost, masterPort) = MASTER_REGEX.unapplySeq(masterUrl).get
|
||||
logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
|
||||
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
|
||||
logInfo("Connecting to master " + masterUrl)
|
||||
try {
|
||||
master = context.actorFor(akkaUrl)
|
||||
master = context.actorFor(Master.toAkkaUrl(masterUrl))
|
||||
masterAddress = master.path.address
|
||||
master ! RegisterJob(jobDescription)
|
||||
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
|
||||
|
|
|
@ -262,11 +262,29 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
|
|||
}
|
||||
|
||||
private[spark] object Master {
|
||||
private val systemName = "sparkMaster"
|
||||
private val actorName = "Master"
|
||||
private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
|
||||
|
||||
def main(argStrings: Array[String]) {
|
||||
val args = new MasterArguments(argStrings)
|
||||
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
|
||||
val actor = actorSystem.actorOf(
|
||||
Props(new Master(args.ip, boundPort, args.webUiPort)), name = "Master")
|
||||
val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort)
|
||||
actorSystem.awaitTermination()
|
||||
}
|
||||
|
||||
/** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
|
||||
def toAkkaUrl(sparkUrl: String): String = {
|
||||
sparkUrl match {
|
||||
case sparkUrlRegex(host, port) =>
|
||||
"akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
|
||||
case _ =>
|
||||
throw new SparkException("Invalid master URL: " + sparkUrl)
|
||||
}
|
||||
}
|
||||
|
||||
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = {
|
||||
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
|
||||
val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
|
||||
(actorSystem, boundPort)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package spark.deploy.worker
|
||||
|
||||
import scala.collection.mutable.{ArrayBuffer, HashMap}
|
||||
import akka.actor.{ActorRef, Props, Actor}
|
||||
import akka.actor.{ActorRef, Props, Actor, ActorSystem}
|
||||
import spark.{Logging, Utils}
|
||||
import spark.util.AkkaUtils
|
||||
import spark.deploy._
|
||||
|
@ -13,6 +13,7 @@ import akka.remote.RemoteClientDisconnected
|
|||
import spark.deploy.RegisterWorker
|
||||
import spark.deploy.LaunchExecutor
|
||||
import spark.deploy.RegisterWorkerFailed
|
||||
import spark.deploy.master.Master
|
||||
import akka.actor.Terminated
|
||||
import java.io.File
|
||||
|
||||
|
@ -27,7 +28,6 @@ private[spark] class Worker(
|
|||
extends Actor with Logging {
|
||||
|
||||
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
|
||||
val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
|
||||
|
||||
var master: ActorRef = null
|
||||
var masterWebUiUrl : String = ""
|
||||
|
@ -48,11 +48,7 @@ private[spark] class Worker(
|
|||
def memoryFree: Int = memory - memoryUsed
|
||||
|
||||
def createWorkDir() {
|
||||
workDir = if (workDirPath != null) {
|
||||
new File(workDirPath)
|
||||
} else {
|
||||
new File(sparkHome, "work")
|
||||
}
|
||||
workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
|
||||
try {
|
||||
if (!workDir.exists() && !workDir.mkdirs()) {
|
||||
logError("Failed to create work directory " + workDir)
|
||||
|
@ -68,8 +64,7 @@ private[spark] class Worker(
|
|||
override def preStart() {
|
||||
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
|
||||
ip, port, cores, Utils.memoryMegabytesToString(memory)))
|
||||
val envVar = System.getenv("SPARK_HOME")
|
||||
sparkHome = new File(if (envVar == null) "." else envVar)
|
||||
sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
|
||||
logInfo("Spark home: " + sparkHome)
|
||||
createWorkDir()
|
||||
connectToMaster()
|
||||
|
@ -77,24 +72,15 @@ private[spark] class Worker(
|
|||
}
|
||||
|
||||
def connectToMaster() {
|
||||
masterUrl match {
|
||||
case MASTER_REGEX(masterHost, masterPort) => {
|
||||
logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
|
||||
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
|
||||
try {
|
||||
master = context.actorFor(akkaUrl)
|
||||
master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
|
||||
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
|
||||
context.watch(master) // Doesn't work with remote actors, but useful for testing
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
logError("Failed to connect to master", e)
|
||||
System.exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
case _ =>
|
||||
logError("Invalid master URL: " + masterUrl)
|
||||
logInfo("Connecting to master " + masterUrl)
|
||||
try {
|
||||
master = context.actorFor(Master.toAkkaUrl(masterUrl))
|
||||
master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
|
||||
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
|
||||
context.watch(master) // Doesn't work with remote actors, but useful for testing
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
logError("Failed to connect to master", e)
|
||||
System.exit(1)
|
||||
}
|
||||
}
|
||||
|
@ -183,11 +169,19 @@ private[spark] class Worker(
|
|||
private[spark] object Worker {
|
||||
def main(argStrings: Array[String]) {
|
||||
val args = new WorkerArguments(argStrings)
|
||||
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
|
||||
val actor = actorSystem.actorOf(
|
||||
Props(new Worker(args.ip, boundPort, args.webUiPort, args.cores, args.memory,
|
||||
args.master, args.workDir)),
|
||||
name = "Worker")
|
||||
val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort, args.cores,
|
||||
args.memory, args.master, args.workDir)
|
||||
actorSystem.awaitTermination()
|
||||
}
|
||||
|
||||
def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
|
||||
masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
|
||||
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
|
||||
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
|
||||
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
|
||||
val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
|
||||
masterUrl, workDir)), name = "Worker")
|
||||
(actorSystem, boundPort)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -27,8 +27,6 @@ private[spark] class BlockManagerMaster(
|
|||
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
|
||||
|
||||
val DRIVER_AKKA_ACTOR_NAME = "BlockMasterManager"
|
||||
val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
|
||||
val DEFAULT_MANAGER_IP: String = Utils.localHostName()
|
||||
|
||||
val timeout = 10.seconds
|
||||
var driverActor: ActorRef = {
|
||||
|
|
|
@ -18,9 +18,13 @@ import java.util.concurrent.TimeoutException
|
|||
* Various utility classes for working with Akka.
|
||||
*/
|
||||
private[spark] object AkkaUtils {
|
||||
|
||||
/**
|
||||
* Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
|
||||
* ActorSystem itself and its port (which is hard to get from Akka).
|
||||
*
|
||||
* Note: the `name` parameter is important, as even if a client sends a message to right
|
||||
* host + port, if the system name is incorrect, Akka will drop the message.
|
||||
*/
|
||||
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
|
||||
val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
|
||||
|
@ -41,7 +45,7 @@ private[spark] object AkkaUtils {
|
|||
akka.actor.default-dispatcher.throughput = %d
|
||||
""".format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize))
|
||||
|
||||
val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
|
||||
val actorSystem = ActorSystem(name, akkaConf, getClass.getClassLoader)
|
||||
|
||||
// Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
|
||||
// hack because Akka doesn't let you figure out the port through the public API yet.
|
||||
|
|
Loading…
Reference in a new issue