Initial framework to get a master and web UI up.

This commit is contained in:
Matei Zaharia 2012-06-30 14:45:55 -07:00
parent c53670b9bf
commit 2fb6e7d71e
11 changed files with 222 additions and 29 deletions

View file

@ -15,7 +15,6 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream
class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) { def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) {
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId)) logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
val ser = SparkEnv.get.serializer.newInstance()
val blockManager = SparkEnv.get.blockManager val blockManager = SparkEnv.get.blockManager
val startTime = System.currentTimeMillis val startTime = System.currentTimeMillis

View file

@ -1,14 +1,11 @@
package spark package spark
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.actor.ActorSystemImpl
import akka.remote.RemoteActorRefProvider
import com.typesafe.config.ConfigFactory
import spark.storage.BlockManager import spark.storage.BlockManager
import spark.storage.BlockManagerMaster import spark.storage.BlockManagerMaster
import spark.network.ConnectionManager import spark.network.ConnectionManager
import spark.util.AkkaUtils
class SparkEnv ( class SparkEnv (
val actorSystem: ActorSystem, val actorSystem: ActorSystem,
@ -45,24 +42,12 @@ object SparkEnv {
isLocal: Boolean isLocal: Boolean
) : SparkEnv = { ) : SparkEnv = {
val akkaConf = ConfigFactory.parseString(""" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
akka.daemonic = on
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
akka.remote.netty.hostname = "%s"
akka.remote.netty.port = %d
""".format(hostname, port))
val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
// Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port), // Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
// figure out which port number Akka actually bound to and set spark.master.port to it. // figure out which port number Akka actually bound to and set spark.master.port to it.
// Unfortunately Akka doesn't yet provide an API for this except if you cast objects as below.
if (isMaster && port == 0) { if (isMaster && port == 0) {
val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider System.setProperty("spark.master.port", boundPort.toString)
val port = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
System.setProperty("spark.master.port", port.toString)
} }
val serializerClass = System.getProperty("spark.serializer", "spark.KryoSerializer") val serializerClass = System.getProperty("spark.serializer", "spark.KryoSerializer")

View file

@ -205,12 +205,15 @@ object Utils {
* example, 4,000,000 is returned as 4MB. * example, 4,000,000 is returned as 4MB.
*/ */
def memoryBytesToString(size: Long): String = { def memoryBytesToString(size: Long): String = {
val TB = 1L << 40
val GB = 1L << 30 val GB = 1L << 30
val MB = 1L << 20 val MB = 1L << 20
val KB = 1L << 10 val KB = 1L << 10
val (value, unit) = { val (value, unit) = {
if (size >= 2*GB) { if (size >= 2*TB) {
(size.asInstanceOf[Double] / TB, "TB")
} else if (size >= 2*GB) {
(size.asInstanceOf[Double] / GB, "GB") (size.asInstanceOf[Double] / GB, "GB")
} else if (size >= 2*MB) { } else if (size >= 2*MB) {
(size.asInstanceOf[Double] / MB, "MB") (size.asInstanceOf[Double] / MB, "MB")
@ -220,6 +223,6 @@ object Utils {
(size.asInstanceOf[Double], "B") (size.asInstanceOf[Double], "B")
} }
} }
"%.1f%s".formatLocal(Locale.US, value, unit) "%.1f %s".formatLocal(Locale.US, value, unit)
} }
} }

View file

@ -0,0 +1,44 @@
package spark.deploy
import akka.actor.{ActorRef, Props, Actor, ActorSystem}
import spark.{Logging, Utils}
import scala.collection.immutable.{::, Nil}
import spark.util.{AkkaUtils, IntParam}
import cc.spray.Directives
sealed trait MasterMessage
case class RegisterSlave(host: String, port: Int, cores: Int, memory: Int) extends MasterMessage
class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
override def preStart() {
logInfo("Starting Spark master at spark://" + ip + ":" + port)
startWebUi()
}
def startWebUi() {
val webUi = new MasterWebUI(context.system, self)
try {
AkkaUtils.startSprayServer(context.system, ip, webUiPort, webUi.handler)
} catch {
case e: Exception =>
logError("Failed to create web UI", e)
System.exit(1)
}
}
override def receive = {
case RegisterSlave(host, slavePort, cores, memory) =>
logInfo("Registering slave %s:%d with %d cores, %s RAM".format(
host, slavePort, cores, Utils.memoryBytesToString(memory * 1024L)))
}
}
object Master {
def main(args: Array[String]) {
val params = new MasterArguments(args)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", params.ip, params.port)
val actor = actorSystem.actorOf(
Props(new Master(params.ip, boundPort, params.webUiPort)), name = "Master")
actorSystem.awaitTermination()
}
}

View file

@ -0,0 +1,51 @@
package spark.deploy
import spark.util.IntParam
import spark.Utils
/**
* Command-line parser for the master.
*/
class MasterArguments(args: Array[String]) {
var ip: String = Utils.localIpAddress()
var port: Int = 7077
var webUiPort: Int = 8080
parse(args.toList)
def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
ip = value
parse(tail)
case ("--port" | "-p") :: IntParam(value) :: tail =>
port = value
parse(tail)
case "--webui-port" :: IntParam(value) :: tail =>
webUiPort = value
parse(tail)
case ("--help" | "-h") :: tail =>
printUsageAndExit(0)
case Nil => {}
case _ =>
printUsageAndExit(1)
}
/**
* Print usage and exit JVM with the given exit code.
*/
def printUsageAndExit(exitCode: Int) {
System.err.println(
"Usage: spark-master [options]\n" +
"\n" +
"Options:\n" +
" -i IP, --ip IP IP address or DNS name to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
" --webui-port PORT Port for web UI (default: 8080)\n")
System.exit(exitCode)
}
}

View file

@ -0,0 +1,12 @@
package spark.deploy
import akka.actor.{ActorRef, ActorSystem}
import cc.spray.Directives
class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
val handler = {
path("") {
get { _.complete("Hello world!") }
}
}
}

View file

@ -0,0 +1,10 @@
package spark.deploy
class Worker(cores: Int, memoryMb: Int) {
}
object Worker {
def main(args: Array[String]) {
}
}

View file

@ -0,0 +1,71 @@
package spark.util
import akka.actor.{Props, ActorSystemImpl, ActorSystem}
import com.typesafe.config.ConfigFactory
import akka.util.duration._
import akka.pattern.ask
import akka.remote.RemoteActorRefProvider
import cc.spray.Route
import cc.spray.io.IoWorker
import cc.spray.{SprayCanRootService, HttpService}
import cc.spray.can.server.HttpServer
import cc.spray.io.pipelines.MessageHandlerDispatch.SingletonHandler
import akka.dispatch.Await
import spark.SparkException
import java.util.concurrent.TimeoutException
/**
* Various utility classes for working with Akka.
*/
object AkkaUtils {
/**
* Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
* ActorSystem itself and its port (which is hard to get from Akka).
*/
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
val akkaConf = ConfigFactory.parseString("""
akka.daemonic = on
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
akka.remote.netty.hostname = "%s"
akka.remote.netty.port = %d
""".format(host, port))
val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
// Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
// hack because Akka doesn't let you figure out the port through the public API yet.
val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider
val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
return (actorSystem, boundPort)
return (null, 0)
}
/**
* Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to
* handle requests. Throws a SparkException if this fails.
*/
def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route) {
val ioWorker = new IoWorker(actorSystem).start()
val httpService = actorSystem.actorOf(Props(new HttpService(route)))
val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService)))
val server = actorSystem.actorOf(
Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer")
actorSystem.registerOnTermination { ioWorker.stop() }
val timeout = 1.seconds
val future = server.ask(HttpServer.Bind(ip, port))(timeout)
try {
Await.result(future, timeout) match {
case bound: HttpServer.Bound =>
return
case other: Any =>
throw new SparkException("Failed to bind web UI to port " + port + ": " + other)
}
} catch {
case e: TimeoutException =>
throw new SparkException("Failed to bind web UI to port " + port)
}
}
}

View file

@ -0,0 +1,14 @@
package spark.util
/**
* An extractor object for parsing strings into integers.
*/
object IntParam {
def unapply(str: String): Option[Int] = {
try {
Some(str.toInt)
} catch {
case e: NumberFormatException => None
}
}
}

View file

@ -7,12 +7,13 @@ import scala.util.Random
class UtilsSuite extends FunSuite { class UtilsSuite extends FunSuite {
test("memoryBytesToString") { test("memoryBytesToString") {
assert(Utils.memoryBytesToString(10) === "10.0B") assert(Utils.memoryBytesToString(10) === "10.0 B")
assert(Utils.memoryBytesToString(1500) === "1500.0B") assert(Utils.memoryBytesToString(1500) === "1500.0 B")
assert(Utils.memoryBytesToString(2000000) === "1953.1KB") assert(Utils.memoryBytesToString(2000000) === "1953.1 KB")
assert(Utils.memoryBytesToString(2097152) === "2.0MB") assert(Utils.memoryBytesToString(2097152) === "2.0 MB")
assert(Utils.memoryBytesToString(2306867) === "2.2MB") assert(Utils.memoryBytesToString(2306867) === "2.2 MB")
assert(Utils.memoryBytesToString(5368709120L) === "5.0GB") assert(Utils.memoryBytesToString(5368709120L) === "5.0 GB")
assert(Utils.memoryBytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
} }
test("copyStream") { test("copyStream") {

View file

@ -46,7 +46,8 @@ object SparkBuild extends Build {
resolvers ++= Seq( resolvers ++= Seq(
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/", "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/", "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
"Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/" "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/",
"Spray Repository" at "http://repo.spray.cc/"
), ),
libraryDependencies ++= Seq( libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "11.0.1", "com.google.guava" % "guava" % "11.0.1",
@ -63,7 +64,9 @@ object SparkBuild extends Build {
"com.typesafe.akka" % "akka-slf4j" % "2.0.2", "com.typesafe.akka" % "akka-slf4j" % "2.0.2",
"org.jboss.netty" % "netty" % "3.2.6.Final", "org.jboss.netty" % "netty" % "3.2.6.Final",
"it.unimi.dsi" % "fastutil" % "6.4.4", "it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0" "colt" % "colt" % "1.2.0",
"cc.spray" % "spray-can" % "1.0-M2.1",
"cc.spray" % "spray-server" % "1.0-M2.1"
) )
) ++ assemblySettings ++ extraAssemblySettings ) ++ assemblySettings ++ extraAssemblySettings