Merge branch 'master' into bootstrap-design

Conflicts:
	core/src/main/scala/spark/ui/UIUtils.scala
	core/src/main/scala/spark/ui/jobs/IndexPage.scala
	core/src/main/scala/spark/ui/storage/RDDPage.scala
Patrick Wendell 2013-08-07 21:06:03 -07:00
commit 8c0d668468
48 changed files with 1150 additions and 412 deletions

conf/metrics.properties.template

@ -1,48 +1,45 @@
# syntax: [instance].sink|source.[name].[options]=[value]

# This file configures Spark's internal metrics system. The metrics system is
# divided into instances which correspond to internal components.
# Each instance can be configured to report its metrics to one or more sinks.
# Accepted values for [instance] are "master", "worker", "executor", "driver",
# and "applications". A wild card "*" can be used as an instance name, in
# which case all instances will inherit the supplied property.
#
# Within an instance, a "source" specifies a particular set of grouped metrics.
# There are two kinds of sources:
# 1. Spark internal sources, like MasterSource, WorkerSource, etc, which will
# collect a Spark component's internal state. Each instance is paired with a
# Spark source that is added automatically.
# 2. Common sources, like JvmSource, which will collect low level state.
# These can be added through configuration options and are then loaded
# using reflection.
#
# A "sink" specifies where metrics are delivered to. Each instance can be
# assigned one or more sinks.
#
# The sink|source field specifies whether the property relates to a sink or
# a source.
#
# The [name] field specifies the name of the source or sink.
#
# The [options] field is the specific property of this source or sink. The
# source or sink is responsible for parsing this property.
#
# Notes:
# 1. To add a new sink, set the "class" option to a fully qualified class
# name (see examples below).
# 2. Some sinks involve a polling period. The minimum allowed polling period
# is 1 second.
# 3. Wild card properties can be overridden by more specific properties.
# For example, master.sink.console.period takes precedence over
# *.sink.console.period.
# 4. A metrics specific configuration
# "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
# added to Java properties using -Dspark.metrics.conf=xxx if you want to
# customize the metrics system. You can also put the file in
# ${SPARK_HOME}/conf and it will be loaded automatically.

# Enable JmxSink for all instances by class name
#*.sink.jmx.class=spark.metrics.sink.JmxSink
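
# Example (hypothetical, commented out): report to a console sink every 10
# seconds on all instances, but only every 20 seconds on the master; the more
# specific property wins, per note 3 above. The ConsoleSink class name here is
# an assumption modeled on the JmxSink line above.
#*.sink.console.class=spark.metrics.sink.ConsoleSink
#*.sink.console.period=10
#master.sink.console.period=20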

core/src/main/resources/spark/ui/static/webui.css

@ -47,3 +47,31 @@
padding-top: 7px;
padding-left: 4px;
}
.table td {
vertical-align: middle !important;
}
.progress-completed .bar,
.progress .bar-completed {
background-color: #b3def9;
background-image: -moz-linear-gradient(top, #addfff, #badcf2);
background-image: -webkit-gradient(linear, 0 0, 0 100%, from(#addfff), to(#badcf2));
background-image: -webkit-linear-gradient(top, #addfff, #badcf2);
background-image: -o-linear-gradient(top, #addfff, #badcf2);
background-image: linear-gradient(to bottom, #addfff, #badcf2);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffaddfff', endColorstr='#ffbadcf2', GradientType=0);
}
.progress-running .bar,
.progress .bar-running {
background-color: #c2ebfa;
background-image: -moz-linear-gradient(top, #bdedff, #c7e8f5);
background-image: -webkit-gradient(linear, 0 0, 0 100%, from(#bdedff), to(#c7e8f5));
background-image: -webkit-linear-gradient(top, #bdedff, #c7e8f5);
background-image: -o-linear-gradient(top, #bdedff, #c7e8f5);
background-image: linear-gradient(to bottom, #bdedff, #c7e8f5);
background-repeat: repeat-x;
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffbdedff', endColorstr='#ffc7e8f5', GradientType=0);
}

core/src/main/scala/spark/SparkContext.scala

@ -27,6 +27,7 @@ import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.util.DynamicVariable
import scala.collection.mutable.{ConcurrentMap, HashMap}
@ -60,8 +61,10 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
@ -125,6 +128,8 @@ class SparkContext(
private[spark] val ui = new SparkUI(this)
ui.bind()
val startTime = System.currentTimeMillis()
// Add each JAR given through the constructor
if (jars != null) {
jars.foreach { addJar(_) }
@ -262,12 +267,18 @@ class SparkContext(
localProperties.value = new Properties()
}
def addLocalProperty(key: String, value: String) {
if(localProperties.value == null) {
localProperties.value = new Properties()
}
localProperties.value.setProperty(key,value)
}
/** Set a human readable description of the current job. */
def setDescription(value: String) {
addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
}
// Post init
taskScheduler.postStartHook()
@ -574,6 +585,28 @@ class SparkContext(
env.blockManager.master.getStorageStatus
}
/**
* Return pools for fair scheduler
* TODO(xiajunluan): We should take nested pools into account
*/
def getAllPools: ArrayBuffer[Schedulable] = {
taskScheduler.rootPool.schedulableQueue
}
/**
* Return the pool associated with the given name, if one exists
*/
def getPoolForName(pool: String): Option[Schedulable] = {
taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
}
/**
* Return current scheduling mode
*/
def getSchedulingMode: SchedulingMode.SchedulingMode = {
taskScheduler.schedulingMode
}
/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
@ -816,6 +849,7 @@ class SparkContext(
* various Spark features.
*/
object SparkContext {
val SPARK_JOB_DESCRIPTION = "spark.job.description"
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
@ -933,7 +967,6 @@ object SparkContext {
}
}
/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
@ -945,3 +978,4 @@ private[spark] class WritableConverter[T](
val writableClass: ClassManifest[T] => Class[_ <: Writable],
val convert: Writable => T)
extends Serializable
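
To illustrate the new hooks above, a minimal usage sketch (the pool name "production", the local master URL, and the job itself are illustrative, not part of this commit):

import spark.SparkContext

object SchedulingApiSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "scheduling-api-sketch")
    // Human-readable description, picked up by the web UI and JobLogger.
    sc.setDescription("count some numbers")
    // Route subsequent jobs to a named fair-scheduler pool.
    sc.addLocalProperty("spark.scheduler.cluster.fair.pool", "production")
    sc.parallelize(1 to 100).count()
    // Introspection hooks used by the UI's index and pool pages.
    println(sc.getSchedulingMode)
    println(sc.getAllPools.size)
    println(sc.getPoolForName("production").isDefined)
    sc.stop()
  }
}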

core/src/main/scala/spark/SparkEnv.scala

@ -97,13 +97,26 @@ class SparkEnv (
object SparkEnv extends Logging {
private val env = new ThreadLocal[SparkEnv]
@volatile private var lastSetSparkEnv : SparkEnv = _
def set(e: SparkEnv) {
lastSetSparkEnv = e
env.set(e)
}
/**
* Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
* previously set in any thread.
*/
def get: SparkEnv = {
Option(env.get()).getOrElse(lastSetSparkEnv)
}
/**
* Returns the ThreadLocal SparkEnv.
*/
def getThreadLocal : SparkEnv = {
env.get()
}
def createFromSystemProperties(

core/src/main/scala/spark/Utils.scala

@ -33,8 +33,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
/**
@ -68,6 +69,47 @@ private object Utils extends Logging {
return ois.readObject.asInstanceOf[T]
}
/** Serialize via nested stream using specific serializer */
def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = {
val osWrapper = ser.serializeStream(new OutputStream {
def write(b: Int) = os.write(b)
override def write(b: Array[Byte], off: Int, len: Int) = os.write(b, off, len)
})
try {
f(osWrapper)
} finally {
osWrapper.close()
}
}
/** Deserialize via nested stream using specific serializer */
def deserializeViaNestedStream(is: InputStream, ser: SerializerInstance)(f: DeserializationStream => Unit) = {
val isWrapper = ser.deserializeStream(new InputStream {
def read(): Int = is.read()
override def read(b: Array[Byte], off: Int, len: Int): Int = is.read(b, off, len)
})
try {
f(isWrapper)
} finally {
isWrapper.close()
}
}
/**
* Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
*/
def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = {
if (bb.hasArray) {
out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
} else {
val bbval = new Array[Byte](bb.remaining())
bb.get(bbval)
out.write(bbval)
}
}
def isAlpha(c: Char): Boolean = {
(c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
}

core/src/main/scala/spark/deploy/DeployMessages.scala

@ -109,6 +109,7 @@ private[deploy] object DeployMessages {
}
// WorkerWebUI to Worker
case object RequestWorkerState
// Worker to WorkerWebUI
@ -120,4 +121,9 @@ private[deploy] object DeployMessages {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
}
// Actor System to Master
case object CheckForWorkerTimeOut
}

core/src/main/scala/spark/deploy/master/ApplicationInfo.scala

@ -34,6 +34,7 @@ private[spark] class ApplicationInfo(
var executors = new mutable.HashMap[Int, ExecutorInfo]
var coresGranted = 0
var endTime = -1L
val appSource = new ApplicationSource(this)
private var nextExecutorId = 0
@ -51,8 +52,10 @@ private[spark] class ApplicationInfo(
}
def removeExecutor(exec: ExecutorInfo) {
if (executors.contains(exec.id)) {
executors -= exec.id
coresGranted -= exec.cores
}
}
def coresLeft: Int = desc.maxCores - coresGranted

core/src/main/scala/spark/deploy/master/ApplicationSource.scala

@ -0,0 +1,24 @@
package spark.deploy.master
import com.codahale.metrics.{Gauge, MetricRegistry}
import spark.metrics.source.Source
class ApplicationSource(val application: ApplicationInfo) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "%s.%s.%s".format("application", application.desc.name,
System.currentTimeMillis())
metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
override def getValue: String = application.state.toString
})
metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
override def getValue: Long = application.duration
})
metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
override def getValue: Int = application.coresGranted
})
}
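
A minimal custom source in the same style (the class name and metric are illustrative); such a source could be attached with registerSource and detached later via the removeSource hook added to MetricsSystem in this commit:

import com.codahale.metrics.{Gauge, MetricRegistry}

import spark.metrics.source.Source

class UptimeSource extends Source {
  val metricRegistry = new MetricRegistry()
  val sourceName = "uptime"

  private val started = System.currentTimeMillis()

  // Gauges are sampled by whichever sinks the owning instance configures.
  metricRegistry.register(MetricRegistry.name("uptime_ms"), new Gauge[Long] {
    override def getValue: Long = System.currentTimeMillis() - started
  })
}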

core/src/main/scala/spark/deploy/master/Master.scala

@ -38,7 +38,9 @@ import spark.util.AkkaUtils
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
@ -59,7 +61,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname")
val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
val masterSource = new MasterSource(this)
val masterPublicAddress = {
@ -77,15 +80,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
}
override def postStop() {
webUi.stop()
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
}
override def receive = {
@ -171,6 +176,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case RequestMasterState => {
sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
}
case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
}
}
/**
@ -275,6 +284,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val now = System.currentTimeMillis()
val date = new Date(now)
val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
actorToApp(driver) = app
@ -300,7 +310,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
idToApp -= app.id
actorToApp -= app.driver
addressToApp -= app.driver.path.address
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach( a => {
applicationMetricsSystem.removeSource(a.appSource)
})
completedApps.trimStart(toRemove)
}
completedApps += app // Remember it in our history
waitingApps -= app
for (exec <- app.executors.values) {
exec.worker.removeExecutor(exec)
@ -325,12 +342,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
/** Check for, and remove, any timed-out workers */
def timeOutDeadWorkers() {
// Copy the workers into an array so we don't modify the hashset while iterating through it
val currentTime = System.currentTimeMillis()
val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
for (worker <- toRemove) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
worker.id, WORKER_TIMEOUT))
removeWorker(worker)
if (worker.state != WorkerState.DEAD) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
worker.id, WORKER_TIMEOUT/1000))
removeWorker(worker)
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
}
}
}
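
A back-of-envelope sketch of the reaper timing above, using the default property values (spark.worker.timeout=60, spark.dead.worker.persistence=15):

object ReaperMath {
  def main(args: Array[String]) {
    val workerTimeoutMs = 60L * 1000  // spark.worker.timeout, in millis
    val reaperIterations = 15         // spark.dead.worker.persistence
    // A worker with no heartbeat for one timeout is marked DEAD; it is culled
    // from the workers set once its last heartbeat is older than
    // (REAPER_ITERATIONS + 1) timeouts.
    val cullAfterMs = (reaperIterations + 1) * workerTimeoutMs
    println(cullAfterMs / 1000)       // 960 seconds, i.e. 16 minutes
  }
}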

core/src/main/scala/spark/metrics/MetricsSystem.scala

@ -17,7 +17,7 @@
package spark.metrics
import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
import java.util.Properties
import java.util.concurrent.TimeUnit
@ -93,6 +93,13 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
}
}
def removeSource(source: Source) {
sources -= source
registry.removeMatching(new MetricFilter {
def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
})
}
def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

core/src/main/scala/spark/rdd/HadoopRDD.scala

@ -88,6 +88,7 @@ class HadoopRDD[K, V](
override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
val conf = confBroadcast.value.value

core/src/main/scala/spark/rdd/NewHadoopRDD.scala

@ -73,6 +73,7 @@ class NewHadoopRDD[K, V](
override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value
val attemptId = newTaskAttemptID(jobtrackerId, id, true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)

core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala

@ -20,13 +20,15 @@ package spark.rdd
import scala.collection.immutable.NumericRange
import scala.collection.mutable.ArrayBuffer
import scala.collection.Map
import spark._
import java.io._
import scala.Serializable
private[spark] class ParallelCollectionPartition[T: ClassManifest](
var rddId: Long,
var slice: Int,
var values: Seq[T])
extends Partition with Serializable {
def iterator: Iterator[T] = values.iterator
@ -37,15 +39,49 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
case _ => false
}
override def index: Int = slice
@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream): Unit = {
val sfactory = SparkEnv.get.serializer
// Treat java serializer with default action rather than going thru serialization, to avoid a
// separate serialization header.
sfactory match {
case js: JavaSerializer => out.defaultWriteObject()
case _ =>
out.writeLong(rddId)
out.writeInt(slice)
val ser = sfactory.newInstance()
Utils.serializeViaNestedStream(out, ser)(_.writeObject(values))
}
}
@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => in.defaultReadObject()
case _ =>
rddId = in.readLong()
slice = in.readInt()
val ser = sfactory.newInstance()
Utils.deserializeViaNestedStream(in, ser)(ds => values = ds.readObject())
}
}
}
private[spark] class ParallelCollectionRDD[T: ClassManifest](
@transient sc: SparkContext,
@transient data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {
// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
// cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
// instead.
@ -82,16 +118,17 @@ private object ParallelCollectionRDD {
1
}
slice(new Range(
r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
}
case r: Range => {
(0 until numSlices).map(i => {
val start = ((i * r.length.toLong) / numSlices).toInt
val end = (((i + 1) * r.length.toLong) / numSlices).toInt
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}).asInstanceOf[Seq[Seq[T]]]
}
case nr: NumericRange[_] => {
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
val sliceSize = (nr.size + numSlices - 1) / numSlices // Round up to catch everything
var r = nr
@ -102,10 +139,10 @@ private object ParallelCollectionRDD {
slices
}
case _ => {
val array = seq.toArray // To prevent O(n^2) operations for List etc
(0 until numSlices).map(i => {
val start = ((i * array.length.toLong) / numSlices).toInt
val end = (((i + 1) * array.length.toLong) / numSlices).toInt
array.slice(start, end).toSeq
})
}
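
A standalone sketch of the positional slicing arithmetic above: slice i of n covers indices [i*len/n, (i+1)*len/n), so the example numbers below give slice sizes 3, 3 and 4:

object SliceMathSketch {
  def main(args: Array[String]) {
    val length = 10
    val numSlices = 3
    val bounds = (0 until numSlices).map { i =>
      val start = ((i * length.toLong) / numSlices).toInt
      val end = (((i + 1) * length.toLong) / numSlices).toInt
      (start, end)
    }
    println(bounds)  // Vector((0,3), (3,6), (6,10))
  }
}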

core/src/main/scala/spark/scheduler/DAGScheduler.scala

@ -510,6 +510,12 @@ class DAGScheduler(
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
}
}
// Listeners must be notified of the stage submission before a possible
// NotSerializableException below, so "StageSubmitted" precedes "JobEnded".
val properties = idToActiveJob(stage.priority).properties
sparkListeners.foreach(_.onStageSubmitted(
SparkListenerStageSubmitted(stage, tasks.size, properties)))
if (tasks.size > 0) {
// Preemptively serialize a task to make sure it can be serialized. We are catching this
// exception here because it would be fairly hard to catch the non-serializable exception
@ -524,11 +530,9 @@ class DAGScheduler(
return
}
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
if (!stage.submissionTime.isDefined) {

core/src/main/scala/spark/scheduler/JobLogger.scala

@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.{Map, HashMap, ListBuffer}
import scala.io.Source
import spark._
import spark.SparkContext
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo
@ -62,7 +63,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
event match {
case SparkListenerJobStart(job, properties) =>
processJobStartEvent(job, properties)
case SparkListenerStageSubmitted(stage, taskSize, properties) =>
processStageSubmittedEvent(stage, taskSize)
case StageCompleted(stageInfo) =>
processStageCompletedEvent(stageInfo)
@ -317,8 +318,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
protected def recordJobProperties(jobID: Int, properties: Properties) {
if(properties != null) {
val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
jobLogInfo(jobID, description, false)
}
}

core/src/main/scala/spark/scheduler/SparkListener.scala

@ -25,7 +25,8 @@ import spark.executor.TaskMetrics
sealed trait SparkListenerEvents
case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties)
extends SparkListenerEvents
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
@ -34,10 +35,10 @@ case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends Spa
case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
taskMetrics: TaskMetrics) extends SparkListenerEvents
case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
extends SparkListenerEvents
case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
extends SparkListenerEvents
trait SparkListener {
@ -45,7 +46,7 @@ trait SparkListener {
* Called when a stage is completed, with information on the completed stage
*/
def onStageCompleted(stageCompleted: StageCompleted) { }
/**
* Called when a stage is submitted
*/
@ -65,12 +66,12 @@ trait SparkListener {
* Called when a job starts
*/
def onJobStart(jobStart: SparkListenerJobStart) { }
/**
* Called when a job ends
*/
def onJobEnd(jobEnd: SparkListenerJobEnd) { }
}
/**

core/src/main/scala/spark/scheduler/TaskResult.scala

@ -21,6 +21,8 @@ import java.io._
import scala.collection.mutable.Map
import spark.executor.TaskMetrics
import spark.{Utils, SparkEnv}
import java.nio.ByteBuffer
// Task result. Also contains updates to accumulator variables.
// TODO: Use of distributed cache to return result is a hack to get around
@ -30,7 +32,13 @@ class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics:
def this() = this(null.asInstanceOf[T], null, null)
override def writeExternal(out: ObjectOutput) {
val objectSer = SparkEnv.get.serializer.newInstance()
val bb = objectSer.serialize(value)
out.writeInt(bb.remaining())
Utils.writeByteBuffer(bb, out)
out.writeInt(accumUpdates.size)
for ((key, value) <- accumUpdates) {
out.writeLong(key)
@ -40,7 +48,14 @@ class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics:
}
override def readExternal(in: ObjectInput) {
val objectSer = SparkEnv.get.serializer.newInstance()
val blen = in.readInt()
val byteVal = new Array[Byte](blen)
in.readFully(byteVal)
value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
val numUpdates = in.readInt
if (numUpdates == 0) {
accumUpdates = null

core/src/main/scala/spark/scheduler/TaskScheduler.scala

@ -17,6 +17,8 @@
package spark.scheduler
import spark.scheduler.cluster.Pool
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
* These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
@ -25,6 +27,11 @@ package spark.scheduler
* the TaskSchedulerListener interface.
*/
private[spark] trait TaskScheduler {
def rootPool: Pool
def schedulingMode: SchedulingMode
def start(): Unit
// Invoked after system has successfully initialized (typically in spark context).

core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala

@ -26,6 +26,7 @@ import scala.collection.mutable.HashSet
import spark._
import spark.TaskState.TaskState
import spark.scheduler._
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import java.util.{TimerTask, Timer}
@ -114,6 +115,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
// default scheduler is FIFO
val schedulingMode: SchedulingMode = SchedulingMode.withName(
System.getProperty("spark.cluster.schedulingmode", "FIFO"))
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
@ -121,15 +125,13 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def initialize(context: SchedulerBackend) {
backend = context
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
case "FIFO" =>
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case "FAIR" =>
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool)
}
}
@ -204,7 +206,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered")
"check your cluster UI to ensure that workers are registered " +
"and have sufficient memory")
} else {
this.cancel()
}
@ -270,10 +273,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
var launchedTask = false
val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
for (manager <- sortedTaskSetQueue) {
logDebug("parentName:%s, name:%s, runningTasks:%s".format(
manager.parent.name, manager.name, manager.runningTasks))
}
for (manager <- sortedTaskSetQueue) {
// Split offers based on node local, rack local and off-rack tasks.

core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala

@ -92,7 +92,8 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
// Serializer for closures and tasks.
val env = SparkEnv.get
val ser = env.closureSerializer.newInstance()
val tasks = taskSet.tasks
val numTasks = tasks.length
@ -107,9 +108,8 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
var runningTasks = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
var name = "TaskSet_" + taskSet.stageId.toString
var name = "TaskSet_"+taskSet.stageId.toString
var parent: Schedulable = null
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
@ -535,6 +535,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
}
override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
SparkEnv.set(env)
state match {
case TaskState.FINISHED =>
taskFinished(tid, state, serializedData)
@ -697,18 +698,18 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
}
}
// TODO(xiajunluan): for now we just find Pool not TaskSetManager
// we can extend this function in future if needed
override def getSchedulableByName(name: String): Schedulable = {
return null
}
override def addSchedulable(schedulable:Schedulable) {
// nothing
}
override def removeSchedulable(schedulable:Schedulable) {
// nothing
}
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {

core/src/main/scala/spark/scheduler/cluster/Schedulable.scala

@ -17,14 +17,18 @@
package spark.scheduler.cluster
import spark.scheduler.cluster.SchedulingMode.SchedulingMode

import scala.collection.mutable.ArrayBuffer
/**
* An interface for schedulable entities.
* There are two types of Schedulable entities (Pools and TaskSetManagers).
*/
private[spark] trait Schedulable {
var parent: Schedulable
// child queues
def schedulableQueue: ArrayBuffer[Schedulable]
def schedulingMode: SchedulingMode
def weight: Int
def minShare: Int
def runningTasks: Int

core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala

@ -41,10 +41,11 @@ private[spark] trait SchedulableBuilder {
def addTaskSetManager(manager: Schedulable, properties: Properties)
}
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
override def buildPools() {
// nothing
}
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
@ -52,7 +53,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends Schedula
}
}
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
@ -103,9 +105,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
}
}
// finally create "default" pool
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
@ -119,8 +122,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
parentPool = rootPool.getSchedulableByName(poolName)
if (parentPool == null) {
// we will create a new pool that user has configured in app
// instead of being defined in xml file
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))

core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala

@ -17,8 +17,13 @@
package spark.scheduler.cluster
object SchedulingMode extends Enumeration("FAIR","FIFO"){
/**
* "FAIR" and "FIFO" determines which policy is used
* to order tasks amongst a Schedulable's sub-queues
* "NONE" is used when the a Schedulable has no sub-queues.
*/
object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
type SchedulingMode = Value
val FAIR,FIFO = Value
val FAIR,FIFO,NONE = Value
}
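
A small sketch of how both schedulers resolve the mode (the property name and the FIFO default come from ClusterScheduler and LocalScheduler elsewhere in this commit):

import spark.scheduler.cluster.SchedulingMode

object ModeSketch {
  def main(args: Array[String]) {
    val mode = SchedulingMode.withName(
      System.getProperty("spark.cluster.schedulingmode", "FIFO"))
    println(mode)                         // FIFO unless the property says FAIR
    println(mode == SchedulingMode.NONE)  // false; NONE is for leaf Schedulables
  }
}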

core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala

@ -23,7 +23,10 @@ import spark.TaskState.TaskState
import spark.scheduler.TaskSet
private[spark] trait TaskSetManager extends Schedulable {
def schedulableQueue = null
def schedulingMode = SchedulingMode.NONE
def taskSet: TaskSet
def slaveOffer(

core/src/main/scala/spark/scheduler/local/LocalScheduler.scala

@ -29,6 +29,7 @@ import spark.TaskState.TaskState
import spark.executor.ExecutorURLClassLoader
import spark.scheduler._
import spark.scheduler.cluster._
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import akka.actor._
/**
@ -85,6 +86,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
val schedulingMode: SchedulingMode = SchedulingMode.withName(
System.getProperty("spark.cluster.schedulingmode", "FIFO"))
val activeTaskSets = new HashMap[String, TaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@ -92,15 +95,13 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var localActor: ActorRef = null
override def start() {
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
case "FIFO" =>
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case "FAIR" =>
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool)
}
}
@ -168,7 +169,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
// Set the Spark execution environment for the worker thread
SparkEnv.set(env)
val ser = SparkEnv.get.closureSerializer.newInstance()
val objectSer = SparkEnv.get.serializer.newInstance()
var attemptedTask: Option[Task[_]] = None
val start = System.currentTimeMillis()
var taskStart: Long = 0
try {
@ -192,9 +194,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
// executor does. This is useful to catch serialization errors early
// on in development (so when users move their local Spark programs
// to the cluster, they don't get surprised by serialization errors).
val serResult = objectSer.serialize(result)
deserializedTask.metrics.get.resultSize = serResult.limit()
val resultToReturn = objectSer.deserialize[Any](serResult)
val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
ser.serialize(Accumulators.values))
val serviceTime = System.currentTimeMillis() - taskStart

core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala

@ -42,7 +42,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val taskInfos = new HashMap[Long, TaskInfo]
val numTasks = taskSet.tasks.size
var numFinished = 0
val env = SparkEnv.get
val ser = env.closureSerializer.newInstance()
val copiesRunning = new Array[Int](numTasks)
val finished = new Array[Boolean](numTasks)
val numFailures = new Array[Int](numTasks)
@ -63,11 +64,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
override def addSchedulable(schedulable: Schedulable): Unit = {
// nothing
}
override def removeSchedulable(schedulable: Schedulable): Unit = {
// nothing
}
override def getSchedulableByName(name: String): Schedulable = {
@ -75,7 +76,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
override def executorLost(executorId: String, host: String): Unit = {
// nothing
}
override def checkSpeculatableTasks() = true
@ -143,6 +144,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
SparkEnv.set(env)
state match {
case TaskState.FINISHED =>
taskEnded(tid, state, serializedData)

core/src/main/scala/spark/ui/UIUtils.scala

@ -28,14 +28,14 @@ private[spark] object UIUtils {
/** Returns a spark page with correctly formatted headers */
def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
: Seq[Node] = {
val jobs = page match {
case Jobs => <li class="active"><a href="/stages">Jobs</a></li>
case _ => <li><a href="/stages">Jobs</a></li>
}
val storage = page match {
case Storage => <li class="active"><a href="/storage">Storage</a></li>
case _ => <li><a href="/storage">Storage</a></li>
}
val environment = page match {
case Environment => <li class="active"><a href="/environment">Environment</a></li>
case _ => <li><a href="/environment">Environment</a></li>
@ -65,17 +65,14 @@ private[spark] object UIUtils {
<div class="navbar">
<div class="navbar-inner">
<div class="container">
<div class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></div>
<ul class="nav nav-pills">
{storage}
<a href="/" class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></a>
<ul class="nav">
{jobs}
{storage}
{environment}
{executors}
</ul>
<ul id="infolist" class="text">
<li>Application: <strong>{sc.appName}</strong></li>
<li>Executors: <strong>{sc.getExecutorStorageStatus.size}</strong></li>
</ul>
<p class="navbar-text pull-right">Application: <strong>{sc.appName}</strong></p>
</div>
</div>
</div>

core/src/main/scala/spark/ui/UIWorkloadGenerator.scala

@ -21,7 +21,8 @@ import scala.util.Random
import spark.SparkContext
import spark.SparkContext._
import spark.scheduler.cluster.SchedulingMode
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Continuously generates jobs that expose various features of the WebUI (internal testing tool).
*
@ -29,18 +30,29 @@ import spark.SparkContext._
*/
private[spark] object UIWorkloadGenerator {
val NUM_PARTITIONS = 100
val INTER_JOB_WAIT_MS = 5000
def main(args: Array[String]) {
if (args.length < 2) {
println("usage: ./run spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
System.exit(1)
}
val master = args(0)
val schedulingMode = SchedulingMode.withName(args(1))
val appName = "Spark UI Tester"
if (schedulingMode == SchedulingMode.FAIR) {
System.setProperty("spark.cluster.schedulingmode", "FAIR")
}
val sc = new SparkContext(master, appName)
def setProperties(s: String) = {
if (schedulingMode == SchedulingMode.FAIR) {
sc.addLocalProperty("spark.scheduler.cluster.fair.pool", s)
}
sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
}
}
val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
def nextFloat() = (new Random()).nextFloat()
@ -73,14 +85,18 @@ private[spark] object UIWorkloadGenerator {
while (true) {
for ((desc, job) <- jobs) {
new Thread {
override def run() {
try {
setProperties(desc)
job()
println("Job finished: " + desc)
} catch {
case e: Exception =>
println("Job Failed: " + desc)
}
}
}.start
Thread.sleep(INTER_JOB_WAIT_MS)
}
}

core/src/main/scala/spark/ui/env/EnvironmentUI.scala

@ -44,7 +44,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
("Java Home", Properties.javaHome),
("Scala Version", Properties.versionString),
("Scala Home", Properties.scalaHome)
).sorted
def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
def jvmTable = UIUtils.listingTable(Seq("Name", "Value"), jvmRow, jvmInformation)
@ -53,8 +53,8 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
.filter{case (k, v) => k.contains("java.class.path")}
.headOption
.getOrElse("", "")
val sparkProperties = properties.filter(_._1.startsWith("spark")).sorted
val otherProperties = properties.diff(sparkProperties :+ classPathProperty).sorted
val propertyHeaders = Seq("Name", "Value")
def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
@ -67,7 +67,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
.map(e => (e, "System Classpath"))
val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
val addedFiles = sc.addedFiles.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
val classPath = (addedJars ++ addedFiles ++ classPathEntries).sorted
val classPathHeaders = Seq("Resource", "Source")
def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>

core/src/main/scala/spark/ui/exec/ExecutorsUI.scala

@ -97,7 +97,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
.getOrElse(0).toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
val totalTasks = activeTasks + failedTasks + completedTasks
Seq(
execId,
@ -117,17 +117,11 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
activeTasks += taskStart.taskInfo
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@ -143,11 +137,6 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
}
}
}

core/src/main/scala/spark/ui/jobs/IndexPage.scala

@ -17,25 +17,18 @@
package spark.ui.jobs
import javax.servlet.http.HttpServletRequest

import scala.xml.{NodeSeq, Node}
import spark.scheduler.cluster.SchedulingMode
import spark.ui.Page._
import spark.ui.UIUtils._
import spark.Utils
/** Page showing list of all ongoing and recently finished stages and pools */
private[spark] class IndexPage(parent: JobProgressUI) {
def listener = parent.listener
val dateFmt = parent.dateFmt
def render(request: HttpServletRequest): Seq[Node] = {
val activeStages = listener.activeStages.toSeq
@ -48,29 +41,19 @@ private[spark] class IndexPage(parent: JobProgressUI) {
activeTime += t.timeRunning(now)
}
val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
val poolTable = new PoolTable(listener.sc.getAllPools, listener)
val summary: NodeSeq =
<div>
<ul class="unstyled">
<li>
<strong>Duration: </strong>
{parent.formatDuration(now - listener.sc.startTime)}
</li>
<li>
<strong>CPU time: </strong>
{parent.formatDuration(listener.totalTime + activeTime)}
</li>
@ -86,79 +69,35 @@ private[spark] class IndexPage(parent: JobProgressUI) {
{Utils.memoryBytesToString(listener.totalShuffleWrite)}
</li>
}
<li>
<a href="#active"><strong>Active Stages:</strong></a>
{activeStages.size}
</li>
<li>
<a href="#completed"><strong>Completed Stages:</strong></a>
{completedStages.size}
</li>
<li>
<a href="#failed"><strong>Failed Stages:</strong></a>
{failedStages.size}
</li>
<li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
</ul>
</div>
val content = summary ++
{if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) {
<h4>Pools</h4> ++ poolTable.toNodeSeq
} else {
Seq()
}} ++
<h4 id="active">Active Stages : {activeStages.size}</h4> ++
activeStagesTable.toNodeSeq++
<h4 id="completed">Completed Stages : {completedStages.size}</h4> ++
completedStagesTable.toNodeSeq++
<h4 id ="failed">Failed Stages : {failedStages.size}</h4> ++
failedStagesTable.toNodeSeq
headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
}
}

core/src/main/scala/spark/ui/jobs/JobProgressListener.scala

@ -0,0 +1,167 @@
package spark.ui.jobs
import scala.Seq
import scala.collection.mutable.{ListBuffer, HashMap, HashSet}
import spark.{ExceptionFailure, SparkContext, Success, Utils}
import spark.scheduler._
import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
import collection.mutable
private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
// How many stages to remember
val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
val DEFAULT_POOL_NAME = "default"
val stageToPool = new HashMap[Stage, String]()
val stageToDescription = new HashMap[Stage, String]()
val poolToActiveStages = new HashMap[String, HashSet[Stage]]()
val activeStages = HashSet[Stage]()
val completedStages = ListBuffer[Stage]()
val failedStages = ListBuffer[Stage]()
// Total metrics reflect metrics only for completed tasks
var totalTime = 0L
var totalShuffleRead = 0L
var totalShuffleWrite = 0L
val stageToTime = HashMap[Int, Long]()
val stageToShuffleRead = HashMap[Int, Long]()
val stageToShuffleWrite = HashMap[Int, Long]()
val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos =
HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
override def onStageCompleted(stageCompleted: StageCompleted) = {
val stage = stageCompleted.stageInfo.stage
poolToActiveStages(stageToPool(stage)) -= stage
activeStages -= stage
completedStages += stage
trimIfNecessary(completedStages)
}
/** If stages is too large, remove and garbage collect old stages */
def trimIfNecessary(stages: ListBuffer[Stage]) {
if (stages.size > RETAINED_STAGES) {
val toRemove = RETAINED_STAGES / 10
stages.takeRight(toRemove).foreach( s => {
stageToTaskInfos.remove(s.id)
stageToTime.remove(s.id)
stageToShuffleRead.remove(s.id)
stageToShuffleWrite.remove(s.id)
stageToTasksActive.remove(s.id)
stageToTasksComplete.remove(s.id)
stageToTasksFailed.remove(s.id)
stageToPool.remove(s)
if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
})
stages.trimEnd(toRemove)
}
}
/** For FIFO, all stages are placed in the "default" pool; the pool itself is meaningless there. */
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
val stage = stageSubmitted.stage
activeStages += stage
val poolName = Option(stageSubmitted.properties).map {
p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)
stageToPool(stage) = poolName
val description = Option(stageSubmitted.properties).flatMap {
p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
}
description.map(d => stageToDescription(stage) = d)
val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
stages += stage
}
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val sid = taskStart.task.stageId
val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive += taskStart.taskInfo
val taskList = stageToTaskInfos.getOrElse(
sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None))
stageToTaskInfos(sid) = taskList
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
(Some(e), e.metrics)
case _ =>
stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
stageToTime.getOrElseUpdate(sid, 0L)
val time = metrics.map(m => m.executorRunTime).getOrElse(0)
stageToTime(sid) += time
totalTime += time
stageToShuffleRead.getOrElseUpdate(sid, 0L)
val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
s.remoteBytesRead).getOrElse(0L)
stageToShuffleRead(sid) += shuffleRead
totalShuffleRead += shuffleRead
stageToShuffleWrite.getOrElseUpdate(sid, 0L)
val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
s.shuffleBytesWritten).getOrElse(0L)
stageToShuffleWrite(sid) += shuffleWrite
totalShuffleWrite += shuffleWrite
val taskList = stageToTaskInfos.getOrElse(
sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList
}
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
jobEnd match {
case end: SparkListenerJobEnd =>
end.jobResult match {
case JobFailed(ex, Some(stage)) =>
activeStages -= stage
poolToActiveStages(stageToPool(stage)) -= stage
failedStages += stage
trimIfNecessary(failedStages)
case _ =>
}
case _ =>
}
}
/** Is this stage's input from a shuffle read. */
def hasShuffleRead(stageID: Int): Boolean = {
// This is written in a slightly complicated way to avoid having to scan all tasks
for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
}
return false // No tasks have finished for this stage
}
/** Is this stage's output to a shuffle write. */
def hasShuffleWrite(stageID: Int): Boolean = {
// This is written in a slightly complicated way to avoid having to scan all tasks
for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
}
return false // No tasks have finished for this stage
}
}

View file

@@ -31,9 +31,9 @@ import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
import spark.ui.JettyUtils._
import spark.{ExceptionFailure, SparkContext, Success, Utils}
import spark.scheduler._
import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
import collection.mutable
import spark.scheduler.cluster.SchedulingMode
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/** Web UI showing progress status of all jobs in the given SparkContext. */
private[spark] class JobProgressUI(val sc: SparkContext) {
@@ -43,9 +43,10 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
private val indexPage = new IndexPage(this)
private val stagePage = new StagePage(this)
private val poolPage = new PoolPage(this)
def start() {
_listener = Some(new JobProgressListener)
_listener = Some(new JobProgressListener(sc))
sc.addSparkListener(listener)
}
@@ -53,120 +54,7 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
def getHandlers = Seq[(String, Handler)](
("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
("/stages/pool", (request: HttpServletRequest) => poolPage.render(request)),
("/stages", (request: HttpServletRequest) => indexPage.render(request))
)
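// For example, GET /stages/pool?poolname=default is routed to PoolPage.render,
// and GET /stages/stage?id=3 to StagePage.render.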
}
private[spark] class JobProgressListener extends SparkListener {
// How many stages to remember
val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
val activeStages = HashSet[Stage]()
val completedStages = ListBuffer[Stage]()
val failedStages = ListBuffer[Stage]()
// Total metrics reflect metrics only for completed tasks
var totalTime = 0L
var totalShuffleRead = 0L
var totalShuffleWrite = 0L
val stageToTime = HashMap[Int, Long]()
val stageToShuffleRead = HashMap[Int, Long]()
val stageToShuffleWrite = HashMap[Int, Long]()
val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos =
HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
override def onStageCompleted(stageCompleted: StageCompleted) = {
val stage = stageCompleted.stageInfo.stage
activeStages -= stage
completedStages += stage
trimIfNecessary(completedStages)
}
/** If the stage list grows too large, remove old stages so they can be garbage collected. */
def trimIfNecessary(stages: ListBuffer[Stage]) {
if (stages.size > RETAINED_STAGES) {
val toRemove = RETAINED_STAGES / 10
stages.takeRight(toRemove).foreach( s => {
stageToTaskInfos.remove(s.id)
stageToTime.remove(s.id)
stageToShuffleRead.remove(s.id)
stageToShuffleWrite.remove(s.id)
stageToTasksActive.remove(s.id)
stageToTasksComplete.remove(s.id)
stageToTasksFailed.remove(s.id)
})
stages.trimEnd(toRemove)
}
}
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
activeStages += stageSubmitted.stage
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val sid = taskStart.task.stageId
val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive += taskStart.taskInfo
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None))
stageToTaskInfos(sid) = taskList
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
(Some(e), e.metrics)
case _ =>
stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
stageToTime.getOrElseUpdate(sid, 0L)
val time = metrics.map(m => m.executorRunTime).getOrElse(0)
stageToTime(sid) += time
totalTime += time
stageToShuffleRead.getOrElseUpdate(sid, 0L)
val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
s.remoteBytesRead).getOrElse(0L)
stageToShuffleRead(sid) += shuffleRead
totalShuffleRead += shuffleRead
stageToShuffleWrite.getOrElseUpdate(sid, 0L)
val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
s.shuffleBytesWritten).getOrElse(0L)
stageToShuffleWrite(sid) += shuffleWrite
totalShuffleWrite += shuffleWrite
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList
}
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
jobEnd match {
case end: SparkListenerJobEnd =>
end.jobResult match {
case JobFailed(ex, Some(stage)) =>
activeStages -= stage
failedStages += stage
trimIfNecessary(failedStages)
case _ =>
}
case _ =>
}
}
}

View file

@@ -0,0 +1,30 @@
package spark.ui.jobs
import javax.servlet.http.HttpServletRequest
import scala.xml.{NodeSeq, Node}
import scala.collection.mutable.HashSet
import spark.scheduler.Stage
import spark.ui.UIUtils._
import spark.ui.Page._
/** Page showing specific pool details */
private[spark] class PoolPage(parent: JobProgressUI) {
def listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
val poolName = request.getParameter("poolname")
val poolToActiveStages = listener.poolToActiveStages
val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq
val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
val pool = listener.sc.getPoolForName(poolName).get
val poolTable = new PoolTable(Seq(pool), listener)
val content = <h3>Pool</h3> ++ poolTable.toNodeSeq() ++
<h3>Active Stages: {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq()
headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs)
}
}

View file

@@ -0,0 +1,49 @@
package spark.ui.jobs
import scala.xml.Node
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import spark.scheduler.Stage
import spark.scheduler.cluster.Schedulable
/** Table showing list of pools */
private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
def toNodeSeq(): Seq[Node] = {
poolTable(poolRow, pools)
}
// pool tables
def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
rows: Seq[Schedulable]
): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<th>Pool Name</th>
<th>Minimum Share</th>
<th>Pool Weight</th>
<th>Active Stages</th>
<th>Running Tasks</th>
<th>SchedulingMode</th>
</thead>
<tbody>
{rows.map(r => makeRow(r, poolToActiveStages))}
</tbody>
</table>
}
def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = {
<tr>
<td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
<td>{p.minShare}</td>
<td>{p.weight}</td>
<td>{poolToActiveStages.getOrElseUpdate(p.name, new HashSet[Stage]()).size}</td>
<td>{p.runningTasks}</td>
<td>{p.schedulingMode}</td>
</tr>
}
}

View file

@@ -48,7 +48,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
}
val tasks = listener.stageToTaskInfos(stageId)
val tasks = listener.stageToTaskInfos(stageId).toSeq
val shuffleRead = listener.stageToShuffleRead(stageId) > 0
val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0

View file

@@ -0,0 +1,116 @@
package spark.ui.jobs
import java.util.Date
import java.text.SimpleDateFormat
import javax.servlet.http.HttpServletRequest
import scala.Some
import scala.xml.{NodeSeq, Node}
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import spark.scheduler.cluster.{SchedulingMode, TaskInfo}
import spark.scheduler.Stage
import spark.ui.UIUtils._
import spark.ui.Page._
import spark.Utils
import spark.storage.StorageLevel
/** Page showing list of all ongoing and recently finished stages */
private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) {
val listener = parent.listener
val dateFmt = parent.dateFmt
val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
def toNodeSeq(): Seq[Node] = {
stageTable(stageRow, stages)
}
/** Special table which merges two header cells. */
def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<th>Stage Id</th>
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Description</th>
<th>Submitted</th>
<th>Duration</th>
<th>Tasks: Succeeded/Total</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
</thead>
<tbody>
{rows.map(r => makeRow(r))}
</tbody>
</table>
}
def getElapsedTime(submitted: Option[Long], completed: Long): String = {
submitted match {
case Some(t) => parent.formatDuration(completed - t)
case _ => "Unknown"
}
}
def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
<div class="progress" style="height: 15px; margin-bottom: 0px; position: relative">
<span style="text-align:center; position:absolute; width:100%;">
{completed}/{total} {failed}
</span>
<div class="bar bar-completed" style={completeWidth}></div>
<div class="bar bar-running" style={startWidth}></div>
</div>
}
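// Example: with 3 of 4 tasks complete and 1 task running, completeWidth is
// "width: 75.0%" and startWidth is "width: 25.0%", so both segments of the
// bar are rendered.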
def stageRow(s: Stage): Seq[Node] = {
val submissionTime = s.submissionTime match {
case Some(t) => dateFmt.format(new Date(t))
case None => "Unknown"
}
val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
case 0 => ""
case b => Utils.memoryBytesToString(b)
}
val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
case 0 => ""
case b => Utils.memoryBytesToString(b)
}
val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
val failedTasks = listener.stageToTasksFailed.getOrElse(s.id, 0) match {
case f if f > 0 => "(%s failed)".format(f)
case _ => ""
}
val totalTasks = s.numPartitions
val poolName = listener.stageToPool.get(s)
val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
val description = listener.stageToDescription.get(s)
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
<tr>
<td>{s.id}</td>
{if (isFairScheduler) {
<td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
}
<td>{description}</td>
<td valign="middle">{submissionTime}</td>
<td>{getElapsedTime(s.submissionTime,
s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
<td class="progress-cell">
{makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
</td>
<td>{shuffleRead}</td>
<td>{shuffleWrite}</td>
</tr>
}
}

View file

@@ -83,18 +83,19 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
<hr/>
<div class="row">
<div class="span12">
<h3> Data Distribution Summary </h3>
{workerTable}
</div>
</div>
<hr/>
<div class="row">
<div class="span12">
<h4> RDD Summary </h4>
<h4> Partitions </h4>
{blockTable}
</div>
</div>;
headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Jobs)
headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Storage)
}
def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = {

View file

@@ -22,7 +22,9 @@ import scala.collection.mutable
import org.scalatest.FunSuite
import com.esotericsoftware.kryo._
class KryoSerializerSuite extends FunSuite {
import KryoTest._
class KryoSerializerSuite extends FunSuite with SharedSparkContext {
test("basic types") {
val ser = (new KryoSerializer).newInstance()
def check[T](t: T) {
@@ -124,6 +126,45 @@ class KryoSerializerSuite extends FunSuite {
System.clearProperty("spark.kryo.registrator")
}
test("kryo with collect") {
val control = 1 :: 2 :: Nil
val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_)).collect().map(_.x)
assert(control === result.toSeq)
}
test("kryo with parallelize") {
val control = 1 :: 2 :: Nil
val result = sc.parallelize(control.map(new ClassWithoutNoArgConstructor(_))).map(_.x).collect()
assert(control === result.toSeq)
}
test("kryo with reduce") {
val control = 1 :: 2 :: Nil
val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
.reduce((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
assert(control.sum === result)
}
// TODO: this still doesn't work
ignore("kryo with fold") {
val control = 1 :: 2 :: Nil
val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
.fold(new ClassWithoutNoArgConstructor(10))((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
assert(10 + control.sum === result)
}
override def beforeAll() {
System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
super.beforeAll()
}
override def afterAll() {
super.afterAll()
System.clearProperty("spark.kryo.registrator")
System.clearProperty("spark.serializer")
}
}
object KryoTest {

View file

@@ -32,6 +32,10 @@ import spark.{Dependency, ShuffleDependency, OneToOneDependency}
import spark.{FetchFailed, Success, TaskEndReason}
import spark.storage.{BlockManagerId, BlockManagerMaster}
import spark.scheduler.cluster.Pool
import spark.scheduler.cluster.SchedulingMode
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
* rather than spawning an event loop thread as happens in the real code. They use EasyMock
@@ -49,6 +53,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
/** Set of TaskSets the DAGScheduler has requested executed. */
val taskSets = scala.collection.mutable.Buffer[TaskSet]()
val taskScheduler = new TaskScheduler() {
override def rootPool: Pool = null
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def start() = {}
override def stop() = {}
override def submitTasks(taskSet: TaskSet) = {

View file

@@ -57,7 +57,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID, None)
val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID, None)
joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4))
joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4, null))
joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
parentRdd.setName("MyRDD")
joblogger.getRddNameTest(parentRdd) should be ("MyRDD")

View file

@@ -73,7 +73,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
new Thread {
if (poolName != null) {
sc.addLocalProperties("spark.scheduler.cluster.fair.pool",poolName)
sc.addLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
}
override def run() {
val ans = nums.map(number => {

View file

@@ -43,7 +43,7 @@ Finally, the following configuration options can be passed to the master and wor
</tr>
<tr>
<td><code>-p PORT</code>, <code>--port PORT</code></td>
<td>IP address or DNS name to listen on (default: 7077 for master, random for worker)</td>
<td>Port for service to listen on (default: 7077 for master, random for worker)</td>
</tr>
<tr>
<td><code>--webui-port PORT</code></td>

View file

@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.examples;
import scala.Tuple2;
import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function;
import spark.api.java.function.PairFlatMapFunction;
import spark.api.java.function.PairFunction;
import java.util.List;
import java.util.ArrayList;
/**
* Computes the PageRank of URLs from an input file. The input file should
* be in the format:
* URL neighbor URL
* URL neighbor URL
* URL neighbor URL
* ...
* where a URL and its neighbor are separated by space(s).
*/
public class JavaPageRank {
private static double sum(List<Double> numbers) {
double out = 0.0;
for (double number : numbers) {
out += number;
}
return out;
}
public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.err.println("Usage: JavaPageRank <master> <file> <number_of_iterations>");
System.exit(1);
}
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
// Loads the input file. It should be in the format:
// URL neighbor URL
// URL neighbor URL
// URL neighbor URL
// ...
JavaRDD<String> lines = ctx.textFile(args[1], 1);
// Loads all URLs from the input file and initializes their neighbor lists.
JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
String[] parts = s.split("\\s+");
return new Tuple2<String, String>(parts[0], parts[1]);
}
}).distinct().groupByKey().cache();
// Initializes the rank of each URL to one.
JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
@Override
public Double call(List<String> rs) throws Exception {
return 1.0;
}
});
// Iteratively calculates and updates URL ranks using the PageRank algorithm.
for (int current = 0; current < Integer.parseInt(args[2]); current++) {
// Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
.flatMap(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
@Override
public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
for (String n : s._1) {
results.add(new Tuple2<String, Double>(n, s._2 / s._1.size()));
}
return results;
}
});
// Re-calculates URL ranks based on neighbor contributions.
ranks = contribs.groupByKey().mapValues(new Function<List<Double>, Double>() {
@Override
public Double call(List<Double> cs) throws Exception {
return 0.15 + sum(cs) * 0.85;
}
});
}
// Collects all URL ranks and dumps them to the console.
List<Tuple2<String, Double>> output = ranks.collect();
for (Tuple2<String, Double> tuple : output) {
System.out.println(tuple._1 + " has rank: " + tuple._2 + ".");
}
System.exit(0);
}
}
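For comparison, the same computation is considerably shorter in the Scala API. The following is a minimal sketch, not part of this commit: it assumes `import spark.SparkContext._` for the pair-RDD operations, an existing SparkContext `sc`, an input path `inputFile` in the format described above, and an iteration count `iters`.

  val lines = sc.textFile(inputFile)
  val links = lines.map { s =>
    val parts = s.split("\\s+")
    (parts(0), parts(1))
  }.distinct().groupByKey().cache()

  var ranks = links.mapValues(_ => 1.0)

  for (i <- 1 to iters) {
    // Each page divides its current rank evenly among its outgoing links.
    val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
      urls.map(url => (url, rank / urls.size))
    }
    // Same update rule as the Java version: 0.15 + 0.85 * (sum of contributions).
    ranks = contribs.groupByKey().mapValues(cs => 0.15 + 0.85 * cs.sum)
  }

  ranks.collect().foreach { case (url, rank) =>
    println(url + " has rank: " + rank + ".")
  }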

View file

@@ -418,6 +418,7 @@ object ALS {
System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
System.setProperty("spark.kryo.referenceTracking", "false")
System.setProperty("spark.kryoserializer.buffer.mb", "8")
System.setProperty("spark.locality.wait", "10000")
val sc = new SparkContext(master, "ALS")
val ratings = sc.textFile(ratingsFile).map { line =>

View file

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.mllib.recommendation
import scala.util.Random
import org.jblas.DoubleMatrix
import spark.{RDD, SparkContext}
import spark.mllib.util.MLUtils
/**
* Generate RDD(s) containing data for Matrix Factorization.
*
* This method samples training entries according to the oversampling factor
* 'trainSampFact', which is a multiplicative factor of the number of
* degrees of freedom of the matrix: rank*(m+n-rank).
*
* It optionally samples entries for a testing matrix using
* 'testSampFact', the percentage of the number of training entries
* to use for testing.
*
* This method takes the following inputs:
* sparkMaster (String) The master URL.
* outputPath (String) Directory to save output.
* m (Int) Number of rows in data matrix.
* n (Int) Number of columns in data matrix.
* rank (Int) Underlying rank of data matrix.
* trainSampFact (Double) Oversampling factor.
* noise (Boolean) Whether to add gaussian noise to training data.
* sigma (Double) Standard deviation of added gaussian noise.
* test (Boolean) Whether to create testing RDD.
* testSampFact (Double) Percentage of training data to use as test data.
*/
object MFDataGenerator {
def main(args: Array[String]) {
if (args.length < 2) {
println("Usage: MFDataGenerator " +
"<master> <outputDir> [m] [n] [rank] [trainSampFact] [noise] [sigma] [test] [testSampFact]")
System.exit(1)
}
val sparkMaster: String = args(0)
val outputPath: String = args(1)
val m: Int = if (args.length > 2) args(2).toInt else 100
val n: Int = if (args.length > 3) args(3).toInt else 100
val rank: Int = if (args.length > 4) args(4).toInt else 10
val trainSampFact: Double = if (args.length > 5) args(5).toDouble else 1.0
val noise: Boolean = if (args.length > 6) args(6).toBoolean else false
val sigma: Double = if (args.length > 7) args(7).toDouble else 0.1
val test: Boolean = if (args.length > 8) args(8).toBoolean else false
val testSampFact: Double = if (args.length > 9) args(9).toDouble else 0.1
val sc = new SparkContext(sparkMaster, "MFDataGenerator")
val A = DoubleMatrix.randn(m, rank)
val B = DoubleMatrix.randn(rank, n)
val z = 1 / (scala.math.sqrt(scala.math.sqrt(rank)))
A.mmuli(z)
B.mmuli(z)
val fullData = A.mmul(B)
val df = rank * (m + n - rank)
val sampSize = scala.math.min(scala.math.round(trainSampFact * df),
scala.math.round(.99 * m * n)).toInt
val rand = new Random()
val mn = m * n
val shuffled = rand.shuffle((1 to mn).toIterable)
val omega = shuffled.slice(0, sampSize)
val ordered = omega.sortWith(_ < _).toArray
val trainData: RDD[(Int, Int, Double)] = sc.parallelize(ordered)
.map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
// optionally add gaussian noise; RDD.map returns a new RDD, so its result must
// be captured rather than discarded
val finalData = if (noise) {
trainData.map(x => (x._1, x._2, x._3 + rand.nextGaussian * sigma))
} else {
trainData
}
finalData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath)
// optionally generate testing data
if (test) {
val testSampSize = scala.math
.min(scala.math.round(sampSize * testSampFact), scala.math.round(mn - sampSize)).toInt
val testOmega = shuffled.slice(sampSize, sampSize + testSampSize)
val testOrdered = testOmega.sortWith(_ < _).toArray
val testData: RDD[(Int, Int, Double)] = sc.parallelize(testOrdered)
.map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
testData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath)
}
sc.stop()
}
}
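As a quick sanity check on the sampling arithmetic above, using the defaults (m = 100, n = 100, rank = 10, trainSampFact = 1.0):

  val df = 10 * (100 + 100 - 10)                // rank * (m + n - rank) = 1900
  val cap = scala.math.round(.99 * 100 * 100)   // 99% of the m * n entries = 9900
  val sampSize = scala.math.min(scala.math.round(1.0 * df), cap).toInt  // min(1900, 9900) = 1900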

View file

@@ -178,7 +178,7 @@ object SparkBuild extends Build {
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
"net.liftweb" % "lift-json_2.9.2" % "2.5",
"org.apache.mesos" % "mesos" % "0.12.0-incubating",
"org.apache.mesos" % "mesos" % "0.9.0-incubating",
"io.netty" % "netty-all" % "4.0.0.Beta2",
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
"com.codahale.metrics" % "metrics-core" % "3.0.0",

View file

@@ -16,7 +16,8 @@
#
"""
This example requires numpy (http://www.numpy.org/)
A logistic regression implementation that uses NumPy (http://www.numpy.org) to act on batches
of input data using efficient matrix operations.
"""
from collections import namedtuple
from math import exp
@@ -27,47 +28,45 @@ import numpy as np
from pyspark import SparkContext
N = 100000 # Number of data points
D = 10 # Number of dimensions
R = 0.7 # Scaling factor
ITERATIONS = 5
np.random.seed(42)
DataPoint = namedtuple("DataPoint", ['x', 'y'])
from logistic_regression import DataPoint # So that DataPoint is properly serialized
def generateData():
def generatePoint(i):
y = -1 if i % 2 == 0 else 1
x = np.random.normal(size=D) + (y * R)
return DataPoint(x, y)
return [generatePoint(i) for i in range(N)]
# Read a batch of points from the input file into a NumPy matrix object. We operate on batches to
# make further computations faster.
# The data file contains lines of the form <label> <x1> <x2> ... <xD>. We load each block of these
# into a NumPy array of size numLines * (D + 1) and pull out column 0 vs the others in gradient().
def readPointBatch(iterator):
strs = list(iterator)
matrix = np.zeros((len(strs), D + 1))
for i in xrange(len(strs)):
matrix[i] = np.fromstring(strs[i].replace(',', ' '), dtype=np.float32, sep=' ')
return [matrix]
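# For example, the two input lines "1 0.5 -0.2 ..." and "-1 0.3 0.9 ..." become
# a single 2 x (D + 1) matrix whose first column holds the labels and whose
# remaining columns hold the point coordinates.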
if __name__ == "__main__":
if len(sys.argv) == 1:
print >> sys.stderr, "Usage: logistic_regression <master> [<slices>]"
if len(sys.argv) != 4:
print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
points = sc.parallelize(generateData(), slices).cache()
points = sc.textFile(sys.argv[2]).mapPartitions(readPointBatch).cache()
iterations = int(sys.argv[3])
# Initialize w to a random value
w = 2 * np.random.ranf(size=D) - 1
print "Initial w: " + str(w)
# Compute logistic regression gradient for a matrix of data points
def gradient(matrix, w):
Y = matrix[:,0] # point labels (first column of input file)
X = matrix[:,1:] # point coordinates
# For each point (x, y), compute gradient function, then sum these up
return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)
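# In matrix form this computes sum_i (sigma(y_i * w . x_i) - 1) * y_i * x_i,
# where sigma(z) = 1 / (1 + exp(-z)); .sum(1) adds the per-point columns of the
# D x n product into a single length-D gradient vector.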
def add(x, y):
x += y
return x
for i in range(1, ITERATIONS + 1):
print "On iteration %i" % i
gradient = points.map(lambda p:
(1.0 / (1.0 + exp(-p.y * np.dot(w, p.x)))) * p.y * p.x
).reduce(add)
w -= gradient
for i in range(iterations):
print "On iteration %i" % (i + 1)
w -= points.map(lambda m: gradient(m, w)).reduce(add)
print "Final w: " + str(w)