diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 010203d1ca..06d2d09fce 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -15,7 +15,6 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
   def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) {
     logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
-    val ser = SparkEnv.get.serializer.newInstance()
     val blockManager = SparkEnv.get.blockManager
 
     val startTime = System.currentTimeMillis
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 5dcf25f997..602fcca0f9 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -1,14 +1,11 @@
 package spark
 
 import akka.actor.ActorSystem
-import akka.actor.ActorSystemImpl
-import akka.remote.RemoteActorRefProvider
-
-import com.typesafe.config.ConfigFactory
 
 import spark.storage.BlockManager
 import spark.storage.BlockManagerMaster
 import spark.network.ConnectionManager
+import spark.util.AkkaUtils
 
 class SparkEnv (
     val actorSystem: ActorSystem,
@@ -45,24 +42,12 @@ object SparkEnv {
       isLocal: Boolean
     ) : SparkEnv = {
 
-    val akkaConf = ConfigFactory.parseString("""
-      akka.daemonic = on
-      akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
-      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-      akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
-      akka.remote.netty.hostname = "%s"
-      akka.remote.netty.port = %d
-      """.format(hostname, port))
-
-    val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
 
     // Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
     // figure out which port number Akka actually bound to and set spark.master.port to it.
-    // Unfortunately Akka doesn't yet provide an API for this except if you cast objects as below.
     if (isMaster && port == 0) {
-      val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider
-      val port = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
-      System.setProperty("spark.master.port", port.toString)
+      System.setProperty("spark.master.port", boundPort.toString)
     }
 
     val serializerClass = System.getProperty("spark.serializer", "spark.KryoSerializer")
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 17670e077a..44dca2e4f1 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -205,12 +205,15 @@ object Utils {
    * example, 4,000,000 is returned as 4MB.
    */
   def memoryBytesToString(size: Long): String = {
+    val TB = 1L << 40
     val GB = 1L << 30
     val MB = 1L << 20
     val KB = 1L << 10
     val (value, unit) = {
-      if (size >= 2*GB) {
+      if (size >= 2*TB) {
+        (size.asInstanceOf[Double] / TB, "TB")
+      } else if (size >= 2*GB) {
         (size.asInstanceOf[Double] / GB, "GB")
       } else if (size >= 2*MB) {
         (size.asInstanceOf[Double] / MB, "MB")
@@ -220,6 +223,6 @@ object Utils {
         (size.asInstanceOf[Double], "B")
       }
     }
-    "%.1f%s".formatLocal(Locale.US, value, unit)
+    "%.1f %s".formatLocal(Locale.US, value, unit)
   }
 }
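The new TB branch follows the existing pattern: a value is printed in a given unit only once it reaches twice that unit, otherwise it falls through to the next smaller one. A quick REPL-style sanity check (a sketch, not part of the patch):

```scala
import spark.Utils

// 3 TB is at or above the 2*TB threshold, so the new TB branch applies
println(Utils.memoryBytesToString(3L * (1L << 40)))    // "3.0 TB"
// 1.5 TB is below 2*TB, so it still falls through and prints as GB
println(Utils.memoryBytesToString(1536L * (1L << 30))) // "1536.0 GB"
```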
diff --git a/core/src/main/scala/spark/deploy/Master.scala b/core/src/main/scala/spark/deploy/Master.scala
new file mode 100644
index 0000000000..da2f678f52
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/Master.scala
@@ -0,0 +1,44 @@
+package spark.deploy
+
+import akka.actor.{ActorRef, Props, Actor, ActorSystem}
+import spark.{Logging, Utils}
+import scala.collection.immutable.{::, Nil}
+import spark.util.{AkkaUtils, IntParam}
+import cc.spray.Directives
+
+sealed trait MasterMessage
+case class RegisterSlave(host: String, port: Int, cores: Int, memory: Int) extends MasterMessage
+
+class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
+  override def preStart() {
+    logInfo("Starting Spark master at spark://" + ip + ":" + port)
+    startWebUi()
+  }
+
+  def startWebUi() {
+    val webUi = new MasterWebUI(context.system, self)
+    try {
+      AkkaUtils.startSprayServer(context.system, ip, webUiPort, webUi.handler)
+    } catch {
+      case e: Exception =>
+        logError("Failed to create web UI", e)
+        System.exit(1)
+    }
+  }
+
+  override def receive = {
+    case RegisterSlave(host, slavePort, cores, memory) =>
+      logInfo("Registering slave %s:%d with %d cores, %s RAM".format(
+        host, slavePort, cores, Utils.memoryBytesToString(memory * 1024L)))
+  }
+}
+
+object Master {
+  def main(args: Array[String]) {
+    val params = new MasterArguments(args)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", params.ip, params.port)
+    val actor = actorSystem.actorOf(
+      Props(new Master(params.ip, boundPort, params.webUiPort)), name = "Master")
+    actorSystem.awaitTermination()
+  }
+}
diff --git a/core/src/main/scala/spark/deploy/MasterArguments.scala b/core/src/main/scala/spark/deploy/MasterArguments.scala
new file mode 100644
index 0000000000..c948a405ef
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/MasterArguments.scala
@@ -0,0 +1,51 @@
+package spark.deploy
+
+import spark.util.IntParam
+import spark.Utils
+
+/**
+ * Command-line parser for the master.
+ */
+class MasterArguments(args: Array[String]) {
+  var ip: String = Utils.localIpAddress()
+  var port: Int = 7077
+  var webUiPort: Int = 8080
+
+  parse(args.toList)
+
+  def parse(args: List[String]): Unit = args match {
+    case ("--ip" | "-i") :: value :: tail =>
+      ip = value
+      parse(tail)
+
+    case ("--port" | "-p") :: IntParam(value) :: tail =>
+      port = value
+      parse(tail)
+
+    case "--webui-port" :: IntParam(value) :: tail =>
+      webUiPort = value
+      parse(tail)
+
+    case ("--help" | "-h") :: tail =>
+      printUsageAndExit(0)
+
+    case Nil => {}
+
+    case _ =>
+      printUsageAndExit(1)
+  }
+
+  /**
+   * Print usage and exit the JVM with the given exit code.
+   */
+  def printUsageAndExit(exitCode: Int) {
+    System.err.println(
+      "Usage: spark-master [options]\n" +
+      "\n" +
+      "Options:\n" +
+      "  -i IP, --ip IP         IP address or DNS name to listen on\n" +
+      "  -p PORT, --port PORT   Port to listen on (default: 7077)\n" +
+      "  --webui-port PORT      Port for web UI (default: 8080)\n")
+    System.exit(exitCode)
+  }
+}
\ No newline at end of file
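The parser recurses over the argument list, using the IntParam extractor (added in spark/util/IntParam.scala further down) to match numeric values. A hypothetical check of the behavior (sketch only, not part of the patch):

```scala
import spark.deploy.MasterArguments

val conf = new MasterArguments(Array("--port", "7078", "--webui-port", "8081"))
assert(conf.port == 7078 && conf.webUiPort == 8081)
// A non-numeric value such as "--port abc" fails the IntParam match,
// falls through to the catch-all case, and prints usage before exiting with code 1.
```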
diff --git a/core/src/main/scala/spark/deploy/MasterWebUI.scala b/core/src/main/scala/spark/deploy/MasterWebUI.scala
new file mode 100644
index 0000000000..3f078322e1
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/MasterWebUI.scala
@@ -0,0 +1,12 @@
+package spark.deploy
+
+import akka.actor.{ActorRef, ActorSystem}
+import cc.spray.Directives
+
+class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
+  val handler = {
+    path("") {
+      get { _.complete("Hello world!") }
+    }
+  }
+}
diff --git a/core/src/main/scala/spark/deploy/Worker.scala b/core/src/main/scala/spark/deploy/Worker.scala
new file mode 100644
index 0000000000..7210a4b902
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/Worker.scala
@@ -0,0 +1,10 @@
+package spark.deploy
+
+class Worker(cores: Int, memoryMb: Int) {
+
+}
+
+object Worker {
+  def main(args: Array[String]) {
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
new file mode 100644
index 0000000000..84e942e5b7
--- /dev/null
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -0,0 +1,69 @@
+package spark.util
+
+import akka.actor.{Props, ActorSystemImpl, ActorSystem}
+import com.typesafe.config.ConfigFactory
+import akka.util.duration._
+import akka.pattern.ask
+import akka.remote.RemoteActorRefProvider
+import cc.spray.Route
+import cc.spray.io.IoWorker
+import cc.spray.{SprayCanRootService, HttpService}
+import cc.spray.can.server.HttpServer
+import cc.spray.io.pipelines.MessageHandlerDispatch.SingletonHandler
+import akka.dispatch.Await
+import spark.SparkException
+import java.util.concurrent.TimeoutException
+
+/**
+ * Various utility classes for working with Akka.
+ */
+object AkkaUtils {
+  /**
+   * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
+   * ActorSystem itself and its port (which is hard to get from Akka).
+   */
+  def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
+    val akkaConf = ConfigFactory.parseString("""
+      akka.daemonic = on
+      akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
+      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+      akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
+      akka.remote.netty.hostname = "%s"
+      akka.remote.netty.port = %d
+      """.format(host, port))
+
+    val actorSystem = ActorSystem(name, akkaConf, getClass.getClassLoader)
+
+    // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
+    // hack because Akka doesn't let you figure out the port through the public API yet.
+    val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider
+    val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
+    return (actorSystem, boundPort)
+  }
+
+  /**
+   * Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to
+   * handle requests. Throws a SparkException if this fails.
+   */
+  def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route) {
+    val ioWorker = new IoWorker(actorSystem).start()
+    val httpService = actorSystem.actorOf(Props(new HttpService(route)))
+    val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService)))
+    val server = actorSystem.actorOf(
+      Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer")
+    actorSystem.registerOnTermination { ioWorker.stop() }
+    val timeout = 1.seconds
+    val future = server.ask(HttpServer.Bind(ip, port))(timeout)
+    try {
+      Await.result(future, timeout) match {
+        case bound: HttpServer.Bound =>
+          return
+        case other: Any =>
+          throw new SparkException("Failed to bind web UI to port " + port + ": " + other)
+      }
+    } catch {
+      case e: TimeoutException =>
+        throw new SparkException("Failed to bind web UI to port " + port)
+    }
+  }
+}
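createActorSystem returns the bound port explicitly, so callers no longer need the ActorSystemImpl cast that previously lived in SparkEnv. A minimal sketch of the intended usage (assuming a loopback address; not part of the patch):

```scala
import spark.util.AkkaUtils

// Port 0 asks the OS for any free port; the second tuple element is the port
// Akka actually bound to, recovered internally via the RemoteActorRefProvider hack.
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", "127.0.0.1", 0)
println("Actor system bound to port " + boundPort)
actorSystem.shutdown()
```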
diff --git a/core/src/main/scala/spark/util/IntParam.scala b/core/src/main/scala/spark/util/IntParam.scala
new file mode 100644
index 0000000000..c3ff063569
--- /dev/null
+++ b/core/src/main/scala/spark/util/IntParam.scala
@@ -0,0 +1,14 @@
+package spark.util
+
+/**
+ * An extractor object for parsing strings into integers.
+ */
+object IntParam {
+  def unapply(str: String): Option[Int] = {
+    try {
+      Some(str.toInt)
+    } catch {
+      case e: NumberFormatException => None
+    }
+  }
+}
diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala
index 1ac4737f04..a46d90223e 100644
--- a/core/src/test/scala/spark/UtilsSuite.scala
+++ b/core/src/test/scala/spark/UtilsSuite.scala
@@ -7,12 +7,13 @@ import scala.util.Random
 class UtilsSuite extends FunSuite {
 
   test("memoryBytesToString") {
-    assert(Utils.memoryBytesToString(10) === "10.0B")
-    assert(Utils.memoryBytesToString(1500) === "1500.0B")
-    assert(Utils.memoryBytesToString(2000000) === "1953.1KB")
-    assert(Utils.memoryBytesToString(2097152) === "2.0MB")
-    assert(Utils.memoryBytesToString(2306867) === "2.2MB")
-    assert(Utils.memoryBytesToString(5368709120L) === "5.0GB")
+    assert(Utils.memoryBytesToString(10) === "10.0 B")
+    assert(Utils.memoryBytesToString(1500) === "1500.0 B")
+    assert(Utils.memoryBytesToString(2000000) === "1953.1 KB")
+    assert(Utils.memoryBytesToString(2097152) === "2.0 MB")
+    assert(Utils.memoryBytesToString(2306867) === "2.2 MB")
+    assert(Utils.memoryBytesToString(5368709120L) === "5.0 GB")
+    assert(Utils.memoryBytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
  }
 
   test("copyStream") {
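IntParam packages the try/catch around String.toInt as an extractor, which is what lets the argument parser above pattern-match on numeric strings directly. A minimal sketch of the idiom (not part of the patch):

```scala
import spark.util.IntParam

// The case matches only when the whole string parses as an Int
"8080" match {
  case IntParam(n) => println("parsed port " + n)
  case _ => println("not an integer")
}
```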
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index be5f202dbe..6f5df5c743 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -46,7 +46,8 @@ object SparkBuild extends Build {
     resolvers ++= Seq(
       "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
       "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
-      "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/"
+      "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/",
+      "Spray Repository" at "http://repo.spray.cc/"
     ),
     libraryDependencies ++= Seq(
       "com.google.guava" % "guava" % "11.0.1",
@@ -63,7 +64,9 @@ object SparkBuild extends Build {
       "com.typesafe.akka" % "akka-slf4j" % "2.0.2",
       "org.jboss.netty" % "netty" % "3.2.6.Final",
       "it.unimi.dsi" % "fastutil" % "6.4.4",
-      "colt" % "colt" % "1.2.0"
+      "colt" % "colt" % "1.2.0",
+      "cc.spray" % "spray-can" % "1.0-M2.1",
+      "cc.spray" % "spray-server" % "1.0-M2.1"
     )
   ) ++ assemblySettings ++ extraAssemblySettings
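Worker.scala is still an empty stub in this patch. For context, a slave that registers with the master might eventually look something like the following sketch; the actor path, host, and resource numbers are assumptions based on the Master actor above, not code from this change:

```scala
// Hypothetical sketch only: one way a worker could reach the actor registered
// above under the name "Master" in the "spark" actor system, using Akka 2.0's
// remote actorFor lookup, and send it the RegisterSlave message.
import spark.deploy.RegisterSlave
import spark.util.AkkaUtils

object WorkerSketch {
  def main(args: Array[String]) {
    val (actorSystem, _) = AkkaUtils.createActorSystem("spark", "127.0.0.1", 0)
    val master = actorSystem.actorFor("akka://spark@127.0.0.1:7077/user/Master")
    // 4 cores and 4 GB, assuming the master interprets memory in KB
    // (it logs Utils.memoryBytesToString(memory * 1024L))
    master ! RegisterSlave("127.0.0.1", 1234, 4, 4 * 1024 * 1024)
    actorSystem.awaitTermination()
  }
}
```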