BlockManager UI.

Denny 2012-10-29 14:53:47 -07:00
parent 51477e8874
commit 531ac136bf
13 changed files with 283 additions and 10 deletions

View file

@@ -107,6 +107,12 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
// Variables relating to persistence
private var storageLevel: StorageLevel = StorageLevel.NONE
/** Assign a friendly name to this RDD. Returns this RDD so calls can be chained. */
def name(name: String) = {
sc.rddNames(this.id) = name
this
}
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
@@ -118,6 +124,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
"Cannot change storage level of an RDD after it was already assigned a level")
}
storageLevel = newLevel
// Register the RDD with the SparkContext
sc.persistentRdds(id) = this
this
}
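A minimal usage sketch of the two hooks above, assuming an existing SparkContext sc; the sample data and the name "myNumbers" are hypothetical:

// Hypothetical driver snippet: naming and caching an RDD registers it in
// sc.rddNames and sc.persistentRdds, which the BlockManager UI reads later.
val rdd = sc.parallelize(1 to 1000).name("myNumbers")
rdd.cache()   // cache() persists at the default memory level, recording the RDD in sc.persistentRdds
rdd.count()   // materializes blocks named "rdd_<id>_<split>" in the BlockManager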

View file

@@ -1,6 +1,7 @@
package spark
import java.io._
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.net.{URI, URLClassLoader}
@@ -102,10 +103,19 @@ class SparkContext(
isLocal)
SparkEnv.set(env)
// Start the BlockManager UI
spark.storage.BlockManagerUI.start(SparkEnv.get.actorSystem,
SparkEnv.get.blockManager.master.masterActor, this)
// Used to store a URL for each static file/jar together with the file's local timestamp
private[spark] val addedFiles = HashMap[String, Long]()
private[spark] val addedJars = HashMap[String, Long]()
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new ConcurrentHashMap[Int, RDD[_]]()
// A HashMap for friendly RDD Names
private[spark] val rddNames = new ConcurrentHashMap[Int, String]()
// Add each JAR given through the constructor
jars.foreach { addJar(_) }
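A minimal sketch of how the UI is reached, assuming a local master; the application name is illustrative, and the routes are those defined in the BlockManagerUIDirectives handler further down:

// Hypothetical: constructing a SparkContext starts the BlockManager UI.
// It listens on port 9080 by default, or on BLOCKMANAGER_UI_PORT if that
// environment variable is set.
val sc = new SparkContext("local", "BlockManagerUIDemo")
// Storage dashboard:    http://localhost:9080/
// Per-RDD block detail: http://localhost:9080/rdd?id=<rddId>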

View file

@@ -3,7 +3,8 @@ package spark.storage
import java.io._
import java.util.{HashMap => JHashMap}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import scala.util.Random
import akka.actor._
@@ -90,6 +91,15 @@ case object StopBlockManagerMaster extends ToBlockManagerMaster
private[spark]
case object GetMemoryStatus extends ToBlockManagerMaster
private[spark]
case object GetStorageStatus extends ToBlockManagerMaster
private[spark]
case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
private[spark]
case class StorageStatus(maxMem: Long, remainingMem: Long, blocks: Map[String, BlockStatus])
private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
@@ -99,7 +109,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
val maxMem: Long) {
private var _lastSeenMs = timeMs
private var _remainingMem = maxMem
private val _blocks = new JHashMap[String, StorageLevel]
private val _blocks = new JHashMap[String, BlockStatus]
logInfo("Registering block manager %s:%d with %s RAM".format(
blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
@@ -115,7 +126,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
if (_blocks.containsKey(blockId)) {
// The block exists on the slave already.
val originalLevel: StorageLevel = _blocks.get(blockId)
val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
if (originalLevel.useMemory) {
_remainingMem += memSize
@@ -124,7 +135,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
if (storageLevel.isValid) {
// isValid means it is either stored in-memory or on-disk.
_blocks.put(blockId, storageLevel)
_blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
if (storageLevel.useMemory) {
_remainingMem -= memSize
logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
@@ -137,7 +148,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
val originalLevel: StorageLevel = _blocks.get(blockId)
val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
_blocks.remove(blockId)
if (originalLevel.useMemory) {
_remainingMem += memSize
@@ -152,6 +163,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
}
def blocks: JHashMap[String, BlockStatus] = _blocks
def remainingMem: Long = _remainingMem
def lastSeenMs: Long = _lastSeenMs
@@ -198,6 +211,9 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
case GetMemoryStatus =>
getMemoryStatus
case GetStorageStatus =>
getStorageStatus
case RemoveHost(host) =>
removeHost(host)
sender ! true
@@ -219,6 +235,13 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender ! res
}
private def getStorageStatus() {
val res = blockManagerInfo.map { case(blockManagerId, info) =>
StorageStatus(info.maxMem, info.remainingMem, info.blocks.asScala)
}
sender ! res
}
private def register(blockManagerId: BlockManagerId, maxMemSize: Long) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "

View file

@@ -0,0 +1,102 @@
package spark.storage
import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._
import cc.spray.Directives
import cc.spray.directives._
import cc.spray.typeconversion.TwirlSupport._
import scala.collection.mutable.ArrayBuffer
import spark.{Logging, SparkContext, SparkEnv}
import spark.util.AkkaUtils
private[spark]
object BlockManagerUI extends Logging {
/* Starts the Web interface for the BlockManager */
def start(actorSystem : ActorSystem, masterActor: ActorRef, sc: SparkContext) {
val webUIDirectives = new BlockManagerUIDirectives(actorSystem, masterActor, sc)
try {
logInfo("Starting BlockManager WebUI.")
val port = Option(System.getenv("BLOCKMANAGER_UI_PORT")).getOrElse("9080").toInt
AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, webUIDirectives.handler, "BlockManagerHTTPServer")
} catch {
case e: Exception =>
logError("Failed to create BlockManager WebUI", e)
System.exit(1)
}
}
}
private[spark]
case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, numPartitions: Int, memSize: Long, diskSize: Long)
private[spark]
class BlockManagerUIDirectives(val actorSystem: ActorSystem, master: ActorRef, sc: SparkContext) extends Directives {
val STATIC_RESOURCE_DIR = "spark/deploy/static"
implicit val timeout = Timeout(1 seconds)
val handler = {
get { path("") { completeWith {
// Request the current storage status from the Master
val future = master ? GetStorageStatus
future.map { status =>
val storageStati = status.asInstanceOf[ArrayBuffer[StorageStatus]]
// Calculate macro-level statistics
val maxMem = storageStati.map(_.maxMem).reduce(_+_)
val remainingMem = storageStati.map(_.remainingMem).reduce(_+_)
val diskSpaceUsed = storageStati.flatMap(_.blocks.values.map(_.diskSize))
.reduceOption(_+_).getOrElse(0L)
// Filter out everything that's not an RDD block.
val rddBlocks = storageStati.flatMap(_.blocks).filter { case(k,v) => k.startsWith("rdd") }.toMap
val rdds = rddInfoFromBlockStati(rddBlocks)
spark.storage.html.index.render(maxMem, remainingMem, diskSpaceUsed, rdds.toList)
}
}}} ~
get { path("rdd") { parameter("id") { id => { completeWith {
val future = master ? GetStorageStatus
future.map { status =>
val prefix = "rdd_" + id.toString
val storageStati = status.asInstanceOf[ArrayBuffer[StorageStatus]]
val rddBlocks = storageStati.flatMap(_.blocks).filter { case(k,v) => k.startsWith(prefix) }.toMap
val rddInfo = rddInfoFromBlockStati(rddBlocks).head
spark.storage.html.rdd.render(rddInfo, rddBlocks)
}
}}}}} ~
pathPrefix("static") {
getFromResourceDirectory(STATIC_RESOURCE_DIR)
}
}
private def rddInfoFromBlockStati(infos: Map[String, BlockStatus]) : Array[RDDInfo] = {
infos.groupBy { case(k,v) =>
// Group by rdd name, ignore the partition name
k.substring(0,k.lastIndexOf('_'))
}.map { case(k,v) =>
val blockStati = v.map(_._2).toArray
// Add up memory and disk sizes
val tmp = blockStati.map { x => (x.memSize, x.diskSize)}.reduce { (x,y) =>
(x._1 + y._1, x._2 + y._2)
}
// Get the friendly name for the rdd, if available.
// This is pretty hacky, is there a better way?
val rddId = k.split("_").last.toInt
val rddName : String = Option(sc.rddNames.get(rddId)).getOrElse(k)
val rddStorageLevel = sc.persistentRdds.get(rddId).getStorageLevel
RDDInfo(rddId, rddName, rddStorageLevel, blockStati.length, tmp._1, tmp._2)
}.toArray
}
}
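To illustrate rddInfoFromBlockStati, a hypothetical input of two cached partitions of RDD 3 (the storage level and sizes are made up):

val blocks = Map(
  "rdd_3_0" -> BlockStatus(StorageLevel.MEMORY_ONLY, 100L, 0L),
  "rdd_3_1" -> BlockStatus(StorageLevel.MEMORY_ONLY, 150L, 0L))
// Both keys group under "rdd_3" once the partition suffix after the last '_' is
// dropped, so the result is a single RDDInfo with numPartitions = 2, memSize = 250
// and diskSize = 0. The name comes from sc.rddNames if one was assigned (otherwise
// the key "rdd_3" is used), and the storage level from the RDD registered in
// sc.persistentRdds.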

View file

@@ -50,12 +50,13 @@ private[spark] object AkkaUtils {
* Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to
* handle requests. Throws a SparkException if this fails.
*/
def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route) {
def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route,
name: String = "HttpServer") {
val ioWorker = new IoWorker(actorSystem).start()
val httpService = actorSystem.actorOf(Props(new HttpService(route)))
val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService)))
val server = actorSystem.actorOf(
Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer")
Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = name)
actorSystem.registerOnTermination { ioWorker.stop() }
val timeout = 3.seconds
val future = server.ask(HttpServer.Bind(ip, port))(timeout)

View file

@@ -1,7 +1,7 @@
@(state: spark.deploy.MasterState)
@import spark.deploy.master._
@spark.deploy.common.html.layout(title = "Spark Master on " + state.uri) {
@spark.common.html.layout(title = "Spark Master on " + state.uri) {
<!-- Cluster Details -->
<div class="row">

View file

@@ -1,6 +1,6 @@
@(job: spark.deploy.master.JobInfo)
@spark.deploy.common.html.layout(title = "Job Details") {
@spark.common.html.layout(title = "Job Details") {
<!-- Job Details -->
<div class="row">

View file

@@ -1,6 +1,6 @@
@(worker: spark.deploy.WorkerState)
@spark.deploy.common.html.layout(title = "Spark Worker on " + worker.uri) {
@spark.common.html.layout(title = "Spark Worker on " + worker.uri) {
<!-- Worker Details -->
<div class="row">

View file

@@ -0,0 +1,28 @@
@(maxMem: Long, remainingMem: Long, diskSpaceUsed: Long, rdds: List[spark.storage.RDDInfo])
@spark.common.html.layout(title = "Storage Dashboard") {
<!-- High-Level Information -->
<div class="row">
<div class="span12">
<ul class="unstyled">
<li><strong>Memory:</strong>
@{spark.Utils.memoryBytesToString(maxMem - remainingMem)} Used
(@{spark.Utils.memoryBytesToString(remainingMem)} Available) </li>
<li><strong>Disk:</strong> @{spark.Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
</ul>
</div>
</div>
<hr/>
<!-- RDD Summary (Running) -->
<div class="row">
<div class="span12">
<h3> RDD Summary </h3>
<br/>
@rdd_table(rdds)
</div>
</div>
}

View file

@@ -0,0 +1,65 @@
@(rddInfo: spark.storage.RDDInfo, blocks: Map[String, spark.storage.BlockStatus])
@spark.common.html.layout(title = "RDD Info ") {
<!-- High-Level Information -->
<div class="row">
<div class="span12">
<ul class="unstyled">
<li>
<strong>Storage Level:</strong>
@(if (rddInfo.storageLevel.useDisk) "Disk" else "")
@(if (rddInfo.storageLevel.useMemory) "Memory" else "")
@(if (rddInfo.storageLevel.deserialized) "Deserialized" else "")
@(rddInfo.storageLevel.replication)x Replicated
</li>
<li>
<strong>Partitions:</strong>
@(rddInfo.numPartitions)
</li>
<li>
<strong>Memory Size:</strong>
@{spark.Utils.memoryBytesToString(rddInfo.memSize)}
</li>
<li>
<strong>Disk Size:</strong>
@{spark.Utils.memoryBytesToString(rddInfo.diskSize)}
</li>
</ul>
</div>
</div>
<hr/>
<!-- RDD Summary -->
<div class="row">
<div class="span12">
<h3> RDD Summary </h3>
<br/>
<!-- Block Table Summary -->
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<tr>
<th>Block Name</th>
<th>Storage Level</th>
<th>Size in Memory</th>
<th>Size on Disk</th>
</tr>
</thead>
<tbody>
@blocks.map { case (k,v) =>
<tr>
<td>@k</td>
<td>@v.storageLevel</td>
<td>@{spark.Utils.memoryBytesToString(v.memSize)}</td>
<td>@{spark.Utils.memoryBytesToString(v.diskSize)}</td>
</tr>
}
</tbody>
</table>
</div>
</div>
}

View file

@@ -0,0 +1,18 @@
@(rdd: spark.storage.RDDInfo)
<tr>
<td>
<a href="rdd?id=@(rdd.id)">
@rdd.name
</a>
</td>
<td>
@(if (rdd.storageLevel.useDisk) "Disk" else "")
@(if (rdd.storageLevel.useMemory) "Memory" else "")
@(if (rdd.storageLevel.deserialized) "Deserialized" else "")
@(rdd.storageLevel.replication)x Replicated
</td>
<td>@rdd.numPartitions</td>
<td>@{spark.Utils.memoryBytesToString(rdd.memSize)}</td>
<td>@{spark.Utils.memoryBytesToString(rdd.diskSize)}</td>
</tr>

View file

@@ -0,0 +1,18 @@
@(rdds: List[spark.storage.RDDInfo])
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<tr>
<th>RDD Name</th>
<th>Storage Level</th>
<th>Partitions</th>
<th>Size in Memory</th>
<th>Size on Disk</th>
</tr>
</thead>
<tbody>
@for(rdd <- rdds) {
@rdd_row(rdd)
}
</tbody>
</table>