Merge branch 'master' of git://github.com/mesos/spark into sgd-cleanup

Conflicts:
	mllib/src/main/scala/spark/mllib/util/MLUtils.scala
This commit is contained in:
Shivaram Venkataraman 2013-08-06 21:21:55 -07:00
commit 338b7a7455
69 changed files with 1529 additions and 479 deletions

View file

@ -62,43 +62,31 @@
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<classifier>${classifier.name}</classifier>
<version>0.8.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-bagel</artifactId>
<classifier>${classifier.name}</classifier>
<version>0.8.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-examples</artifactId>
<artifactId>spark-mllib</artifactId>
<classifier>${classifier.name}</classifier>
<version>0.8.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-examples</artifactId>
<classifier>javadoc</classifier>
<version>0.8.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-examples</artifactId>
<classifier>sources</classifier>
<version>0.8.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-repl</artifactId>
<classifier>${classifier.name}</classifier>
<version>0.8.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<classifier>${classifier.name}</classifier>
<version>0.8.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View file

@ -49,7 +49,7 @@
<include>org.spark-project:*:jar</include>
</includes>
<excludes>
<exclude>org.spark-project:spark-dist:jar</exclude>
<exclude>org.spark-project:spark-assembly:jar</exclude>
</excludes>
</dependencySet>
<dependencySet>

View file

@ -1,48 +1,45 @@
# syntax: [instance].[sink|source].[name].[options]
# syntax: [instance].sink|source.[name].[options]=[value]
# "instance" specify "who" (the role) use metrics system. In spark there are
# several roles like master, worker, executor, driver, these roles will
# create metrics system for monitoring. So instance represents these roles.
# Currently in Spark, several instances have already implemented: master,
# worker, executor, driver.
# This file configures Spark's internal metrics system. The metrics system is
# divided into instances which correspond to internal components.
# Each instance can be configured to report its metrics to one or more sinks.
# Accepted values for [instance] are "master", "worker", "executor", "driver",
# and "applications". A wild card "*" can be used as an instance name, in
# which case all instances will inherit the supplied property.
#
# [instance] field can be "master", "worker", "executor", "driver", which means
# only the specified instance has this property.
# a wild card "*" can be used to represent instance name, which means all the
# instances will have this property.
# Within an instance, a "source" specifies a particular set of grouped metrics.
# there are two kinds of sources:
# 1. Spark internal sources, like MasterSource, WorkerSource, etc, which will
# collect a Spark component's internal state. Each instance is paired with a
# Spark source that is added automatically.
# 2. Common sources, like JvmSource, which will collect low level state.
# These can be added through configuration options and are then loaded
# using reflection.
#
# "source" specify "where" (source) to collect metrics data. In metrics system,
# there exists two kinds of source:
# 1. Spark internal source, like MasterSource, WorkerSource, etc, which will
# collect Spark component's internal state, these sources are related to
# instance and will be added after specific metrics system is created.
# 2. Common source, like JvmSource, which will collect low level state, is
# configured by configuration and loaded through reflection.
# A "sink" specifies where metrics are delivered to. Each instance can be
# assigned one or more sinks.
#
# "sink" specify "where" (destination) to output metrics data to. Several sinks
# can be coexisted and flush metrics to all these sinks.
# The sink|source field specifies whether the property relates to a sink or
# source.
#
# [sink|source] field specify this property is source related or sink, this
# field can only be source or sink.
# The [name] field specifies the name of source or sink.
#
# [name] field specify the name of source or sink, this is custom defined.
#
# [options] field is the specific property of this source or sink, this source
# or sink is responsible for parsing this property.
# The [options] field is the specific property of this source or sink. The
# source or sink is responsible for parsing this property.
#
# Notes:
# 1. Sinks should be added through configuration, like console sink, class
# full name should be specified by class property.
# 2. Some sinks can specify polling period, like console sink, which is 10 seconds,
# it should be attention minimal polling period is 1 seconds, any period
# below than 1s is illegal.
# 3. Wild card property can be overlapped by specific instance property, for
# example, *.sink.console.period can be overlapped by master.sink.console.period.
# 1. To add a new sink, set the "class" option to a fully qualified class
# name (see examples below).
# 2. Some sinks involve a polling period. The minimum allowed polling period
# is 1 second.
# 3. Wild card properties can be overridden by more specific properties.
# For example, master.sink.console.period takes precedence over
# *.sink.console.period.
# 4. A metrics specific configuration
# "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
# added to Java property using -Dspark.metrics.conf=xxx if you want to
# customize metrics system, or you can put it in ${SPARK_HOME}/conf,
# metrics system will search and load it automatically.
# added to Java properties using -Dspark.metrics.conf=xxx if you want to
# customize metrics system. You can also put the file in ${SPARK_HOME}/conf
# and it will be loaded automatically.
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=spark.metrics.sink.JmxSink

View file

@ -48,6 +48,10 @@
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>

View file

@ -47,3 +47,31 @@
padding-top: 7px;
padding-left: 4px;
}
.table td {
vertical-align: middle !important;
}
.progress-completed .bar,
.progress .bar-completed {
background-color: #b3def9;
background-image: -moz-linear-gradient(top, #addfff, #badcf2);
background-image: -webkit-gradient(linear, 0 0, 0 100%, from(#addfff), to(#badcf2));
background-image: -webkit-linear-gradient(top, #addfff, #badcf2);
background-image: -o-linear-gradient(top, #addfff, #badcf2);
background-image: linear-gradient(to bottom, #addfff, #badcf2);
background-repeat: repeat-x;
filter: progid:dximagetransform.microsoft.gradient(startColorstr='#ffaddfff', endColorstr='#ffbadcf2', GradientType=0);
}
.progress-running .bar,
.progress .bar-running {
background-color: #c2ebfa;
background-image: -moz-linear-gradient(top, #bdedff, #c7e8f5);
background-image: -webkit-gradient(linear, 0 0, 0 100%, from(#bdedff), to(#c7e8f5));
background-image: -webkit-linear-gradient(top, #bdedff, #c7e8f5);
background-image: -o-linear-gradient(top, #bdedff, #c7e8f5);
background-image: linear-gradient(to bottom, #bdedff, #c7e8f5);
background-repeat: repeat-x;
filter: progid:dximagetransform.microsoft.gradient(startColorstr='#ffbdedff', endColorstr='#ffc7e8f5', GradientType=0);
}

View file

@ -65,17 +65,9 @@ object Partitioner {
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions = partitions
def getPartition(key: Any): Int = {
if (key == null) {
return 0
} else {
val mod = key.hashCode % partitions
if (mod < 0) {
mod + partitions
} else {
mod // Guard against negative hash codes
}
}
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {

View file

@ -27,6 +27,7 @@ import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.util.DynamicVariable
import scala.collection.mutable.{ConcurrentMap, HashMap}
@ -60,8 +61,10 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
@ -125,6 +128,8 @@ class SparkContext(
private[spark] val ui = new SparkUI(this)
ui.bind()
val startTime = System.currentTimeMillis()
// Add each JAR given through the constructor
if (jars != null) {
jars.foreach { addJar(_) }
@ -262,12 +267,18 @@ class SparkContext(
localProperties.value = new Properties()
}
def addLocalProperties(key: String, value: String) {
def addLocalProperty(key: String, value: String) {
if(localProperties.value == null) {
localProperties.value = new Properties()
}
localProperties.value.setProperty(key,value)
}
/** Set a human readable description of the current job. */
def setDescription(value: String) {
addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
}
// Post init
taskScheduler.postStartHook()
@ -574,6 +585,28 @@ class SparkContext(
env.blockManager.master.getStorageStatus
}
/**
* Return pools for fair scheduler
* TODO(xiajunluan): We should take nested pools into account
*/
def getAllPools: ArrayBuffer[Schedulable] = {
taskScheduler.rootPool.schedulableQueue
}
/**
* Return the pool associated with the given name, if one exists
*/
def getPoolForName(pool: String): Option[Schedulable] = {
taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
}
/**
* Return current scheduling mode
*/
def getSchedulingMode: SchedulingMode.SchedulingMode = {
taskScheduler.schedulingMode
}
/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
@ -816,6 +849,7 @@ class SparkContext(
* various Spark features.
*/
object SparkContext {
val SPARK_JOB_DESCRIPTION = "spark.job.description"
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
@ -933,7 +967,6 @@ object SparkContext {
}
}
/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
@ -945,3 +978,4 @@ private[spark] class WritableConverter[T](
val writableClass: ClassManifest[T] => Class[_ <: Writable],
val convert: Writable => T)
extends Serializable

View file

@ -97,13 +97,26 @@ class SparkEnv (
object SparkEnv extends Logging {
private val env = new ThreadLocal[SparkEnv]
@volatile private var lastSetSparkEnv : SparkEnv = _
def set(e: SparkEnv) {
lastSetSparkEnv = e
env.set(e)
}
/**
* Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
* previously set in any thread.
*/
def get: SparkEnv = {
env.get()
Option(env.get()).getOrElse(lastSetSparkEnv)
}
/**
* Returns the ThreadLocal SparkEnv.
*/
def getThreadLocal : SparkEnv = {
env.get()
}
def createFromSystemProperties(

View file

@ -596,7 +596,7 @@ private object Utils extends Logging {
output.toString
}
/**
/**
* A regular expression to match classes of the "core" Spark API that we want to skip when
* finding the call site of a method.
*/
@ -756,4 +756,13 @@ private object Utils extends Logging {
}
return buf
}
/* Calculates 'x' modulo 'mod', takes to consideration sign of x,
* i.e. if 'x' is negative, than 'x' % 'mod' is negative too
* so function return (x % mod) + mod in that case.
*/
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
}

View file

@ -18,7 +18,7 @@
package spark.api.python
import spark.Partitioner
import spark.Utils
import java.util.Arrays
/**
@ -35,25 +35,10 @@ private[spark] class PythonPartitioner(
val pyPartitionFunctionId: Long)
extends Partitioner {
override def getPartition(key: Any): Int = {
if (key == null) {
return 0
}
else {
val hashCode = {
if (key.isInstanceOf[Array[Byte]]) {
Arrays.hashCode(key.asInstanceOf[Array[Byte]])
} else {
key.hashCode()
}
}
val mod = hashCode % numPartitions
if (mod < 0) {
mod + numPartitions
} else {
mod // Guard against negative hash codes
}
}
override def getPartition(key: Any): Int = key match {
case null => 0
case key: Array[Byte] => Utils.nonNegativeMod(Arrays.hashCode(key), numPartitions)
case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
}
override def equals(other: Any): Boolean = other match {

View file

@ -17,21 +17,20 @@
package spark.broadcast
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import java.io._
import java.net._
import java.util.UUID
import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream}
import java.net.URL
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import spark._
import spark.{HttpServer, Logging, SparkEnv, Utils}
import spark.io.CompressionCodec
import spark.storage.StorageLevel
import util.{MetadataCleaner, TimeStampedHashSet}
import spark.util.{MetadataCleaner, TimeStampedHashSet}
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
extends Broadcast[T](id) with Logging with Serializable {
def value = value_
@ -85,6 +84,7 @@ private object HttpBroadcast extends Logging {
private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
private lazy val compressionCodec = CompressionCodec.createCodec()
def initialize(isDriver: Boolean) {
synchronized {
@ -122,10 +122,12 @@ private object HttpBroadcast extends Logging {
def write(id: Long, value: Any) {
val file = new File(broadcastDir, "broadcast-" + id)
val out: OutputStream = if (compress) {
new LZFOutputStream(new FileOutputStream(file)) // Does its own buffering
} else {
new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
val out: OutputStream = {
if (compress) {
compressionCodec.compressedOutputStream(new FileOutputStream(file))
} else {
new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
@ -136,10 +138,12 @@ private object HttpBroadcast extends Logging {
def read[T](id: Long): T = {
val url = serverUri + "/broadcast-" + id
var in = if (compress) {
new LZFInputStream(new URL(url).openStream()) // Does its own buffering
} else {
new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
val in = {
if (compress) {
compressionCodec.compressedInputStream(new URL(url).openStream())
} else {
new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()
val serIn = ser.deserializeStream(in)

View file

@ -109,6 +109,7 @@ private[deploy] object DeployMessages {
}
// WorkerWebUI to Worker
case object RequestWorkerState
// Worker to WorkerWebUI
@ -120,4 +121,9 @@ private[deploy] object DeployMessages {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
}
// Actor System to Master
case object CheckForWorkerTimeOut
}

View file

@ -34,6 +34,7 @@ private[spark] class ApplicationInfo(
var executors = new mutable.HashMap[Int, ExecutorInfo]
var coresGranted = 0
var endTime = -1L
val appSource = new ApplicationSource(this)
private var nextExecutorId = 0
@ -51,8 +52,10 @@ private[spark] class ApplicationInfo(
}
def removeExecutor(exec: ExecutorInfo) {
executors -= exec.id
coresGranted -= exec.cores
if (executors.contains(exec.id)) {
executors -= exec.id
coresGranted -= exec.cores
}
}
def coresLeft: Int = desc.maxCores - coresGranted

View file

@ -0,0 +1,24 @@
package spark.deploy.master
import com.codahale.metrics.{Gauge, MetricRegistry}
import spark.metrics.source.Source
class ApplicationSource(val application: ApplicationInfo) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "%s.%s.%s".format("application", application.desc.name,
System.currentTimeMillis())
metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
override def getValue: String = application.state.toString
})
metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
override def getValue: Long = application.duration
})
metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
override def getValue: Int = application.coresGranted
})
}

View file

@ -38,7 +38,9 @@ import spark.util.AkkaUtils
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
@ -59,7 +61,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname")
val metricsSystem = MetricsSystem.createMetricsSystem("master")
val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
val masterSource = new MasterSource(this)
val masterPublicAddress = {
@ -77,15 +80,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
metricsSystem.registerSource(masterSource)
metricsSystem.start()
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
}
override def postStop() {
webUi.stop()
metricsSystem.stop()
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
}
override def receive = {
@ -171,6 +176,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case RequestMasterState => {
sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
}
case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
}
}
/**
@ -275,6 +284,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val now = System.currentTimeMillis()
val date = new Date(now)
val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
actorToApp(driver) = app
@ -300,7 +310,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
idToApp -= app.id
actorToApp -= app.driver
addressToApp -= app.driver.path.address
completedApps += app // Remember it in our history
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach( a => {
applicationMetricsSystem.removeSource(a.appSource)
})
completedApps.trimStart(toRemove)
}
completedApps += app // Remember it in our history
waitingApps -= app
for (exec <- app.executors.values) {
exec.worker.removeExecutor(exec)
@ -325,12 +342,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
/** Check for, and remove, any timed-out workers */
def timeOutDeadWorkers() {
// Copy the workers into an array so we don't modify the hashset while iterating through it
val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT
val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toArray
val currentTime = System.currentTimeMillis()
val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
for (worker <- toRemove) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
worker.id, WORKER_TIMEOUT))
removeWorker(worker)
if (worker.state != WorkerState.DEAD) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
worker.id, WORKER_TIMEOUT/1000))
removeWorker(worker)
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
}
}
}

View file

@ -25,17 +25,25 @@ import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
import net.liftweb.json.JsonAST.JValue
import spark.Utils
import spark.deploy.DeployWebUI
import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import spark.deploy.JsonProtocol
import spark.deploy.master.{ApplicationInfo, WorkerInfo}
import spark.ui.UIUtils
private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.master
implicit val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
JsonProtocol.writeMasterState(state)
}
/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]

View file

@ -61,6 +61,7 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
("/app", (request: HttpServletRequest) => applicationPage.render(request)),
("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
("*", (request: HttpServletRequest) => indexPage.render(request))
)

View file

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.io
import java.io.{InputStream, OutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
/**
* CompressionCodec allows the customization of choosing different compression implementations
* to be used in block storage.
*/
trait CompressionCodec {
def compressedOutputStream(s: OutputStream): OutputStream
def compressedInputStream(s: InputStream): InputStream
}
private[spark] object CompressionCodec {
def createCodec(): CompressionCodec = {
// Set the default codec to Snappy since the LZF implementation initializes a pretty large
// buffer for every stream, which results in a lot of memory overhead when the number of
// shuffle reduce buckets are large.
createCodec(classOf[SnappyCompressionCodec].getName)
}
def createCodec(codecName: String): CompressionCodec = {
Class.forName(
System.getProperty("spark.io.compression.codec", codecName),
true,
Thread.currentThread.getContextClassLoader).newInstance().asInstanceOf[CompressionCodec]
}
}
/**
* LZF implementation of [[spark.io.CompressionCodec]].
*/
class LZFCompressionCodec extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
new LZFOutputStream(s).setFinishBlockOnFlush(true)
}
override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s)
}
/**
* Snappy implementation of [[spark.io.CompressionCodec]].
* Block size can be configured by spark.io.compression.snappy.block.size.
*/
class SnappyCompressionCodec extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
new SnappyOutputStream(s, blockSize)
}
override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
}

View file

@ -17,7 +17,7 @@
package spark.metrics
import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
import java.util.Properties
import java.util.concurrent.TimeUnit
@ -93,6 +93,13 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
}
}
def removeSource(source: Source) {
sources -= source
registry.removeMatching(new MetricFilter {
def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
})
}
def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

View file

@ -88,6 +88,7 @@ class HadoopRDD[K, V](
override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
val conf = confBroadcast.value.value

View file

@ -73,6 +73,7 @@ class NewHadoopRDD[K, V](
override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value
val attemptId = newTaskAttemptID(jobtrackerId, id, true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)

View file

@ -510,6 +510,12 @@ class DAGScheduler(
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
}
}
// must be run listener before possible NotSerializableException
// should be "StageSubmitted" first and then "JobEnded"
val properties = idToActiveJob(stage.priority).properties
sparkListeners.foreach(_.onStageSubmitted(
SparkListenerStageSubmitted(stage, tasks.size, properties)))
if (tasks.size > 0) {
// Preemptively serialize a task to make sure it can be serialized. We are catching this
// exception here because it would be fairly hard to catch the non-serializable exception
@ -524,11 +530,9 @@ class DAGScheduler(
return
}
sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size)))
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
val properties = idToActiveJob(stage.priority).properties
taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
if (!stage.submissionTime.isDefined) {

View file

@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.{Map, HashMap, ListBuffer}
import scala.io.Source
import spark._
import spark.SparkContext
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo
@ -62,7 +63,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
event match {
case SparkListenerJobStart(job, properties) =>
processJobStartEvent(job, properties)
case SparkListenerStageSubmitted(stage, taskSize) =>
case SparkListenerStageSubmitted(stage, taskSize, properties) =>
processStageSubmittedEvent(stage, taskSize)
case StageCompleted(stageInfo) =>
processStageCompletedEvent(stageInfo)
@ -317,8 +318,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
protected def recordJobProperties(jobID: Int, properties: Properties) {
if(properties != null) {
val annotation = properties.getProperty("spark.job.annotation", "")
jobLogInfo(jobID, annotation, false)
val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
jobLogInfo(jobID, description, false)
}
}

View file

@ -118,6 +118,7 @@ private[spark] class ResultTask[T, U](
out.write(bytes)
out.writeInt(partition)
out.writeInt(outputId)
out.writeLong(generation)
out.writeObject(split)
}
}
@ -132,6 +133,7 @@ private[spark] class ResultTask[T, U](
func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
partition = in.readInt()
val outputId = in.readInt()
generation = in.readLong()
split = in.readObject().asInstanceOf[Partition]
}
}

View file

@ -18,16 +18,9 @@
package spark.scheduler
import java.io._
import java.util.{HashMap => JHashMap}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.JavaConversions._
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import com.ning.compress.lzf.LZFInputStream
import com.ning.compress.lzf.LZFOutputStream
import scala.collection.mutable.HashMap
import spark._
import spark.executor.ShuffleWriteMetrics
@ -109,11 +102,7 @@ private[spark] class ShuffleMapTask(
preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
}
var split = if (rdd == null) {
null
} else {
rdd.partitions(partition)
}
var split = if (rdd == null) null else rdd.partitions(partition)
override def writeExternal(out: ObjectOutput) {
RDDCheckpointData.synchronized {

View file

@ -25,7 +25,8 @@ import spark.executor.TaskMetrics
sealed trait SparkListenerEvents
case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents
case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties)
extends SparkListenerEvents
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
@ -34,10 +35,10 @@ case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends Spa
case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
taskMetrics: TaskMetrics) extends SparkListenerEvents
case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
extends SparkListenerEvents
case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
extends SparkListenerEvents
trait SparkListener {
@ -45,7 +46,7 @@ trait SparkListener {
* Called when a stage is completed, with information on the completed stage
*/
def onStageCompleted(stageCompleted: StageCompleted) { }
/**
* Called when a stage is submitted
*/
@ -65,12 +66,12 @@ trait SparkListener {
* Called when a job starts
*/
def onJobStart(jobStart: SparkListenerJobStart) { }
/**
* Called when a job ends
*/
def onJobEnd(jobEnd: SparkListenerJobEnd) { }
}
/**

View file

@ -17,6 +17,8 @@
package spark.scheduler
import spark.scheduler.cluster.Pool
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
* These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
@ -25,6 +27,11 @@ package spark.scheduler
* the TaskSchedulerListener interface.
*/
private[spark] trait TaskScheduler {
def rootPool: Pool
def schedulingMode: SchedulingMode
def start(): Unit
// Invoked after system has successfully initialized (typically in spark context).

View file

@ -26,6 +26,7 @@ import scala.collection.mutable.HashSet
import spark._
import spark.TaskState.TaskState
import spark.scheduler._
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import java.util.{TimerTask, Timer}
@ -114,6 +115,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
// default scheduler is FIFO
val schedulingMode: SchedulingMode = SchedulingMode.withName(
System.getProperty("spark.cluster.schedulingmode", "FIFO"))
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
@ -121,15 +125,13 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def initialize(context: SchedulerBackend) {
backend = context
//default scheduler is FIFO
val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
//temporarily set rootPool name to empty
rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
case "FIFO" =>
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case "FAIR" =>
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool)
}
}
@ -204,7 +206,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered")
"check your cluster UI to ensure that workers are registered " +
"and have sufficient memory")
} else {
this.cancel()
}
@ -270,10 +273,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
var launchedTask = false
val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
for (manager <- sortedTaskSetQueue)
{
logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
for (manager <- sortedTaskSetQueue) {
logDebug("parentName:%s, name:%s, runningTasks:%s".format(
manager.parent.name, manager.name, manager.runningTasks))
}
for (manager <- sortedTaskSetQueue) {
// Split offers based on node local, rack local and off-rack tasks.

View file

@ -85,7 +85,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
// Maximum times a task is allowed to fail before failing the job
val MAX_TASK_FAILURES = 4
val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
@ -107,9 +107,8 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
var runningTasks = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
var name = "TaskSet_" + taskSet.stageId.toString
var name = "TaskSet_"+taskSet.stageId.toString
var parent: Schedulable = null
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
@ -697,18 +696,18 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
}
}
// TODO: for now we just find Pool not TaskSetManager,
// TODO(xiajunluan): for now we just find Pool not TaskSetManager
// we can extend this function in future if needed
override def getSchedulableByName(name: String): Schedulable = {
return null
}
override def addSchedulable(schedulable:Schedulable) {
//nothing
// nothing
}
override def removeSchedulable(schedulable:Schedulable) {
//nothing
// nothing
}
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {

View file

@ -17,14 +17,18 @@
package spark.scheduler.cluster
import scala.collection.mutable.ArrayBuffer
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import scala.collection.mutable.ArrayBuffer
/**
* An interface for schedulable entities.
* there are two type of Schedulable entities(Pools and TaskSetManagers)
*/
private[spark] trait Schedulable {
var parent: Schedulable
// child queues
def schedulableQueue: ArrayBuffer[Schedulable]
def schedulingMode: SchedulingMode
def weight: Int
def minShare: Int
def runningTasks: Int

View file

@ -41,10 +41,11 @@ private[spark] trait SchedulableBuilder {
def addTaskSetManager(manager: Schedulable, properties: Properties)
}
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
override def buildPools() {
//nothing
// nothing
}
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
@ -52,7 +53,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends Schedula
}
}
private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
@ -103,9 +105,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
}
}
//finally create "default" pool
// finally create "default" pool
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
@ -119,8 +122,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
parentPool = rootPool.getSchedulableByName(poolName)
if (parentPool == null) {
//we will create a new pool that user has configured in app instead of being defined in xml file
parentPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
// we will create a new pool that user has configured in app
// instead of being defined in xml file
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))

View file

@ -17,8 +17,13 @@
package spark.scheduler.cluster
object SchedulingMode extends Enumeration("FAIR","FIFO"){
/**
* "FAIR" and "FIFO" determines which policy is used
* to order tasks amongst a Schedulable's sub-queues
* "NONE" is used when the a Schedulable has no sub-queues.
*/
object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
type SchedulingMode = Value
val FAIR,FIFO = Value
val FAIR,FIFO,NONE = Value
}

View file

@ -23,7 +23,10 @@ import spark.TaskState.TaskState
import spark.scheduler.TaskSet
private[spark] trait TaskSetManager extends Schedulable {
def schedulableQueue = null
def schedulingMode = SchedulingMode.NONE
def taskSet: TaskSet
def slaveOffer(

View file

@ -29,6 +29,7 @@ import spark.TaskState.TaskState
import spark.executor.ExecutorURLClassLoader
import spark.scheduler._
import spark.scheduler.cluster._
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import akka.actor._
/**
@ -85,6 +86,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
val schedulingMode: SchedulingMode = SchedulingMode.withName(
System.getProperty("spark.cluster.schedulingmode", "FIFO"))
val activeTaskSets = new HashMap[String, TaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@ -92,15 +95,13 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var localActor: ActorRef = null
override def start() {
//default scheduler is FIFO
val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
//temporarily set rootPool name to empty
rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
case "FIFO" =>
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case "FAIR" =>
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool)
}
}

View file

@ -63,11 +63,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
override def addSchedulable(schedulable: Schedulable): Unit = {
//nothing
// nothing
}
override def removeSchedulable(schedulable: Schedulable): Unit = {
//nothing
// nothing
}
override def getSchedulableByName(name: String): Schedulable = {
@ -75,7 +75,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
override def executorLost(executorId: String, host: String): Unit = {
//nothing
// nothing
}
override def checkSpeculatableTasks() = true

View file

@ -110,12 +110,6 @@ private[spark] class CoarseMesosSchedulerBackend(
}
def createCommand(offer: Offer, numCores: Int): CommandInfo = {
val runScript = new File(sparkHome, "run").getCanonicalPath
val driverUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
@ -123,7 +117,26 @@ private[spark] class CoarseMesosSchedulerBackend(
.setValue(value)
.build())
}
return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val driverUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"),
System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
val uri = System.getProperty("spark.executor.uri")
if (uri == null) {
val runScript = new File(sparkHome, "run").getCanonicalPath
command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue("cd %s*; ./run spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
return command.build()
}
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}

View file

@ -89,7 +89,6 @@ private[spark] class MesosSchedulerBackend(
val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
@ -97,15 +96,23 @@ private[spark] class MesosSchedulerBackend(
.setValue(value)
.build())
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val uri = System.getProperty("spark.executor.uri")
if (uri == null) {
command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue("cd %s*; ./spark-executor".format(basename))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
.build()
val command = CommandInfo.newBuilder()
.setValue(execScript)
.setEnvironment(environment)
.build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)

View file

@ -27,11 +27,10 @@ import akka.dispatch.{Await, Future}
import akka.util.Duration
import akka.util.duration._
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import spark.{Logging, SparkEnv, SparkException, Utils}
import spark.io.CompressionCodec
import spark.network._
import spark.serializer.Serializer
import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
@ -158,6 +157,13 @@ private[spark] class BlockManager(
val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
initialize()
// The compression codec to use. Note that the "lazy" val is necessary because we want to delay
// the initialization of the compression codec until it is first used. The reason is that a Spark
// program could be using a user-defined codec in a third party jar, which is loaded in
// Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
// loaded yet.
private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec()
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
@ -919,18 +925,14 @@ private[spark] class BlockManager(
* Wrap an output stream for compression if block compression is enabled for its block type
*/
def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
if (shouldCompress(blockId)) {
(new LZFOutputStream(s)).setFinishBlockOnFlush(true)
} else {
s
}
if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
}
/**
* Wrap an input stream for compression if block compression is enabled for its block type
*/
def wrapForCompression(blockId: String, s: InputStream): InputStream = {
if (shouldCompress(blockId)) new LZFInputStream(s) else s
if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
}
def dataSerialize(

View file

@ -66,7 +66,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def close() {
if (initialized) {
objOut.close()
bs.close()
channel = null
bs = null
objOut = null

View file

@ -28,14 +28,14 @@ private[spark] object UIUtils {
/** Returns a spark page with correctly formatted headers */
def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
: Seq[Node] = {
val storage = page match {
case Storage => <li class="active"><a href="/storage">Storage</a></li>
case _ => <li><a href="/storage">Storage</a></li>
}
val jobs = page match {
case Jobs => <li class="active"><a href="/stages">Jobs</a></li>
case _ => <li><a href="/stages">Jobs</a></li>
}
val storage = page match {
case Storage => <li class="active"><a href="/storage">Storage</a></li>
case _ => <li><a href="/storage">Storage</a></li>
}
val environment = page match {
case Environment => <li class="active"><a href="/environment">Environment</a></li>
case _ => <li><a href="/environment">Environment</a></li>
@ -65,18 +65,14 @@ private[spark] object UIUtils {
<div class="navbar">
<div class="navbar-inner">
<div class="container">
<div class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></div>
<a href="/" class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></a>
<ul class="nav">
{storage}
{jobs}
{storage}
{environment}
{executors}
</ul>
<ul id="infolist">
<li>Application: <strong>{sc.appName}</strong></li>
<li>Master: <strong>{sc.master}</strong></li>
<li>Executors: <strong>{sc.getExecutorStorageStatus.size}</strong></li>
</ul>
<p class="navbar-text pull-right">Application: <strong>{sc.appName}</strong></p>
</div>
</div>
</div>
@ -117,9 +113,9 @@ private[spark] object UIUtils {
<img src="/static/spark_logo.png" />
</div>
<div class="span10">
<h1 style="vertical-align: bottom; margin-top: 40px; display: inline-block;">
<h3 style="vertical-align: bottom; margin-top: 40px; display: inline-block;">
{title}
</h1>
</h3>
</div>
</div>
{content}

View file

@ -21,7 +21,8 @@ import scala.util.Random
import spark.SparkContext
import spark.SparkContext._
import spark.scheduler.cluster.SchedulingMode
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Continuously generates jobs that expose various features of the WebUI (internal testing tool).
*
@ -29,18 +30,29 @@ import spark.SparkContext._
*/
private[spark] object UIWorkloadGenerator {
val NUM_PARTITIONS = 100
val INTER_JOB_WAIT_MS = 500
val INTER_JOB_WAIT_MS = 5000
def main(args: Array[String]) {
if (args.length < 2) {
println("usage: ./run spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
System.exit(1)
}
val master = args(0)
val schedulingMode = SchedulingMode.withName(args(1))
val appName = "Spark UI Tester"
if (schedulingMode == SchedulingMode.FAIR) {
System.setProperty("spark.cluster.schedulingmode", "FAIR")
}
val sc = new SparkContext(master, appName)
// NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase,
// but we pass it here anyways since it will be useful once we do.
def setName(s: String) = {
sc.addLocalProperties("spark.job.annotation", s)
def setProperties(s: String) = {
if(schedulingMode == SchedulingMode.FAIR) {
sc.addLocalProperty("spark.scheduler.cluster.fair.pool", s)
}
sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
}
val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
def nextFloat() = (new Random()).nextFloat()
@ -73,14 +85,18 @@ private[spark] object UIWorkloadGenerator {
while (true) {
for ((desc, job) <- jobs) {
try {
setName(desc)
job()
println("Job funished: " + desc)
} catch {
case e: Exception =>
println("Job Failed: " + desc)
}
new Thread {
override def run() {
try {
setProperties(desc)
job()
println("Job funished: " + desc)
} catch {
case e: Exception =>
println("Job Failed: " + desc)
}
}
}.start
Thread.sleep(INTER_JOB_WAIT_MS)
}
}

View file

@ -44,7 +44,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
("Java Home", Properties.javaHome),
("Scala Version", Properties.versionString),
("Scala Home", Properties.scalaHome)
)
).sorted
def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
def jvmTable = UIUtils.listingTable(Seq("Name", "Value"), jvmRow, jvmInformation)
@ -53,8 +53,8 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
.filter{case (k, v) => k.contains("java.class.path")}
.headOption
.getOrElse("", "")
val sparkProperties = properties.filter(_._1.startsWith("spark"))
val otherProperties = properties.diff(sparkProperties :+ classPathProperty)
val sparkProperties = properties.filter(_._1.startsWith("spark")).sorted
val otherProperties = properties.diff(sparkProperties :+ classPathProperty).sorted
val propertyHeaders = Seq("Name", "Value")
def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
@ -67,7 +67,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
.map(e => (e, "System Classpath"))
val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
val addedFiles = sc.addedFiles.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
val classPath = addedJars ++ addedFiles ++ classPathEntries
val classPath = (addedJars ++ addedFiles ++ classPathEntries).sorted
val classPathHeaders = Seq("Resource", "Source")
def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>

View file

@ -17,25 +17,18 @@
package spark.ui.jobs
import java.util.Date
import javax.servlet.http.HttpServletRequest
import scala.collection.mutable.HashSet
import scala.Some
import scala.xml.{NodeSeq, Node}
import spark.scheduler.cluster.TaskInfo
import spark.scheduler.Stage
import spark.storage.StorageLevel
import spark.scheduler.cluster.SchedulingMode
import spark.ui.Page._
import spark.ui.UIUtils._
import spark.Utils
/** Page showing list of all ongoing and recently finished stages */
/** Page showing list of all ongoing and recently finished stages and pools*/
private[spark] class IndexPage(parent: JobProgressUI) {
def listener = parent.listener
val dateFmt = parent.dateFmt
def render(request: HttpServletRequest): Seq[Node] = {
val activeStages = listener.activeStages.toSeq
@ -48,29 +41,19 @@ private[spark] class IndexPage(parent: JobProgressUI) {
activeTime += t.timeRunning(now)
}
/** Special table which merges two header cells. */
def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<th>Stage Id</th>
<th>Origin</th>
<th>Submitted</th>
<th>Duration</th>
<th colspan="2">Tasks: Complete/Total</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
<th>Stored RDD</th>
</thead>
<tbody>
{rows.map(r => makeRow(r))}
</tbody>
</table>
}
val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
val poolTable = new PoolTable(listener.sc.getAllPools, listener)
val summary: NodeSeq =
<div>
<ul class="unstyled">
<li>
<li>
<strong>Duration: </strong>
{parent.formatDuration(now - listener.sc.startTime)}
</li>
<li>
<strong>CPU time: </strong>
{parent.formatDuration(listener.totalTime + activeTime)}
</li>
@ -86,78 +69,35 @@ private[spark] class IndexPage(parent: JobProgressUI) {
{Utils.memoryBytesToString(listener.totalShuffleWrite)}
</li>
}
<li>
<a href="#active"><strong>Active Stages:</strong></a>
{activeStages.size}
</li>
<li>
<a href="#completed"><strong>Completed Stages:</strong></a>
{completedStages.size}
</li>
<li>
<a href="#failed"><strong>Failed Stages:</strong></a>
{failedStages.size}
</li>
<li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
</ul>
</div>
val activeStageTable: NodeSeq = stageTable(stageRow, activeStages)
val completedStageTable = stageTable(stageRow, completedStages)
val failedStageTable: NodeSeq = stageTable(stageRow, failedStages)
val content = summary ++
<h2>Active Stages</h2> ++ activeStageTable ++
<h2>Completed Stages</h2> ++ completedStageTable ++
<h2>Failed Stages</h2> ++ failedStageTable
val content = summary ++
{if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) {
<h3>Pools</h3> ++ poolTable.toNodeSeq
} else {
Seq()
}} ++
<h3 id="active">Active Stages : {activeStages.size}</h3> ++
activeStagesTable.toNodeSeq++
<h3 id="completed">Completed Stages : {completedStages.size}</h3> ++
completedStagesTable.toNodeSeq++
<h3 id ="failed">Failed Stages : {failedStages.size}</h3> ++
failedStagesTable.toNodeSeq
headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
}
def getElapsedTime(submitted: Option[Long], completed: Long): String = {
submitted match {
case Some(t) => parent.formatDuration(completed - t)
case _ => "Unknown"
}
}
def makeProgressBar(started: Int, completed: Int, total: Int): Seq[Node] = {
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
<div class="progress" style="height: 15px; margin-bottom: 0px">
<div class="bar" style={completeWidth}></div>
<div class="bar bar-info" style={startWidth}></div>
</div>
}
def stageRow(s: Stage): Seq[Node] = {
val submissionTime = s.submissionTime match {
case Some(t) => dateFmt.format(new Date(t))
case None => "Unknown"
}
val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
case 0 => ""
case b => Utils.memoryBytesToString(b)
}
val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
case 0 => ""
case b => Utils.memoryBytesToString(b)
}
val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
val totalTasks = s.numPartitions
<tr>
<td>{s.id}</td>
<td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td>
<td>{submissionTime}</td>
<td>{getElapsedTime(s.submissionTime,
s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
<td class="progress-cell">{makeProgressBar(startedTasks, completedTasks, totalTasks)}</td>
<td style="border-left: 0; text-align: center;">{completedTasks} / {totalTasks}
{listener.stageToTasksFailed.getOrElse(s.id, 0) match {
case f if f > 0 => "(%s failed)".format(f)
case _ =>
}}
</td>
<td>{shuffleRead}</td>
<td>{shuffleWrite}</td>
<td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
<a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
{Option(s.rdd.name).getOrElse(s.rdd.id)}
</a>
}}
</td>
</tr>
}
}

View file

@ -0,0 +1,167 @@
package spark.ui.jobs
import scala.Seq
import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
import spark.{ExceptionFailure, SparkContext, Success, Utils}
import spark.scheduler._
import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
import collection.mutable
private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
// How many stages to remember
val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
val DEFAULT_POOL_NAME = "default"
val stageToPool = new HashMap[Stage, String]()
val stageToDescription = new HashMap[Stage, String]()
val poolToActiveStages = new HashMap[String, HashSet[Stage]]()
val activeStages = HashSet[Stage]()
val completedStages = ListBuffer[Stage]()
val failedStages = ListBuffer[Stage]()
// Total metrics reflect metrics only for completed tasks
var totalTime = 0L
var totalShuffleRead = 0L
var totalShuffleWrite = 0L
val stageToTime = HashMap[Int, Long]()
val stageToShuffleRead = HashMap[Int, Long]()
val stageToShuffleWrite = HashMap[Int, Long]()
val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos =
HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
override def onStageCompleted(stageCompleted: StageCompleted) = {
val stage = stageCompleted.stageInfo.stage
poolToActiveStages(stageToPool(stage)) -= stage
activeStages -= stage
completedStages += stage
trimIfNecessary(completedStages)
}
/** If stages is too large, remove and garbage collect old stages */
def trimIfNecessary(stages: ListBuffer[Stage]) {
if (stages.size > RETAINED_STAGES) {
val toRemove = RETAINED_STAGES / 10
stages.takeRight(toRemove).foreach( s => {
stageToTaskInfos.remove(s.id)
stageToTime.remove(s.id)
stageToShuffleRead.remove(s.id)
stageToShuffleWrite.remove(s.id)
stageToTasksActive.remove(s.id)
stageToTasksComplete.remove(s.id)
stageToTasksFailed.remove(s.id)
stageToPool.remove(s)
if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
})
stages.trimEnd(toRemove)
}
}
/** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
val stage = stageSubmitted.stage
activeStages += stage
val poolName = Option(stageSubmitted.properties).map {
p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)
stageToPool(stage) = poolName
val description = Option(stageSubmitted.properties).flatMap {
p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
}
description.map(d => stageToDescription(stage) = d)
val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
stages += stage
}
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val sid = taskStart.task.stageId
val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive += taskStart.taskInfo
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None))
stageToTaskInfos(sid) = taskList
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
(Some(e), e.metrics)
case _ =>
stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
stageToTime.getOrElseUpdate(sid, 0L)
val time = metrics.map(m => m.executorRunTime).getOrElse(0)
stageToTime(sid) += time
totalTime += time
stageToShuffleRead.getOrElseUpdate(sid, 0L)
val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
s.remoteBytesRead).getOrElse(0L)
stageToShuffleRead(sid) += shuffleRead
totalShuffleRead += shuffleRead
stageToShuffleWrite.getOrElseUpdate(sid, 0L)
val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
s.shuffleBytesWritten).getOrElse(0L)
stageToShuffleWrite(sid) += shuffleWrite
totalShuffleWrite += shuffleWrite
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList
}
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
jobEnd match {
case end: SparkListenerJobEnd =>
end.jobResult match {
case JobFailed(ex, Some(stage)) =>
activeStages -= stage
poolToActiveStages(stageToPool(stage)) -= stage
failedStages += stage
trimIfNecessary(failedStages)
case _ =>
}
case _ =>
}
}
/** Is this stage's input from a shuffle read. */
def hasShuffleRead(stageID: Int): Boolean = {
// This is written in a slightly complicated way to avoid having to scan all tasks
for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
}
return false // No tasks have finished for this stage
}
/** Is this stage's output to a shuffle write. */
def hasShuffleWrite(stageID: Int): Boolean = {
// This is written in a slightly complicated way to avoid having to scan all tasks
for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
}
return false // No tasks have finished for this stage
}
}

View file

@ -31,9 +31,9 @@ import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
import spark.ui.JettyUtils._
import spark.{ExceptionFailure, SparkContext, Success, Utils}
import spark.scheduler._
import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
import collection.mutable
import spark.scheduler.cluster.SchedulingMode
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/** Web UI showing progress status of all jobs in the given SparkContext. */
private[spark] class JobProgressUI(val sc: SparkContext) {
@ -43,9 +43,10 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
private val indexPage = new IndexPage(this)
private val stagePage = new StagePage(this)
private val poolPage = new PoolPage(this)
def start() {
_listener = Some(new JobProgressListener)
_listener = Some(new JobProgressListener(sc))
sc.addSparkListener(listener)
}
@ -53,120 +54,7 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
def getHandlers = Seq[(String, Handler)](
("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
("/stages/pool", (request: HttpServletRequest) => poolPage.render(request)),
("/stages", (request: HttpServletRequest) => indexPage.render(request))
)
}
private[spark] class JobProgressListener extends SparkListener {
// How many stages to remember
val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
val activeStages = HashSet[Stage]()
val completedStages = ListBuffer[Stage]()
val failedStages = ListBuffer[Stage]()
// Total metrics reflect metrics only for completed tasks
var totalTime = 0L
var totalShuffleRead = 0L
var totalShuffleWrite = 0L
val stageToTime = HashMap[Int, Long]()
val stageToShuffleRead = HashMap[Int, Long]()
val stageToShuffleWrite = HashMap[Int, Long]()
val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos =
HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
override def onStageCompleted(stageCompleted: StageCompleted) = {
val stage = stageCompleted.stageInfo.stage
activeStages -= stage
completedStages += stage
trimIfNecessary(completedStages)
}
/** If stages is too large, remove and garbage collect old stages */
def trimIfNecessary(stages: ListBuffer[Stage]) {
if (stages.size > RETAINED_STAGES) {
val toRemove = RETAINED_STAGES / 10
stages.takeRight(toRemove).foreach( s => {
stageToTaskInfos.remove(s.id)
stageToTime.remove(s.id)
stageToShuffleRead.remove(s.id)
stageToShuffleWrite.remove(s.id)
stageToTasksActive.remove(s.id)
stageToTasksComplete.remove(s.id)
stageToTasksFailed.remove(s.id)
})
stages.trimEnd(toRemove)
}
}
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
activeStages += stageSubmitted.stage
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val sid = taskStart.task.stageId
val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive += taskStart.taskInfo
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None))
stageToTaskInfos(sid) = taskList
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
(Some(e), e.metrics)
case _ =>
stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
stageToTime.getOrElseUpdate(sid, 0L)
val time = metrics.map(m => m.executorRunTime).getOrElse(0)
stageToTime(sid) += time
totalTime += time
stageToShuffleRead.getOrElseUpdate(sid, 0L)
val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
s.remoteBytesRead).getOrElse(0L)
stageToShuffleRead(sid) += shuffleRead
totalShuffleRead += shuffleRead
stageToShuffleWrite.getOrElseUpdate(sid, 0L)
val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
s.shuffleBytesWritten).getOrElse(0L)
stageToShuffleWrite(sid) += shuffleWrite
totalShuffleWrite += shuffleWrite
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList
}
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
jobEnd match {
case end: SparkListenerJobEnd =>
end.jobResult match {
case JobFailed(ex, Some(stage)) =>
activeStages -= stage
failedStages += stage
trimIfNecessary(failedStages)
case _ =>
}
case _ =>
}
}
}

View file

@ -0,0 +1,30 @@
package spark.ui.jobs
import javax.servlet.http.HttpServletRequest
import scala.xml.{NodeSeq, Node}
import scala.collection.mutable.HashSet
import spark.scheduler.Stage
import spark.ui.UIUtils._
import spark.ui.Page._
/** Page showing specific pool details */
private[spark] class PoolPage(parent: JobProgressUI) {
def listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
val poolName = request.getParameter("poolname")
val poolToActiveStages = listener.poolToActiveStages
val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq
val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
val pool = listener.sc.getPoolForName(poolName).get
val poolTable = new PoolTable(Seq(pool), listener)
val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++
<h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq()
headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs)
}
}

View file

@ -0,0 +1,49 @@
package spark.ui.jobs
import scala.xml.Node
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import spark.scheduler.Stage
import spark.scheduler.cluster.Schedulable
/** Table showing list of pools */
private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
def toNodeSeq(): Seq[Node] = {
poolTable(poolRow, pools)
}
// pool tables
def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
rows: Seq[Schedulable]
): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<th>Pool Name</th>
<th>Minimum Share</th>
<th>Pool Weight</th>
<td>Active Stages</td>
<td>Running Tasks</td>
<td>SchedulingMode</td>
</thead>
<tbody>
{rows.map(r => makeRow(r, poolToActiveStages))}
</tbody>
</table>
}
def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = {
<tr>
<td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
<td>{p.minShare}</td>
<td>{p.weight}</td>
<td>{poolToActiveStages.getOrElseUpdate(p.name, new HashSet[Stage]()).size}</td>
<td>{p.runningTasks}</td>
<td>{p.schedulingMode}</td>
</tr>
}
}

View file

@ -0,0 +1,116 @@
package spark.ui.jobs
import java.util.Date
import java.text.SimpleDateFormat
import javax.servlet.http.HttpServletRequest
import scala.Some
import scala.xml.{NodeSeq, Node}
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import spark.scheduler.cluster.{SchedulingMode, TaskInfo}
import spark.scheduler.Stage
import spark.ui.UIUtils._
import spark.ui.Page._
import spark.Utils
import spark.storage.StorageLevel
/** Page showing list of all ongoing and recently finished stages */
private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) {
val listener = parent.listener
val dateFmt = parent.dateFmt
val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
def toNodeSeq(): Seq[Node] = {
stageTable(stageRow, stages)
}
/** Special table which merges two header cells. */
def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<th>Stage Id</th>
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Description</th>
<th>Submitted</th>
<td>Duration</td>
<td>Tasks: Succeeded/Total</td>
<td>Shuffle Read</td>
<td>Shuffle Write</td>
</thead>
<tbody>
{rows.map(r => makeRow(r))}
</tbody>
</table>
}
def getElapsedTime(submitted: Option[Long], completed: Long): String = {
submitted match {
case Some(t) => parent.formatDuration(completed - t)
case _ => "Unknown"
}
}
def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
<div class="progress" style="height: 15px; margin-bottom: 0px; position: relative">
<span style="text-align:center; position:absolute; width:100%;">
{completed}/{total} {failed}
</span>
<div class="bar bar-completed" style={completeWidth}></div>
<div class="bar bar-running" style={startWidth}></div>
</div>
}
def stageRow(s: Stage): Seq[Node] = {
val submissionTime = s.submissionTime match {
case Some(t) => dateFmt.format(new Date(t))
case None => "Unknown"
}
val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
case 0 => ""
case b => Utils.memoryBytesToString(b)
}
val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
case 0 => ""
case b => Utils.memoryBytesToString(b)
}
val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
val failedTasks = listener.stageToTasksFailed.getOrElse(s.id, 0) match {
case f if f > 0 => "(%s failed)".format(f)
case _ => ""
}
val totalTasks = s.numPartitions
val poolName = listener.stageToPool.get(s)
val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
val description = listener.stageToDescription.get(s)
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
<tr>
<td>{s.id}</td>
{if (isFairScheduler) {
<td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
}
<td>{description}</td>
<td valign="middle">{submissionTime}</td>
<td>{getElapsedTime(s.submissionTime,
s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
<td class="progress-cell">
{makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
</td>
<td>{shuffleRead}</td>
<td>{shuffleWrite}</td>
</tr>
}
}

View file

@ -83,18 +83,19 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
<hr/>
<div class="row">
<div class="span12">
<h3> Data Distribution Summary </h3>
{workerTable}
</div>
</div>
<hr/>
<div class="row">
<div class="span12">
<h3> RDD Summary </h3>
<h3> Partitions </h3>
{blockTable}
</div>
</div>;
headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Jobs)
headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Storage)
}
def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = {

View file

@ -73,7 +73,6 @@ class Vector(val elements: Array[Double]) extends Serializable {
def += (other: Vector): Vector = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
var ans = 0.0
var i = 0
while (i < length) {
elements(i) += other(i)
@ -117,9 +116,7 @@ object Vector {
def apply(elements: Double*) = new Vector(elements.toArray)
def apply(length: Int, initializer: Int => Double): Vector = {
val elements = new Array[Double](length)
for (i <- 0 until length)
elements(i) = initializer(i)
val elements: Array[Double] = Array.tabulate(length)(initializer)
return new Vector(elements)
}

View file

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.io
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.scalatest.FunSuite
class CompressionCodecSuite extends FunSuite {
def testCodec(codec: CompressionCodec) {
// Write 1000 integers to the output stream, compressed.
val outputStream = new ByteArrayOutputStream()
val out = codec.compressedOutputStream(outputStream)
for (i <- 1 until 1000) {
out.write(i % 256)
}
out.close()
// Read the 1000 integers back.
val inputStream = new ByteArrayInputStream(outputStream.toByteArray)
val in = codec.compressedInputStream(inputStream)
for (i <- 1 until 1000) {
assert(in.read() === i % 256)
}
in.close()
}
test("default compression codec") {
val codec = CompressionCodec.createCodec()
assert(codec.getClass === classOf[SnappyCompressionCodec])
testCodec(codec)
}
test("lzf compression codec") {
val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName)
assert(codec.getClass === classOf[LZFCompressionCodec])
testCodec(codec)
}
test("snappy compression codec") {
val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName)
assert(codec.getClass === classOf[SnappyCompressionCodec])
testCodec(codec)
}
}

View file

@ -32,6 +32,10 @@ import spark.{Dependency, ShuffleDependency, OneToOneDependency}
import spark.{FetchFailed, Success, TaskEndReason}
import spark.storage.{BlockManagerId, BlockManagerMaster}
import spark.scheduler.cluster.Pool
import spark.scheduler.cluster.SchedulingMode
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
* rather than spawning an event loop thread as happens in the real code. They use EasyMock
@ -49,6 +53,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
/** Set of TaskSets the DAGScheduler has requested executed. */
val taskSets = scala.collection.mutable.Buffer[TaskSet]()
val taskScheduler = new TaskScheduler() {
override def rootPool: Pool = null
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def start() = {}
override def stop() = {}
override def submitTasks(taskSet: TaskSet) = {

View file

@ -57,7 +57,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID, None)
val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID, None)
joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4))
joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4, null))
joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
parentRdd.setName("MyRDD")
joblogger.getRddNameTest(parentRdd) should be ("MyRDD")

View file

@ -73,7 +73,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
new Thread {
if (poolName != null) {
sc.addLocalProperties("spark.scheduler.cluster.fair.pool",poolName)
sc.addLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
}
override def run() {
val ans = nums.map(number => {

View file

@ -35,7 +35,7 @@ for these variables.
* `SPARK_JAVA_OPTS`, to add JVM options. This includes any system properties that you'd like to pass with `-D`.
* `SPARK_CLASSPATH`, to add elements to Spark's classpath.
* `SPARK_LIBRARY_PATH`, to add search directories for native libraries.
* `SPARK_MEM`, to set the amount of memory used per node. This should be in the same format as the
* `SPARK_MEM`, to set the amount of memory used per node. This should be in the same format as the
JVM's -Xmx option, e.g. `300m` or `1g`. Note that this option will soon be deprecated in favor of
the `spark.executor.memory` system property, so we recommend using that in new code.
@ -77,7 +77,7 @@ there are at least five properties that you will commonly want to control:
Class to use for serializing objects that will be sent over the network or need to be cached
in serialized form. The default of Java serialization works with any Serializable Java object but is
quite slow, so we recommend <a href="tuning.html">using <code>spark.KryoSerializer</code>
and configuring Kryo serialization</a> when speed is necessary. Can be any subclass of
and configuring Kryo serialization</a> when speed is necessary. Can be any subclass of
<a href="api/core/index.html#spark.Serializer"><code>spark.Serializer</code></a>).
</td>
</tr>
@ -86,7 +86,7 @@ there are at least five properties that you will commonly want to control:
<td>(none)</td>
<td>
If you use Kryo serialization, set this class to register your custom classes with Kryo.
You need to set it to a class that extends
You need to set it to a class that extends
<a href="api/core/index.html#spark.KryoRegistrator"><code>spark.KryoRegistrator</code></a>).
See the <a href="tuning.html#data-serialization">tuning guide</a> for more details.
</td>
@ -180,6 +180,21 @@ Apart from these, the following properties are also available, and may be useful
Can save substantial space at the cost of some extra CPU time.
</td>
</tr>
<tr>
<td>spark.io.compression.codec</td>
<td>spark.io.SnappyCompressionCodec</td>
<td>
The compression codec class to use for various compressions. By default, Spark provides two
codecs: <code>spark.io.LZFCompressionCodec</code> and <code>spark.io.SnappyCompressionCodec</code>.
</td>
</tr>
<tr>
<td>spark.io.compression.snappy.block.size</td>
<td>32768</td>
<td>
Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec is used.
</td>
</tr>
<tr>
<td>spark.reducer.maxMbInFlight</td>
<td>48</td>
@ -295,6 +310,14 @@ Apart from these, the following properties are also available, and may be useful
Duration (milliseconds) of how long to batch new objects coming from network receivers.
</td>
</tr>
<tr>
<td>spark.task.maxFailures</td>
<td>4</td>
<td>
Number of individual task failures before giving up on the job.
Should be greater than or equal to 1. Number of allowed retries = this value - 1.
</td>
</tr>
</table>

View file

@ -43,7 +43,7 @@ Finally, the following configuration options can be passed to the master and wor
</tr>
<tr>
<td><code>-p PORT</code>, <code>--port PORT</code></td>
<td>IP address or DNS name to listen on (default: 7077 for master, random for worker)</td>
<td>Port for service to listen on (default: 7077 for master, random for worker)</td>
</tr>
<tr>
<td><code>--webui-port PORT</code></td>

View file

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.examples;
import scala.Tuple2;
import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function;
import spark.api.java.function.PairFlatMapFunction;
import spark.api.java.function.PairFunction;
import java.util.List;
import java.util.ArrayList;
/**
* Computes the PageRank of URLs from an input file. Input file should
* be in format of:
* URL neighbor URL
* URL neighbor URL
* URL neighbor URL
* ...
* where URL and their neighbors are separated by space(s).
*/
public class JavaPageRank {
private static double sum(List<Double> numbers) {
double out = 0.0;
for (double number : numbers) {
out += number;
}
return out;
}
public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.err.println("Usage: JavaPageRank <master> <file> <number_of_iterations>");
System.exit(1);
}
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
// Loads in input file. It should be in format of:
// URL neighbor URL
// URL neighbor URL
// URL neighbor URL
// ...
JavaRDD<String> lines = ctx.textFile(args[1], 1);
// Loads all URLs from input file and initialize their neighbors.
JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
String[] parts = s.split("\\s+");
return new Tuple2<String, String>(parts[0], parts[1]);
}
}).distinct().groupByKey().cache();
// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
@Override
public Double call(List<String> rs) throws Exception {
return 1.0;
}
});
// Calculates and updates URL ranks continuously using PageRank algorithm.
for (int current = 0; current < Integer.parseInt(args[2]); current++) {
// Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
.flatMap(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
@Override
public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
for (String n : s._1) {
results.add(new Tuple2<String, Double>(n, s._2 / s._1.size()));
}
return results;
}
});
// Re-calculates URL ranks based on neighbor contributions.
ranks = contribs.groupByKey().mapValues(new Function<List<Double>, Double>() {
@Override
public Double call(List<Double> cs) throws Exception {
return 0.15 + sum(cs) * 0.85;
}
});
}
// Collects all URL ranks and dump them to console.
List<Tuple2<String, Double>> output = ranks.collect();
for (Tuple2 tuple : output) {
System.out.println(tuple._1 + " has rank: " + tuple._2 + ".");
}
System.exit(0);
}
}

View file

@ -66,6 +66,7 @@ cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/"
cp -r "$FWDIR/bin" "$DISTDIR"
cp -r "$FWDIR/conf" "$DISTDIR"
cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR"
cp "$FWDIR/spark-executor" "$DISTDIR"
if [ "$1" == "tgz" ]; then

165
mllib/pom.xml Normal file
View file

@ -0,0 +1,165 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.spark-project</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.spark-project</groupId>
<artifactId>spark-mllib</artifactId>
<packaging>jar</packaging>
<name>Spark Project ML Library</name>
<url>http://spark-project.org/</url>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hadoop1</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop1</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2-yarn</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2-yarn</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View file

@ -315,14 +315,15 @@ object KMeans {
}
def main(args: Array[String]) {
if (args.length != 4) {
println("Usage: KMeans <master> <input_file> <k> <max_iterations>")
if (args.length < 4) {
println("Usage: KMeans <master> <input_file> <k> <max_iterations> [<runs>]")
System.exit(1)
}
val (master, inputFile, k, iters) = (args(0), args(1), args(2).toInt, args(3).toInt)
val runs = if (args.length >= 5) args(4).toInt else 1
val sc = new SparkContext(master, "KMeans")
val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble))
val model = KMeans.train(data, k, iters)
val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble)).cache()
val model = KMeans.train(data, k, iters, runs)
val cost = model.computeCost(data)
println("Cluster centers:")
for (c <- model.clusterCenters) {

View file

@ -418,6 +418,7 @@ object ALS {
System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
System.setProperty("spark.kryo.referenceTracking", "false")
System.setProperty("spark.kryoserializer.buffer.mb", "8")
System.setProperty("spark.locality.wait", "10000")
val sc = new SparkContext(master, "ALS")
val ratings = sc.textFile(ratingsFile).map { line =>

View file

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.mllib.util
import scala.util.Random
import spark.{RDD, SparkContext}
object KMeansDataGenerator {
/**
* Generate an RDD containing test data for KMeans. This function chooses k cluster centers
* from a d-dimensional Gaussian distribution scaled by factor r, then creates a Gaussian
* cluster with scale 1 around each center.
*
* @param sc SparkContext to use for creating the RDD
* @param numPoints Number of points that will be contained in the RDD
* @param k Number of clusters
* @param d Number of dimensions
* @parak r Scaling factor for the distribution of the initial centers
* @param numPartitions Number of partitions of the generated RDD; default 2
*/
def generateKMeansRDD(
sc: SparkContext,
numPoints: Int,
k: Int,
d: Int,
r: Double,
numPartitions: Int = 2)
: RDD[Array[Double]] =
{
// First, generate some centers
val rand = new Random(42)
val centers = Array.fill(k)(Array.fill(d)(rand.nextGaussian() * r))
// Then generate points around each center
sc.parallelize(0 until numPoints, numPartitions).map { idx =>
val center = centers(idx % k)
val rand2 = new Random(42 + idx)
Array.tabulate(d)(i => center(i) + rand2.nextGaussian())
}
}
def main(args: Array[String]) {
if (args.length < 6) {
println("Usage: KMeansGenerator " +
"<master> <output_dir> <num_points> <k> <d> <r> [<num_partitions>]")
System.exit(1)
}
val sparkMaster = args(0)
val outputPath = args(1)
val numPoints = args(2).toInt
val k = args(3).toInt
val d = args(4).toInt
val r = args(5).toDouble
val parts = if (args.length >= 7) args(6).toInt else 2
val sc = new SparkContext(sparkMaster, "KMeansDataGenerator")
val data = generateKMeansRDD(sc, numPoints, k, d, r, parts)
data.map(_.mkString(" ")).saveAsTextFile(outputPath)
System.exit(0)
}
}

View file

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.mllib.recommendation
import scala.util.Random
import org.jblas.DoubleMatrix
import spark.{RDD, SparkContext}
import spark.mllib.util.MLUtils
/**
* Generate RDD(s) containing data for Matrix Factorization.
*
* This method samples training entries according to the oversampling factor
* 'trainSampFact', which is a multiplicative factor of the number of
* degrees of freedom of the matrix: rank*(m+n-rank).
*
* It optionally samples entries for a testing matrix using
* 'testSampFact', the percentage of the number of training entries
* to use for testing.
*
* This method takes the following inputs:
* sparkMaster (String) The master URL.
* outputPath (String) Directory to save output.
* m (Int) Number of rows in data matrix.
* n (Int) Number of columns in data matrix.
* rank (Int) Underlying rank of data matrix.
* trainSampFact (Double) Oversampling factor.
* noise (Boolean) Whether to add gaussian noise to training data.
* sigma (Double) Standard deviation of added gaussian noise.
* test (Boolean) Whether to create testing RDD.
* testSampFact (Double) Percentage of training data to use as test data.
*/
object MFDataGenerator{
def main(args: Array[String]) {
if (args.length < 2) {
println("Usage: MFDataGenerator " +
"<master> <outputDir> [m] [n] [rank] [trainSampFact] [noise] [sigma] [test] [testSampFact]")
System.exit(1)
}
val sparkMaster: String = args(0)
val outputPath: String = args(1)
val m: Int = if (args.length > 2) args(2).toInt else 100
val n: Int = if (args.length > 3) args(3).toInt else 100
val rank: Int = if (args.length > 4) args(4).toInt else 10
val trainSampFact: Double = if (args.length > 5) args(5).toDouble else 1.0
val noise: Boolean = if (args.length > 6) args(6).toBoolean else false
val sigma: Double = if (args.length > 7) args(7).toDouble else 0.1
val test: Boolean = if (args.length > 8) args(8).toBoolean else false
val testSampFact: Double = if (args.length > 9) args(9).toDouble else 0.1
val sc = new SparkContext(sparkMaster, "MFDataGenerator")
val A = DoubleMatrix.randn(m, rank)
val B = DoubleMatrix.randn(rank, n)
val z = 1 / (scala.math.sqrt(scala.math.sqrt(rank)))
A.mmuli(z)
B.mmuli(z)
val fullData = A.mmul(B)
val df = rank * (m + n - rank)
val sampSize = scala.math.min(scala.math.round(trainSampFact * df),
scala.math.round(.99 * m * n)).toInt
val rand = new Random()
val mn = m * n
val shuffled = rand.shuffle(1 to mn toIterable)
val omega = shuffled.slice(0, sampSize)
val ordered = omega.sortWith(_ < _).toArray
val trainData: RDD[(Int, Int, Double)] = sc.parallelize(ordered)
.map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
// optionally add gaussian noise
if (noise) {
trainData.map(x => (x._1, x._2, x._3 + rand.nextGaussian * sigma))
}
trainData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath)
// optionally generate testing data
if (test) {
val testSampSize = scala.math
.min(scala.math.round(sampSize * testSampFact),scala.math.round(mn - sampSize)).toInt
val testOmega = shuffled.slice(sampSize, sampSize + testSampSize)
val testOrdered = testOmega.sortWith(_ < _).toArray
val testData: RDD[(Int, Int, Double)] = sc.parallelize(testOrdered)
.map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
testData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath)
}
sc.stop()
}
}

View file

@ -39,9 +39,9 @@ object MLUtils {
*/
def loadLabeledData(sc: SparkContext, dir: String): RDD[LabeledPoint] = {
sc.textFile(dir).map { line =>
val parts = line.split(",")
val parts = line.split(',')
val label = parts(0).toDouble
val features = parts(1).trim().split(" ").map(_.toDouble)
val features = parts(1).trim().split(' ').map(_.toDouble)
LabeledPoint(label, features)
}
}

View file

@ -58,6 +58,7 @@
<module>core</module>
<module>bagel</module>
<module>examples</module>
<module>mllib</module>
<module>tools</module>
<module>streaming</module>
<module>repl</module>
@ -182,6 +183,11 @@
<artifactId>compress-lzf</artifactId>
<version>0.8.4</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.0.5</version>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>

View file

@ -44,7 +44,7 @@ object SparkBuild extends Build {
lazy val core = Project("core", file("core"), settings = coreSettings)
lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core)
lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) dependsOn(bagel) dependsOn(mllib)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) dependsOn(mllib)
@ -151,6 +151,7 @@ object SparkBuild extends Build {
val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson")
val excludeNetty = ExclusionRule(organization = "org.jboss.netty")
val excludeAsm = ExclusionRule(organization = "asm")
val excludeSnappy = ExclusionRule(organization = "org.xerial.snappy")
def coreSettings = sharedSettings ++ Seq(
name := "spark-core",
@ -168,6 +169,7 @@ object SparkBuild extends Build {
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"commons-daemon" % "commons-daemon" % "1.0.10",
"com.ning" % "compress-lzf" % "0.8.4",
"org.xerial.snappy" % "snappy-java" % "1.0.5",
"org.ow2.asm" % "asm" % "4.0",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
"com.typesafe.akka" % "akka-actor" % "2.0.5" excludeAll(excludeNetty),
@ -235,6 +237,7 @@ object SparkBuild extends Build {
exclude("jline","jline")
exclude("log4j","log4j")
exclude("org.apache.cassandra.deps", "avro")
excludeAll(excludeSnappy)
)
)
@ -242,7 +245,9 @@ object SparkBuild extends Build {
name := "spark-tools"
)
def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
def bagelSettings = sharedSettings ++ Seq(
name := "spark-bagel"
)
def mllibSettings = sharedSettings ++ Seq(
name := "spark-mllib",
@ -257,7 +262,7 @@ object SparkBuild extends Build {
"Akka Repository" at "http://repo.akka.io/releases/"
),
libraryDependencies ++= Seq(
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty),
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
"com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
"com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty)

View file

@ -831,6 +831,10 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
var sparkContext: SparkContext = null
def createSparkContext(): SparkContext = {
val uri = System.getenv("SPARK_EXECUTOR_URI")
if (uri != null) {
System.setProperty("spark.executor.uri", uri)
}
val master = this.master match {
case Some(m) => m
case None => {

5
run
View file

@ -72,7 +72,10 @@ esac
# hard to kill the child with stuff like Process.destroy(). However, for
# the Spark shell, the wrapper is necessary to properly reset the terminal
# when we exit, so we allow it to set a variable to launch with scala.
if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
# We still fall back on java for the shell if this is a "release" created
# from make-distribution.sh since it's possible scala is not installed
# but we have everything we need to run the shell.
if [[ "$SPARK_LAUNCH_WITH_SCALA" == "1" && ! -f "$FWDIR/RELEASE" ]]; then
if [ "$SCALA_HOME" ]; then
RUNNER="${SCALA_HOME}/bin/scala"
else

View file

@ -17,16 +17,17 @@
package spark.streaming
import spark.{Logging, Utils}
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.conf.Configuration
import java.io._
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import spark.Logging
import spark.io.CompressionCodec
private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
@ -49,6 +50,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
}
}
/**
* Convenience class to speed up the writing of graph checkpoint to file
*/
@ -66,6 +68,8 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
val maxAttempts = 3
val executor = Executors.newFixedThreadPool(1)
private val compressionCodec = CompressionCodec.createCodec()
// Removed code which validates whether there is only one CheckpointWriter per path 'file' since
// I did not notice any errors - reintroduce it ?
@ -103,7 +107,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
def write(checkpoint: Checkpoint) {
val bos = new ByteArrayOutputStream()
val zos = new LZFOutputStream(bos)
val zos = compressionCodec.compressedOutputStream(bos)
val oos = new ObjectOutputStream(zos)
oos.writeObject(checkpoint)
oos.close()
@ -137,6 +141,8 @@ object CheckpointReader extends Logging {
val fs = new Path(path).getFileSystem(new Configuration())
val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
val compressionCodec = CompressionCodec.createCodec()
attempts.foreach(file => {
if (fs.exists(file)) {
logInfo("Attempting to load checkpoint from file '" + file + "'")
@ -147,7 +153,7 @@ object CheckpointReader extends Logging {
// of ObjectInputStream is used to explicitly use the current thread's default class
// loader to find and load classes. This is a well know Java issue and has popped up
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
val zis = new LZFInputStream(fis)
val zis = compressionCodec.compressedInputStream(fis)
val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
val cp = ois.readObject.asInstanceOf[Checkpoint]
ois.close()
@ -170,7 +176,9 @@ object CheckpointReader extends Logging {
}
private[streaming]
class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) extends ObjectInputStream(inputStream_) {
class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader)
extends ObjectInputStream(inputStream_) {
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
return loader.loadClass(desc.getName())