Merge pull request #316 from JoshRosen/fix/ec2-web-ui-links

Use external addresses in standalone web UI when running on EC2
Matei Zaharia 2012-12-04 15:34:27 -08:00
commit ddf6cd012c
8 changed files with 58 additions and 15 deletions
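
Both daemons resolve their externally visible address the same way: prefer the SPARK_PUBLIC_DNS environment variable when it is set, otherwise fall back to the bind address. A minimal sketch of that shared fallback, distilled from the Master and Worker hunks below (the object name is ours, not the patch's):

object PublicAddress {
  // Prefer SPARK_PUBLIC_DNS when set; otherwise fall back to the address the
  // daemon bound to. Mirrors the vals added in Master.scala and Worker.scala.
  def resolve(ip: String): String = {
    val envVar = System.getenv("SPARK_PUBLIC_DNS")
    if (envVar != null) envVar else ip
  }
}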

bin/start-master.sh

@@ -7,4 +7,13 @@ bin=`cd "$bin"; pwd`
 
 . "$bin/spark-config.sh"
 
-"$bin"/spark-daemon.sh start spark.deploy.master.Master
+# Set SPARK_PUBLIC_DNS so the master reports the correct web UI address to the slaves
+if [ "$SPARK_PUBLIC_DNS" = "" ]; then
+    # If we appear to be running on EC2, use the public address by default:
+    if [[ `hostname` == *ec2.internal ]]; then
+        echo "RUNNING ON EC2"
+        export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
+    fi
+fi
+
+"$bin"/spark-daemon.sh start spark.deploy.master.Master

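The scripts detect EC2 by the *ec2.internal suffix of `hostname` and, on a match, pull the instance's public hostname from the EC2 metadata service. A rough Scala equivalent of that wget call, for illustration only (the helper name is ours; instance-data.ec2.internal resolves only from inside EC2):

import scala.io.Source

// Ask the EC2 instance metadata service for this instance's public hostname.
// Returns None when the endpoint is unreachable, i.e. when not running on EC2.
def ec2PublicHostname(): Option[String] =
  try {
    val src = Source.fromURL(
      "http://instance-data.ec2.internal/latest/meta-data/public-hostname")
    try Some(src.mkString.trim) finally src.close()
  } catch {
    case _: java.io.IOException => None
  }
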
bin/start-slave.sh Executable file

@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# Set SPARK_PUBLIC_DNS so slaves can be linked in the master web UI
+if [ "$SPARK_PUBLIC_DNS" = "" ]; then
+    # If we appear to be running on EC2, use the public address by default:
+    if [[ `hostname` == *ec2.internal ]]; then
+        echo "RUNNING ON EC2"
+        export SPARK_PUBLIC_DNS=`wget -q -O - http://instance-data.ec2.internal/latest/meta-data/public-hostname`
+    fi
+fi
+
+"$bin"/spark-daemon.sh start spark.deploy.worker.Worker $1

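The positional $1 here is the master URL (spark://host:port) that start-slaves.sh passes along; see the next diff.
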
bin/start-slaves.sh

@@ -18,6 +18,7 @@ if [ "$SPARK_MASTER_IP" = "" ]; then
     SPARK_MASTER_IP=`hostname`
 fi
 
-echo "Master IP: $ip"
+echo "Master IP: $SPARK_MASTER_IP"
 
-"$bin"/spark-daemons.sh start spark.deploy.worker.Worker spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT
+# Launch the slaves
+exec "$bin/slaves.sh" cd "$SPARK_HOME" \; "$bin/start-slave.sh" spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT

DeployMessage.scala

@@ -11,8 +11,15 @@ private[spark] sealed trait DeployMessage extends Serializable
 
 // Worker to Master
 
-private[spark]
-case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
+private[spark]
+case class RegisterWorker(
+    id: String,
+    host: String,
+    port: Int,
+    cores: Int,
+    memory: Int,
+    webUiPort: Int,
+    publicAddress: String)
   extends DeployMessage
 
 private[spark]

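Because the widened case class declares no default for publicAddress, every sender must now supply it explicitly. A hypothetical construction with illustrative values only (real ids, hosts, and ports come from the running worker):

val msg = RegisterWorker(
  id = "worker-20121204151500-ip-10-1-2-3-7078",  // illustrative id
  host = "ip-10-1-2-3.ec2.internal",              // EC2-internal bind address
  port = 7078,
  cores = 8,
  memory = 16384,                                 // megabytes
  webUiPort = 8081,
  publicAddress = "ec2-54-1-2-3.compute-1.amazonaws.com")  // externally reachable name
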
Master.scala

@@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   val waitingJobs = new ArrayBuffer[JobInfo]
   val completedJobs = new ArrayBuffer[JobInfo]
 
+  val masterPublicAddress = {
+    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    if (envVar != null) envVar else ip
+  }
+
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each job
   // among all the nodes) instead of trying to consolidate each job onto a small # of nodes.
@@ -55,15 +60,15 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   }
 
   override def receive = {
-    case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort) => {
+    case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
         host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
       if (idToWorker.contains(id)) {
         sender ! RegisterWorkerFailed("Duplicate worker ID")
       } else {
-        addWorker(id, host, workerPort, cores, memory, worker_webUiPort)
+        addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
         context.watch(sender)  // This doesn't work with remote actors but helps for testing
-        sender ! RegisteredWorker("http://" + ip + ":" + webUiPort)
+        sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUiPort)
         schedule()
       }
     }
@@ -196,8 +201,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
       exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
   }
 
-  def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int): WorkerInfo = {
-    val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort)
+  def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
+      publicAddress: String): WorkerInfo = {
+    val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
     workers += worker
     idToWorker(worker.id) = worker
     actorToWorker(sender) = worker

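Note that the registration acknowledgement now embeds masterPublicAddress rather than the raw bind IP, so the master web UI URL handed back to each worker is externally reachable as well.
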
WorkerInfo.scala

@@ -10,7 +10,8 @@ private[spark] class WorkerInfo(
   val cores: Int,
   val memory: Int,
   val actor: ActorRef,
-  val webUiPort: Int) {
+  val webUiPort: Int,
+  val publicAddress: String) {
 
   var executors = new mutable.HashMap[String, ExecutorInfo]  // fullId => info
@@ -37,8 +38,8 @@ private[spark] class WorkerInfo(
   def hasExecutor(job: JobInfo): Boolean = {
     executors.values.exists(_.job == job)
   }
 
   def webUiAddress : String = {
-    "http://" + this.host + ":" + this.webUiPort
+    "http://" + this.publicAddress + ":" + this.webUiPort
   }
 }

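With the illustrative values from the RegisterWorker example above, webUiAddress renders as http://ec2-54-1-2-3.compute-1.amazonaws.com:8081, which is exactly what the worker_row template at the end of this commit links to.
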
Worker.scala

@@ -36,6 +36,10 @@ private[spark] class Worker(
   var workDir: File = null
   val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new HashMap[String, ExecutorRunner]
+  val publicAddress = {
+    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    if (envVar != null) envVar else ip
+  }
 
   var coresUsed = 0
   var memoryUsed = 0
@@ -79,7 +83,7 @@ private[spark] class Worker(
     val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
     try {
       master = context.actorFor(akkaUrl)
-      master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort)
+      master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
       context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
       context.watch(master)  // Doesn't work with remote actors, but useful for testing
     } catch {

worker_row.scala.html

@@ -4,7 +4,7 @@
 <tr>
   <td>
-    <a href="http://@worker.host:@worker.webUiPort">@worker.id</a>
+    <a href="@worker.webUiAddress">@worker.id</a>
   </td>
   <td>@{worker.host}:@{worker.port}</td>
   <td>@worker.cores (@worker.coresUsed Used)</td>
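
Outside EC2, the same override is available by hand: export SPARK_PUBLIC_DNS to any externally resolvable hostname before running the start scripts, and both the master and worker web UIs will advertise that name instead of their bind addresses.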