Merge branch 'master' of https://github.com/mesos/spark into indexed_rdd2

Joseph E. Gonzalez 2013-08-18 10:45:30 -07:00
commit 2b568520bf
75 changed files with 894 additions and 555 deletions

View file

@ -3,8 +3,8 @@
# This file configures Spark's internal metrics system. The metrics system is
# divided into instances which correspond to internal components.
# Each instance can be configured to report its metrics to one or more sinks.
# Accepted values for [instance] are "master", "worker", "executor", "driver",
# and "applications". A wild card "*" can be used as an instance name, in
# which case all instances will inherit the supplied property.
#
# Within an instance, a "source" specifies a particular set of grouped metrics.
@ -19,7 +19,7 @@
# A "sink" specifies where metrics are delivered to. Each instance can be
# assigned one or more sinks.
#
# The sink|source field specifies whether the property relates to a sink or
# source.
#
# The [name] field specifies the name of source or sink.
@ -28,18 +28,24 @@
# source or sink is responsible for parsing this property.
#
# Notes:
# 1. To add a new sink, set the "class" option to a fully qualified class
# name (see examples below).
# 2. Some sinks involve a polling period. The minimum allowed polling period
# is 1 second.
# 3. Wild card properties can be overridden by more specific properties.
# For example, master.sink.console.period takes precedence over
# *.sink.console.period.
# 4. To customize the metrics system, point it at a metrics configuration file by
# setting the Java property spark.metrics.conf (e.g.
# "-Dspark.metrics.conf=${SPARK_HOME}/conf/metrics.properties"). You can also put
# the file in ${SPARK_HOME}/conf and it will be loaded automatically.
# 5. A MetricsServlet sink is added by default to the master, worker and client
# driver instances; send an HTTP request to "/metrics/json" to get a snapshot of
# all registered metrics in JSON format. On the master, "/metrics/master/json" and
# "/metrics/applications/json" can be requested separately for snapshots of the
# master and applications instances. MetricsServlet does not need to be configured
# explicitly.
#
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=spark.metrics.sink.JmxSink
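As an illustration of the [instance].[sink|source].[name].[options] syntax, the commented-out lines below mirror the servlet defaults that this commit sets programmatically in MetricsConfig; they are shown only as an example of the format and do not need to be uncommented.

#*.sink.servlet.class=spark.metrics.sink.MetricsServlet
#*.sink.servlet.uri=/metrics/json
#*.sink.servlet.sample=false
#master.sink.servlet.uri=/metrics/master/json
#applications.sink.servlet.uri=/metrics/applications/json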

View file

@ -1,2 +1,2 @@
# A Spark Worker will be started on each of the machines listes below.
# A Spark Worker will be started on each of the machines listed below.
localhost

View file

@ -90,10 +90,6 @@
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>net.liftweb</groupId>
<artifactId>lift-json_2.9.2</artifactId>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
@ -126,6 +122,10 @@
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-json</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>

View file

@ -5,10 +5,6 @@
padding: 0;
}
body {
font-size: 15px !important;
}
.version {
line-height: 30px;
vertical-align: bottom;
@ -53,6 +49,10 @@ body {
line-height: 15px !important;
}
.table-fixed {
table-layout:fixed;
}
.table td {
vertical-align: middle !important;
}

View file

@ -39,7 +39,6 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
/**
* Represents a dependency on the output of a shuffle stage.
* @param shuffleId the shuffle id
* @param rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializerClass class name of the serializer to use

View file

@ -85,17 +85,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(aggregator.combineValuesByKey(_), true)
self.mapPartitions(aggregator.combineValuesByKey, true)
} else if (mapSideCombine) {
val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner, serializerClass)
partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true)
val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey, true)
val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner)
.setSerializer(serializerClass)
partitioned.mapPartitions(aggregator.combineCombinersByKey, true)
} else {
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
val values = new ShuffledRDD[K, V](self, partitioner, serializerClass)
values.mapPartitions(aggregator.combineValuesByKey(_), true)
val values = new ShuffledRDD[K, V](self, partitioner).setSerializer(serializerClass)
values.mapPartitions(aggregator.combineValuesByKey, true)
}
}
@ -233,31 +234,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
/**
* Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
* is true, Spark will group values of the same key together on the map side before the
* repartitioning, to only send each key over the network once. If a large number of
* duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should
* be set to true.
* Return a copy of the RDD partitioned using the specified partitioner.
*/
def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = {
if (getKeyClass().isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
}
if (mapSideCombine) {
def createCombiner(v: V) = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
val bufs = combineByKey[ArrayBuffer[V]](
createCombiner _, mergeValue _, mergeCombiners _, partitioner)
bufs.flatMapValues(buf => buf)
} else {
new ShuffledRDD[K, V](self, partitioner)
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
new ShuffledRDD[K, V](self, partitioner)
}
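With the mapSideCombine flag gone, partitionBy only repartitions; map-side combining is obtained by going through reduceByKey or combineByKey instead. A minimal sketch of the updated call sites, assuming the 0.8-era spark package layout and a local SparkContext:

import spark.{HashPartitioner, SparkContext}
import spark.SparkContext._

object PartitionByExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "partitionByExample")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
    // partitionBy now takes only the partitioner; there is no mapSideCombine flag.
    val repartitioned = pairs.partitionBy(new HashPartitioner(4))
    // For map-side combining, call reduceByKey (or combineByKey) directly.
    val combined = pairs.reduceByKey(new HashPartitioner(4), _ + _)
    println(repartitioned.partitioner + " " + combined.collect().toSeq)
    sc.stop()
  }
}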
/**

View file

@ -515,22 +515,19 @@ abstract class RDD[T: ClassManifest](
* *same number of partitions*, but does *not* require them to have the same number
* of elements in each partition.
*/
def zipPartitions[B: ClassManifest, V: ClassManifest](
f: (Iterator[T], Iterator[B]) => Iterator[V],
rdd2: RDD[B]): RDD[V] =
def zipPartitions[B: ClassManifest, V: ClassManifest]
(rdd2: RDD[B])
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
rdd2: RDD[B],
rdd3: RDD[C]): RDD[V] =
def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
(rdd2: RDD[B], rdd3: RDD[C])
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
rdd2: RDD[B],
rdd3: RDD[C],
rdd4: RDD[D]): RDD[V] =
def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
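zipPartitions is now curried: the RDD arguments come first and the per-partition function sits in its own parameter list, so its argument types can be inferred at the call site. A minimal sketch, assuming the 0.8-era spark package layout:

import spark.SparkContext

object ZipPartitionsExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "zipPartitionsExample")
    val nums  = sc.parallelize(1 to 8, 4)
    val words = sc.parallelize(Seq("a", "b", "c", "d", "e", "f", "g", "h"), 4)
    // New form: RDD argument first, then the function in a separate parameter list.
    val zipped = nums.zipPartitions(words) { (is, ws) =>
      is.zip(ws).map { case (i, w) => w * i }  // repeat each word i times
    }
    println(zipped.collect().toSeq)
    sc.stop()
  }
}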

View file

@ -267,16 +267,20 @@ class SparkContext(
localProperties.value = new Properties()
}
def addLocalProperty(key: String, value: String) {
if(localProperties.value == null) {
def setLocalProperty(key: String, value: String) {
if (localProperties.value == null) {
localProperties.value = new Properties()
}
localProperties.value.setProperty(key,value)
if (value == null) {
localProperties.value.remove(key)
} else {
localProperties.value.setProperty(key, value)
}
}
/** Set a human readable description of the current job. */
def setDescription(value: String) {
addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
def setJobDescription(value: String) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
}
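setLocalProperty (renamed from addLocalProperty) now also accepts null to clear a previously set thread-local property, and setDescription becomes setJobDescription. A minimal usage sketch; the pool property name is the one used by FairSchedulableBuilder elsewhere in this commit:

import spark.SparkContext

object LocalPropertyExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "localPropertyExample")
    sc.setJobDescription("count the numbers")                            // was setDescription
    sc.setLocalProperty("spark.scheduler.cluster.fair.pool", "default")  // thread-local, applies to jobs from this thread
    println(sc.parallelize(1 to 100, 2).count())
    sc.setLocalProperty("spark.scheduler.cluster.fair.pool", null)       // null now removes the property
    sc.stop()
  }
}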
// Post init

View file

@ -24,9 +24,11 @@ private[spark] object TaskState
val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)
type TaskState = Value
def isFinished(state: TaskState) = Seq(FINISHED, FAILED, LOST).contains(state)
def isFinished(state: TaskState) = FINISHED_STATES.contains(state)
def toMesos(state: TaskState): MesosTaskState = state match {
case LAUNCHING => MesosTaskState.TASK_STARTING

View file

@ -521,9 +521,9 @@ private object Utils extends Logging {
}
/**
* Convert a memory quantity in bytes to a human-readable string such as "4.0 MB".
* Convert a quantity in bytes to a human-readable string such as "4.0 MB".
*/
def memoryBytesToString(size: Long): String = {
def bytesToString(size: Long): String = {
val TB = 1L << 40
val GB = 1L << 30
val MB = 1L << 20
@ -566,10 +566,10 @@ private object Utils extends Logging {
}
/**
* Convert a memory quantity in megabytes to a human-readable string such as "4.0 MB".
* Convert a quantity in megabytes to a human-readable string such as "4.0 MB".
*/
def memoryMegabytesToString(megabytes: Long): String = {
memoryBytesToString(megabytes * 1024L * 1024L)
def megabytesToString(megabytes: Long): String = {
bytesToString(megabytes * 1024L * 1024L)
}
/**

View file

@ -23,6 +23,7 @@ import java.util.Comparator
import scala.Tuple2
import scala.collection.JavaConversions._
import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputFormat
@ -252,11 +253,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
fromRDD(rdd.subtract(other, p))
/**
* Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
* is true, Spark will group values of the same key together on the map side before the
* repartitioning, to only send each key over the network once. If a large number of
* duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should
* be set to true.
* Return a copy of the RDD partitioned using the specified partitioner.
*/
def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V] =
fromRDD(rdd.partitionBy(partitioner))
@ -276,8 +273,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* partition the output RDD.
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (V, Option[W])] =
fromRDD(rdd.leftOuterJoin(other, partitioner))
: JavaPairRDD[K, (V, Optional[W])] = {
val joinResult = rdd.leftOuterJoin(other, partitioner)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
@ -286,8 +285,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* partition the output RDD.
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
: JavaPairRDD[K, (Option[V], W)] =
fromRDD(rdd.rightOuterJoin(other, partitioner))
: JavaPairRDD[K, (Optional[V], W)] = {
val joinResult = rdd.rightOuterJoin(other, partitioner)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
@ -340,8 +341,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* using the existing partitioner/parallelism level.
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Option[W])] =
fromRDD(rdd.leftOuterJoin(other))
def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = {
val joinResult = rdd.leftOuterJoin(other)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
@ -349,8 +352,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* into `numPartitions` partitions.
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Option[W])] =
fromRDD(rdd.leftOuterJoin(other, numPartitions))
def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = {
val joinResult = rdd.leftOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
}
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
@ -358,8 +363,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD using the existing partitioner/parallelism level.
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Option[V], W)] =
fromRDD(rdd.rightOuterJoin(other))
def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = {
val joinResult = rdd.rightOuterJoin(other)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
@ -367,8 +374,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Option[V], W)] =
fromRDD(rdd.rightOuterJoin(other, numPartitions))
def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
val joinResult = rdd.rightOuterJoin(other, numPartitions)
fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
}
/**
* Return the key-value pairs in this RDD to the master as a Map.

View file

@ -207,12 +207,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* of elements in each partition.
*/
def zipPartitions[U, V](
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V],
other: JavaRDDLike[U, _]): JavaRDD[V] = {
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
f.apply(asJavaIterator(x), asJavaIterator(y)).iterator())
JavaRDD.fromRDD(
rdd.zipPartitions(fn, other.rdd)(other.classManifest, f.elementType()))(f.elementType())
rdd.zipPartitions(other.rdd)(fn)(other.classManifest, f.elementType()))(f.elementType())
}
// Actions (launch a job to return a value to the user program)
@ -366,10 +366,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Gets the name of the file to which this RDD was checkpointed
*/
def getCheckpointFile(): Optional[String] = {
rdd.getCheckpointFile match {
case Some(file) => Optional.of(file)
case _ => Optional.absent()
}
JavaUtils.optionToOptional(rdd.getCheckpointFile)
}
/** A description of this RDD and its recursive dependencies for debugging. */

View file

@ -32,6 +32,8 @@ import spark.SparkContext.IntAccumulatorParam
import spark.SparkContext.DoubleAccumulatorParam
import spark.broadcast.Broadcast
import com.google.common.base.Optional
/**
* A Java-friendly version of [[spark.SparkContext]] that returns [[spark.api.java.JavaRDD]]s and
* works with Java collections instead of Scala ones.
@ -337,7 +339,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* or the spark.home Java property, or the SPARK_HOME environment variable
* (in that order of preference). If neither of these is set, return None.
*/
def getSparkHome(): Option[String] = sc.getSparkHome()
def getSparkHome(): Optional[String] = JavaUtils.optionToOptional(sc.getSparkHome())
/**
* Add a file to be downloaded with this Spark job on every node.

View file

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.api.java
import com.google.common.base.Optional
object JavaUtils {
def optionToOptional[T](option: Option[T]): Optional[T] =
option match {
case Some(value) => Optional.of(value)
case None => Optional.absent()
}
}
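A tiny sketch of the new helper in isolation; this assumes the object is publicly accessible (if it ends up package-private to spark, the snippet would have to live inside the spark package):

import com.google.common.base.Optional
import spark.api.java.JavaUtils

object OptionToOptionalExample {
  def main(args: Array[String]) {
    val present: Optional[String] = JavaUtils.optionToOptional(Some("spark"))
    val absent: Optional[String]  = JavaUtils.optionToOptional(None)
    println(present.get() + " / " + absent.isPresent)  // prints "spark / false"
  }
}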

View file

@ -17,7 +17,7 @@
package spark.deploy
import net.liftweb.json.JsonDSL._
import scala.util.parsing.json.{JSONArray, JSONObject, JSONType}
import spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import spark.deploy.master.{ApplicationInfo, WorkerInfo}
@ -25,61 +25,63 @@ import spark.deploy.worker.ExecutorRunner
private[spark] object JsonProtocol {
def writeWorkerInfo(obj: WorkerInfo) = {
("id" -> obj.id) ~
("host" -> obj.host) ~
("port" -> obj.port) ~
("webuiaddress" -> obj.webUiAddress) ~
("cores" -> obj.cores) ~
("coresused" -> obj.coresUsed) ~
("memory" -> obj.memory) ~
("memoryused" -> obj.memoryUsed)
}
def writeApplicationInfo(obj: ApplicationInfo) = {
("starttime" -> obj.startTime) ~
("id" -> obj.id) ~
("name" -> obj.desc.name) ~
("cores" -> obj.desc.maxCores) ~
("user" -> obj.desc.user) ~
("memoryperslave" -> obj.desc.memoryPerSlave) ~
("submitdate" -> obj.submitDate.toString)
}
def writeWorkerInfo(obj: WorkerInfo): JSONType = JSONObject(Map(
"id" -> obj.id,
"host" -> obj.host,
"port" -> obj.port,
"webuiaddress" -> obj.webUiAddress,
"cores" -> obj.cores,
"coresused" -> obj.coresUsed,
"memory" -> obj.memory,
"memoryused" -> obj.memoryUsed,
"state" -> obj.state.toString
))
def writeApplicationDescription(obj: ApplicationDescription) = {
("name" -> obj.name) ~
("cores" -> obj.maxCores) ~
("memoryperslave" -> obj.memoryPerSlave) ~
("user" -> obj.user)
}
def writeApplicationInfo(obj: ApplicationInfo): JSONType = JSONObject(Map(
"starttime" -> obj.startTime,
"id" -> obj.id,
"name" -> obj.desc.name,
"cores" -> obj.desc.maxCores,
"user" -> obj.desc.user,
"memoryperslave" -> obj.desc.memoryPerSlave,
"submitdate" -> obj.submitDate.toString
))
def writeExecutorRunner(obj: ExecutorRunner) = {
("id" -> obj.execId) ~
("memory" -> obj.memory) ~
("appid" -> obj.appId) ~
("appdesc" -> writeApplicationDescription(obj.appDesc))
}
def writeApplicationDescription(obj: ApplicationDescription): JSONType = JSONObject(Map(
"name" -> obj.name,
"cores" -> obj.maxCores,
"memoryperslave" -> obj.memoryPerSlave,
"user" -> obj.user
))
def writeMasterState(obj: MasterStateResponse) = {
("url" -> ("spark://" + obj.uri)) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
("cores" -> obj.workers.map(_.cores).sum) ~
("coresused" -> obj.workers.map(_.coresUsed).sum) ~
("memory" -> obj.workers.map(_.memory).sum) ~
("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
}
def writeExecutorRunner(obj: ExecutorRunner): JSONType = JSONObject(Map(
"id" -> obj.execId,
"memory" -> obj.memory,
"appid" -> obj.appId,
"appdesc" -> writeApplicationDescription(obj.appDesc)
))
def writeWorkerState(obj: WorkerStateResponse) = {
("id" -> obj.workerId) ~
("masterurl" -> obj.masterUrl) ~
("masterwebuiurl" -> obj.masterWebUiUrl) ~
("cores" -> obj.cores) ~
("coresused" -> obj.coresUsed) ~
("memory" -> obj.memory) ~
("memoryused" -> obj.memoryUsed) ~
("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
}
def writeMasterState(obj: MasterStateResponse): JSONType = JSONObject(Map(
"url" -> ("spark://" + obj.uri),
"workers" -> obj.workers.toList.map(writeWorkerInfo),
"cores" -> obj.workers.map(_.cores).sum,
"coresused" -> obj.workers.map(_.coresUsed).sum,
"memory" -> obj.workers.map(_.memory).sum,
"memoryused" -> obj.workers.map(_.memoryUsed).sum,
"activeapps" -> JSONArray(obj.activeApps.toList.map(writeApplicationInfo)),
"completedapps" -> JSONArray(obj.completedApps.toList.map(writeApplicationInfo))
))
def writeWorkerState(obj: WorkerStateResponse): JSONType = JSONObject(Map(
"id" -> obj.workerId,
"masterurl" -> obj.masterUrl,
"masterwebuiurl" -> obj.masterWebUiUrl,
"cores" -> obj.cores,
"coresused" -> obj.coresUsed,
"memory" -> obj.memory,
"memoryused" -> obj.memoryUsed,
"executors" -> JSONArray(obj.executors.toList.map(writeExecutorRunner)),
"finishedexecutors" -> JSONArray(obj.finishedExecutors.toList.map(writeExecutorRunner))
))
}
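With the lift-json DSL replaced by scala.util.parsing.json, the UI handlers simply call toString on the returned JSONType to produce the response body. A small illustrative sketch (field names follow JsonProtocol; the values are made up):

import scala.util.parsing.json.{JSONArray, JSONObject}

object JsonTypeExample {
  def main(args: Array[String]) {
    val worker = JSONObject(Map("id" -> "worker-1", "cores" -> 8, "memory" -> 4096))
    val state = JSONObject(Map(
      "url"     -> "spark://localhost:7077",
      "workers" -> JSONArray(List(worker)),
      "cores"   -> 8
    ))
    // JSONType.toString renders the JSON text that JettyUtils serves as text/json.
    println(state.toString())
  }
}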

View file

@ -57,14 +57,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
var firstApp: Option[ApplicationInfo] = None
val webUi = new MasterWebUI(self, webUiPort)
Utils.checkHost(host, "Expected hostname")
val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
val masterSource = new MasterSource(this)
val webUi = new MasterWebUI(this, webUiPort)
val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
@ -96,7 +96,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
override def receive = {
case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
host, workerPort, cores, Utils.megabytesToString(memory)))
if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {

View file

@ -17,6 +17,7 @@
package spark.deploy.master.ui
import scala.util.parsing.json.JSONType
import scala.xml.Node
import akka.dispatch.Await
@ -25,19 +26,17 @@ import akka.util.duration._
import javax.servlet.http.HttpServletRequest
import net.liftweb.json.JsonAST.JValue
import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import spark.deploy.JsonProtocol
import spark.deploy.master.ExecutorInfo
import spark.ui.UIUtils
private[spark] class ApplicationPage(parent: MasterWebUI) {
val master = parent.master
val master = parent.masterActorRef
implicit val timeout = parent.timeout
/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
def renderJson(request: HttpServletRequest): JSONType = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)

View file

@ -19,14 +19,13 @@ package spark.deploy.master.ui
import javax.servlet.http.HttpServletRequest
import scala.util.parsing.json.JSONType
import scala.xml.Node
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
import net.liftweb.json.JsonAST.JValue
import spark.Utils
import spark.deploy.DeployWebUI
import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
@ -35,10 +34,10 @@ import spark.deploy.master.{ApplicationInfo, WorkerInfo}
import spark.ui.UIUtils
private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.master
val master = parent.masterActorRef
implicit val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
def renderJson(request: HttpServletRequest): JSONType = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
JsonProtocol.writeMasterState(state)
@ -53,7 +52,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
val workers = state.workers.sortBy(_.id)
val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
val appHeaders = Seq("ID", "Description", "Cores", "Memory per Node", "Submit Time", "User",
val appHeaders = Seq("ID", "Name", "Cores", "Memory per Node", "Submitted Time", "User",
"State", "Duration")
val activeApps = state.activeApps.sortBy(_.startTime).reverse
val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
@ -70,8 +69,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
{state.workers.map(_.coresUsed).sum} Used</li>
<li><strong>Memory:</strong>
{Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
{Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
{Utils.megabytesToString(state.workers.map(_.memory).sum)} Total,
{Utils.megabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
<li><strong>Applications:</strong>
{state.activeApps.size} Running,
{state.completedApps.size} Completed </li>
@ -116,8 +115,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td>{worker.state}</td>
<td>{worker.cores} ({worker.coresUsed} Used)</td>
<td sorttable_customkey={"%s.%s".format(worker.memory, worker.memoryUsed)}>
{Utils.memoryMegabytesToString(worker.memory)}
({Utils.memoryMegabytesToString(worker.memoryUsed)} Used)
{Utils.megabytesToString(worker.memory)}
({Utils.megabytesToString(worker.memoryUsed)} Used)
</td>
</tr>
}
@ -135,7 +134,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
{app.coresGranted}
</td>
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.memoryMegabytesToString(app.desc.memoryPerSlave)}
{Utils.megabytesToString(app.desc.memoryPerSlave)}
</td>
<td>{DeployWebUI.formatDate(app.submitDate)}</td>
<td>{app.desc.user}</td>

View file

@ -17,7 +17,6 @@
package spark.deploy.master.ui
import akka.actor.ActorRef
import akka.util.Duration
import javax.servlet.http.HttpServletRequest
@ -25,6 +24,7 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
import spark.{Logging, Utils}
import spark.deploy.master.Master
import spark.ui.JettyUtils
import spark.ui.JettyUtils._
@ -32,12 +32,14 @@ import spark.ui.JettyUtils._
* Web UI server for the standalone master.
*/
private[spark]
class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
implicit val timeout = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val host = Utils.localHostName()
val port = requestedPort
val masterActorRef = master.self
var server: Option[Server] = None
var boundPort: Option[Int] = None
@ -57,7 +59,10 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
}
}
val handlers = Array[(String, Handler)](
val metricsHandlers = master.masterMetricsSystem.getServletHandlers ++
master.applicationMetricsSystem.getServletHandlers
val handlers = metricsHandlers ++ Array[(String, Handler)](
("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
("/app", (request: HttpServletRequest) => applicationPage.render(request)),

View file

@ -170,10 +170,10 @@ private[spark] class ExecutorRunner(
// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
Files.write(header, stdout, Charsets.UTF_8)
redirectStream(process.getInputStream, stdout)
val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, Charsets.UTF_8)
redirectStream(process.getErrorStream, stderr)
// Wait for it to exit; this is actually a bad thing if it happens, because we expect to run

View file

@ -96,11 +96,12 @@ private[spark] class Worker(
override def preStart() {
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.memoryMegabytesToString(memory)))
host, port, cores, Utils.megabytesToString(memory)))
sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
logInfo("Spark home: " + sparkHome)
createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
connectToMaster()

View file

@ -19,14 +19,13 @@ package spark.deploy.worker.ui
import javax.servlet.http.HttpServletRequest
import scala.util.parsing.json.JSONType
import scala.xml.Node
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
import net.liftweb.json.JsonAST.JValue
import spark.Utils
import spark.deploy.JsonProtocol
import spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
@ -39,7 +38,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
val worker = parent.worker
val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
def renderJson(request: HttpServletRequest): JSONType = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, 30 seconds)
JsonProtocol.writeWorkerState(workerState)
@ -65,8 +64,8 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
Master URL:</strong> {workerState.masterUrl}
</li>
<li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li>
<li><strong>Memory:</strong> {Utils.memoryMegabytesToString(workerState.memory)}
({Utils.memoryMegabytesToString(workerState.memoryUsed)} Used)</li>
<li><strong>Memory:</strong> {Utils.megabytesToString(workerState.memory)}
({Utils.megabytesToString(workerState.memoryUsed)} Used)</li>
</ul>
<p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
</div>
@ -97,7 +96,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
<td>{executor.execId}</td>
<td>{executor.cores}</td>
<td sorttable_customkey={executor.memory.toString}>
{Utils.memoryMegabytesToString(executor.memory)}
{Utils.megabytesToString(executor.memory)}
</td>
<td>
<ul class="unstyled">

View file

@ -17,7 +17,6 @@
package spark.deploy.worker.ui
import akka.actor.ActorRef
import akka.util.{Duration, Timeout}
import java.io.{FileInputStream, File}
@ -49,7 +48,9 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val indexPage = new IndexPage(this)
val handlers = Array[(String, Handler)](
val metricsHandlers = worker.metricsSystem.getServletHandlers
val handlers = metricsHandlers ++ Array[(String, Handler)](
("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
("/log", (request: HttpServletRequest) => log(request)),
("/logPage", (request: HttpServletRequest) => logPage(request)),
@ -113,7 +114,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
.format(appId, executorId, logType, math.max(startByte-byteLength, 0),
byteLength)}>
<button type="button" class="btn btn-default">
Previous {Utils.memoryBytesToString(math.min(byteLength, startByte))}
Previous {Utils.bytesToString(math.min(byteLength, startByte))}
</button>
</a>
}
@ -128,7 +129,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
<a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s".
format(appId, executorId, logType, endByte, byteLength)}>
<button type="button" class="btn btn-default">
Next {Utils.memoryBytesToString(math.min(byteLength, logLength-endByte))}
Next {Utils.bytesToString(math.min(byteLength, logLength-endByte))}
</button>
</a>
}

View file

@ -17,18 +17,17 @@
package spark.executor
import java.io.{File, FileOutputStream}
import java.net.{URI, URL, URLClassLoader}
import java.io.{File}
import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.concurrent._
import org.apache.hadoop.fs.FileUtil
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.collection.mutable.{ArrayBuffer, Map, HashMap}
import spark.broadcast._
import spark.scheduler._
import spark._
import java.nio.ByteBuffer
/**
* The Mesos executor for Spark.
@ -116,6 +115,9 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var attemptedTask: Option[Task[Any]] = None
var taskStart: Long = 0
def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
val startGCTime = getTotalGCTime
try {
SparkEnv.set(env)
Accumulators.clear()
@ -128,10 +130,11 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
taskStart = System.currentTimeMillis()
val value = task.run(taskId.toInt)
val taskFinish = System.currentTimeMillis()
task.metrics.foreach{ m =>
for (m <- task.metrics) {
m.hostname = Utils.localHostName
m.executorDeserializeTime = (taskStart - startTime).toInt
m.executorRunTime = (taskFinish - taskStart).toInt
m.jvmGCTime = getTotalGCTime - startGCTime
}
//TODO I'd also like to track the time it takes to serialize the task results, but that is a huge headache, b/c
// we need to serialize the task metrics first. If TaskMetrics had a custom serialized format, we could
@ -155,7 +158,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
case t: Throwable => {
val serviceTime = (System.currentTimeMillis() - taskStart).toInt
val metrics = attemptedTask.flatMap(t => t.metrics)
metrics.foreach{m => m.executorRunTime = serviceTime}
for (m <- metrics) {
m.executorRunTime = serviceTime
m.jvmGCTime = getTotalGCTime - startGCTime
}
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
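The new jvmGCTime metric is computed by sampling the total collection time across all garbage collector MXBeans before the task starts and again when it finishes (or fails). A standalone sketch of the same measurement technique:

import java.lang.management.ManagementFactory
import scala.collection.JavaConversions._

object GcTimeExample {
  def main(args: Array[String]) {
    // Sum of collection time (ms) across all collectors, as in Executor.getTotalGCTime.
    def totalGcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
    val startGCTime = totalGcTime
    val garbage = (1 to 500000).map(_ => new Array[Byte](64))  // allocate to provoke some GC
    System.gc()
    println("allocated " + garbage.size + " small arrays")
    println("jvmGCTime: " + (totalGcTime - startGCTime) + " ms")
  }
}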

View file

@ -31,13 +31,18 @@ class TaskMetrics extends Serializable {
/**
* Time the executor spends actually running the task (including fetching shuffle data)
*/
var executorRunTime:Int = _
var executorRunTime: Int = _
/**
* The number of bytes this task transmitted back to the driver as the TaskResult
*/
var resultSize: Long = _
/**
* Amount of time the JVM spent in garbage collection while executing this task
*/
var jvmGCTime: Long = _
/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/

View file

@ -36,7 +36,11 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
var propertyCategories: mutable.HashMap[String, Properties] = null
private def setDefaultProperties(prop: Properties) {
// empty function, any default property can be set here
prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet")
prop.setProperty("*.sink.servlet.uri", "/metrics/json")
prop.setProperty("*.sink.servlet.sample", "false")
prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
}
def initialize() {

View file

@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable
import spark.Logging
import spark.metrics.sink.Sink
import spark.metrics.sink.{MetricsServlet, Sink}
import spark.metrics.source.Source
/**
@ -35,7 +35,7 @@ import spark.metrics.source.Source
* "instance" specify "who" (the role) use metrics system. In spark there are several roles
* like master, worker, executor, client driver, these roles will create metrics system
* for monitoring. So instance represents these roles. Currently in Spark, several instances
* have already implemented: master, worker, executor, driver.
* have already been implemented: master, worker, executor, driver, applications.
*
* "source" specify "where" (source) to collect metrics data. In metrics system, there exists
* two kinds of source:
@ -51,8 +51,8 @@ import spark.metrics.source.Source
* Metrics configuration format is like below:
* [instance].[sink|source].[name].[options] = xxxx
*
* [instance] can be "master", "worker", "executor", "driver", which means only the specified
* instance has this property.
* [instance] can be "master", "worker", "executor", "driver", "applications", which means only
* the specified instance has this property.
* wild card "*" can be used to replace instance name, which means all the instances will have
* this property.
*
@ -72,6 +72,12 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sources = new mutable.ArrayBuffer[Source]
val registry = new MetricRegistry()
// Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
private var metricsServlet: Option[MetricsServlet] = None
/** Get any UI handlers used by this metrics system. */
def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())
metricsConfig.initialize()
registerSources()
registerSinks()
@ -126,7 +132,11 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sink = Class.forName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry])
.newInstance(kv._2, registry)
sinks += sink.asInstanceOf[Sink]
if (kv._1 == "servlet") {
metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
} else {
sinks += sink.asInstanceOf[Sink]
}
} catch {
case e: Exception => logError("Sink class " + classPath + " cannot be instantiated", e)
}

View file

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.metrics.sink
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.json.MetricsModule
import com.fasterxml.jackson.databind.ObjectMapper
import java.util.Properties
import java.util.concurrent.TimeUnit
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler
import spark.ui.JettyUtils
class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
val SERVLET_KEY_URI = "uri"
val SERVLET_KEY_SAMPLE = "sample"
val servletURI = property.getProperty(SERVLET_KEY_URI)
val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
val mapper = new ObjectMapper().registerModule(
new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
def getHandlers = Array[(String, Handler)](
(servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
)
def getMetricsSnapshot(request: HttpServletRequest): String = {
mapper.writeValueAsString(registry)
}
override def start() { }
override def stop() { }
}
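A minimal sketch of the new sink in isolation, assuming the class is accessible outside the metrics system (getMetricsSnapshot ignores its request argument, so null is acceptable for a demo):

import java.util.Properties
import com.codahale.metrics.MetricRegistry
import spark.metrics.sink.MetricsServlet

object MetricsServletExample {
  def main(args: Array[String]) {
    val registry = new MetricRegistry()
    registry.counter("example.records.processed").inc(42)

    val props = new Properties()
    props.setProperty("uri", "/metrics/json")  // SERVLET_KEY_URI
    props.setProperty("sample", "false")       // SERVLET_KEY_SAMPLE

    val servlet = new MetricsServlet(props, registry)
    println(servlet.getMetricsSnapshot(null))  // JSON dump of the registry
  }
}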

View file

@ -23,7 +23,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions
import scala.collection.mutable.ArrayBuffer
import spark.{Aggregator, Partition, Partitioner, RDD, SparkEnv, TaskContext}
import spark.{Partition, Partitioner, RDD, SparkEnv, TaskContext}
import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@ -52,13 +52,6 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
override def hashCode(): Int = idx
}
private[spark] class CoGroupAggregator
extends Aggregator[Any, Any, ArrayBuffer[Any]](
{ x => ArrayBuffer(x) },
{ (b, x) => b += x },
{ (b1, b2) => b1 ++ b2 })
with Serializable
/**
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
@ -66,34 +59,25 @@ private[spark] class CoGroupAggregator
*
* @param rdds parent RDDs.
* @param part partitioner used to partition the shuffle output.
* @param mapSideCombine flag indicating whether to merge values before shuffle step. If the flag
* is on, Spark does an extra pass over the data on the map side to merge
* all values belonging to the same key together. This can reduce the amount
* of data shuffled if and only if the number of distinct keys is very small,
* and the ratio of key size to value size is also very small.
*/
class CoGroupedRDD[K](
@transient var rdds: Seq[RDD[(K, _)]],
part: Partitioner,
val mapSideCombine: Boolean = false,
val serializerClass: String = null)
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
private val aggr = new CoGroupAggregator
private var serializerClass: String = null
def setSerializer(cls: String): CoGroupedRDD[K] = {
serializerClass = cls
this
}
override def getDependencies: Seq[Dependency[_]] = {
rdds.map { rdd =>
rdds.map { rdd: RDD[(K, _)] =>
if (rdd.partitioner == Some(part)) {
logInfo("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logInfo("Adding shuffle dependency with " + rdd)
if (mapSideCombine) {
val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part, serializerClass)
} else {
new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass)
}
new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass)
}
}
}
@ -145,16 +129,8 @@ class CoGroupedRDD[K](
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
if (mapSideCombine) {
// With map side combine on, for each key, the shuffle fetcher returns a list of values.
fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics, ser).foreach {
case (key, values) => getSeq(key)(depNum) ++= values
}
} else {
// With map side combine off, for each key the shuffle fetcher returns a single value.
fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach {
case (key, value) => getSeq(key)(depNum) += value
}
fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach {
case (key, value) => getSeq(key)(depNum) += value
}
}
}

View file

@ -17,8 +17,9 @@
package spark.rdd
import spark.{Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext}
import spark.SparkContext._
import spark._
import scala.Some
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@ -30,15 +31,24 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
* The resulting RDD from a shuffle (e.g. repartitioning of data).
* @param prev the parent RDD.
* @param part the partitioner used to partition the RDD
* @param serializerClass class name of the serializer to use.
* @tparam K the key class.
* @tparam V the value class.
*/
class ShuffledRDD[K, V](
@transient prev: RDD[(K, V)],
part: Partitioner,
serializerClass: String = null)
extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part, serializerClass))) {
@transient var prev: RDD[(K, V)],
part: Partitioner)
extends RDD[(K, V)](prev.context, Nil) {
private var serializerClass: String = null
def setSerializer(cls: String): ShuffledRDD[K, V] = {
serializerClass = cls
this
}
override def getDependencies: Seq[Dependency[_]] = {
List(new ShuffleDependency(prev, part, serializerClass))
}
override val partitioner = Some(part)
@ -51,4 +61,9 @@ class ShuffledRDD[K, V](
SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index, context.taskMetrics,
SparkEnv.get.serializerManager.get(serializerClass))
}
override def clearDependencies() {
super.clearDependencies()
prev = null
}
}
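The serializer is now supplied through the setSerializer builder method rather than a constructor argument, and prev is cleared in clearDependencies once the shuffle dependency has been materialized. A minimal usage sketch; the Kryo class name in the comment is an assumption, not something this diff shows:

import spark.{HashPartitioner, SparkContext}
import spark.rdd.ShuffledRDD

object ShuffledRDDExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "shuffledRDDExample")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
    // The constructor now takes only the parent RDD and the partitioner ...
    val shuffled = new ShuffledRDD[String, Int](pairs, new HashPartitioner(4))
    // ... and a custom serializer, if any, is set with the builder method, e.g.
    // shuffled.setSerializer(classOf[spark.KryoSerializer].getName)
    println(shuffled.collect().toSeq)
    sc.stop()
  }
}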

View file

@ -49,10 +49,16 @@ import spark.OneToOneDependency
private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest](
@transient var rdd1: RDD[(K, V)],
@transient var rdd2: RDD[(K, W)],
part: Partitioner,
val serializerClass: String = null)
part: Partitioner)
extends RDD[(K, V)](rdd1.context, Nil) {
private var serializerClass: String = null
def setSerializer(cls: String): SubtractedRDD[K, V, W] = {
serializerClass = cls
this
}
override def getDependencies: Seq[Dependency[_]] = {
Seq(rdd1, rdd2).map { rdd =>
if (rdd.partitioner == Some(part)) {

View file

@ -153,7 +153,7 @@ object StatsReportListener extends Logging {
}
def showBytesDistribution(heading: String, dist: Distribution) {
showDistribution(heading, dist, (d => Utils.memoryBytesToString(d.toLong)): Double => String)
showDistribution(heading, dist, (d => Utils.bytesToString(d.toLong)): Double => String)
}
def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {

View file

@ -17,19 +17,14 @@
package spark.scheduler.cluster
import java.io.{File, FileInputStream, FileOutputStream}
import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.util.control.Breaks._
import scala.xml._
import scala.xml.XML
import spark.Logging
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import java.util.Properties
/**
* An interface to build Schedulable tree
@ -56,7 +51,7 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file")
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
@ -69,39 +64,44 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val DEFAULT_WEIGHT = 1
override def buildPools() {
if (schedulerAllocFile != null) {
val file = new File(schedulerAllocFile)
if (file.exists()) {
val xml = XML.loadFile(file)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
if (file.exists()) {
val xml = XML.loadFile(file)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var weight = DEFAULT_WEIGHT
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var weight = DEFAULT_WEIGHT
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
try {
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
} catch {
case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
try {
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
} catch {
case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
}
}
}
val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
if (xmlMinShare != "") {
minShare = xmlMinShare.toInt
}
val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
if (xmlMinShare != "") {
minShare = xmlMinShare.toInt
}
val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}
val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}
val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Create new pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
poolName, schedulingMode, minShare, weight))
val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
} else {
throw new java.io.FileNotFoundException(
"Fair scheduler allocation file not found: " + schedulerAllocFile)
}
}
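buildPools now fails fast: if spark.fairscheduler.allocation.file points at a missing file, a FileNotFoundException is thrown instead of the pools being silently skipped (when the property is unset, only the default pool is created). A minimal sketch of wiring up the property before the scheduler starts; the path and pool name are illustrative, and the cluster scheduling mode itself is selected by a separate property not shown in this diff:

object FairPoolsSetup {
  def main(args: Array[String]) {
    // The file must exist, or buildPools will now throw java.io.FileNotFoundException.
    System.setProperty("spark.fairscheduler.allocation.file", "/etc/spark/fairscheduler.xml")
    val sc = new spark.SparkContext("local", "fairPoolsSetup")
    sc.setLocalProperty("spark.scheduler.cluster.fair.pool", "production")  // illustrative pool name from the allocation file
    // ... submit jobs from this thread into the chosen pool ...
    sc.stop()
  }
}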
@ -110,7 +110,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
@ -127,7 +127,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}

View file

@ -77,7 +77,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def executorAdded(executorId: String, workerId: String, hostPort: String, cores: Int, memory: Int) {
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
executorId, hostPort, cores, Utils.memoryMegabytesToString(memory)))
executorId, hostPort, cores, Utils.megabytesToString(memory)))
}
override def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {

View file

@ -18,8 +18,11 @@
package spark.scheduler.local
import java.io.File
import java.lang.management.ManagementFactory
import java.util.concurrent.atomic.AtomicInteger
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
@ -173,6 +176,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var attemptedTask: Option[Task[_]] = None
val start = System.currentTimeMillis()
var taskStart: Long = 0
def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
val startGCTime = getTotalGCTime
try {
Accumulators.clear()
Thread.currentThread().setContextClassLoader(classLoader)
@ -202,6 +208,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
val serviceTime = System.currentTimeMillis() - taskStart
logInfo("Finished " + taskId)
deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime
deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
val serializedResult = ser.serialize(taskResult)
@ -210,7 +217,10 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
case t: Throwable => {
val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
metrics.foreach{m => m.executorRunTime = serviceTime.toInt}
for (m <- metrics) {
m.executorRunTime = serviceTime.toInt
m.jvmGCTime = getTotalGCTime - startGCTime
}
val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
}

View file

@ -111,7 +111,7 @@ object BlockFetcherIterator {
protected def sendRequest(req: FetchRequest) {
logDebug("Sending request for %d blocks (%s) from %s".format(
req.blocks.size, Utils.memoryBytesToString(req.size), req.address.hostPort))
req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
val cmId = new ConnectionManagerId(req.address.host, req.address.port)
val blockMessageArray = new BlockMessageArray(req.blocks.map {
case (blockId, size) => BlockMessage.fromGetBlock(GetBlock(blockId))
@ -310,7 +310,7 @@ object BlockFetcherIterator {
}
logDebug("Sending request for %d blocks (%s) from %s".format(
req.blocks.size, Utils.memoryBytesToString(req.size), req.address.host))
req.blocks.size, Utils.bytesToString(req.size), req.address.host))
val cmId = new ConnectionManagerId(req.address.host, req.address.nettyPort)
val cpier = new ShuffleCopier
cpier.getBlocks(cmId, req.blocks, putResult)

View file

@ -332,7 +332,7 @@ object BlockManagerMasterActor {
private val _blocks = new JHashMap[String, BlockStatus]
logInfo("Registering block manager %s with %s RAM".format(
blockManagerId.hostPort, Utils.memoryBytesToString(maxMem)))
blockManagerId.hostPort, Utils.bytesToString(maxMem)))
def updateLastSeenMs() {
_lastSeenMs = System.currentTimeMillis()
@ -358,12 +358,12 @@ object BlockManagerMasterActor {
if (storageLevel.useMemory) {
_remainingMem -= memSize
logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
blockId, blockManagerId.hostPort, Utils.memoryBytesToString(memSize),
Utils.memoryBytesToString(_remainingMem)))
blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
Utils.bytesToString(_remainingMem)))
}
if (storageLevel.useDisk) {
logInfo("Added %s on disk on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.memoryBytesToString(diskSize)))
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
@ -372,12 +372,12 @@ object BlockManagerMasterActor {
if (blockStatus.storageLevel.useMemory) {
_remainingMem += blockStatus.memSize
logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
blockId, blockManagerId.hostPort, Utils.memoryBytesToString(memSize),
Utils.memoryBytesToString(_remainingMem)))
blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
Utils.bytesToString(_remainingMem)))
}
if (blockStatus.storageLevel.useDisk) {
logInfo("Removed %s on %s on disk (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.memoryBytesToString(diskSize)))
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
}
}

View file

@ -147,7 +147,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
channel.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
blockId, Utils.memoryBytesToString(bytes.limit), (finishTime - startTime)))
blockId, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
}
private def getFileBytes(file: File): ByteBuffer = {
@ -181,7 +181,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
val timeTaken = System.currentTimeMillis - startTime
logDebug("Block %s stored as %s file on disk in %d ms".format(
blockId, Utils.memoryBytesToString(length), timeTaken))
blockId, Utils.bytesToString(length), timeTaken))
if (returnValues) {
// Return a byte buffer for the contents of the file

View file

@ -38,7 +38,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// blocks from the memory store.
private val putLock = new Object()
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
logInfo("MemoryStore started with capacity %s.".format(Utils.bytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
@ -164,10 +164,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
currentMemory += size
if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
} else {
logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
}
true
} else {


@ -42,9 +42,9 @@ case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long)
extends Ordered[RDDInfo] {
override def toString = {
import Utils.memoryBytesToString
import Utils.bytesToString
"RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize))
storageLevel.toString, numCachedPartitions, numPartitions, bytesToString(memSize), bytesToString(diskSize))
}
override def compare(that: RDDInfo) = {


@ -17,21 +17,20 @@
package spark.ui
import annotation.tailrec
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import net.liftweb.json.{JValue, pretty, render}
import scala.annotation.tailrec
import scala.util.{Try, Success, Failure}
import scala.util.parsing.json.JSONType
import scala.xml.Node
import org.eclipse.jetty.server.{Server, Request, Handler}
import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler}
import org.eclipse.jetty.util.thread.QueuedThreadPool
import scala.util.{Try, Success, Failure}
import scala.xml.Node
import spark.Logging
/** Utilities for launching a web server using Jetty's HTTP Server class */
private[spark] object JettyUtils extends Logging {
// Base type for a function that returns something based on an HTTP request. Allows for
@ -39,8 +38,8 @@ private[spark] object JettyUtils extends Logging {
type Responder[T] = HttpServletRequest => T
// Conversions from various types of Responder's to jetty Handlers
implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler =
createHandler(responder, "text/json", (in: JValue) => pretty(render(in)))
implicit def jsonResponderToHandler(responder: Responder[JSONType]): Handler =
createHandler(responder, "text/json", (in: JSONType) => in.toString)
implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler =
createHandler(responder, "text/html", (in: Seq[Node]) => "<!DOCTYPE html>" + in.toString)
@ -48,7 +47,7 @@ private[spark] object JettyUtils extends Logging {
implicit def textResponderToHandler(responder: Responder[String]): Handler =
createHandler(responder, "text/plain")
private def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
extractFn: T => String = (in: Any) => in.toString): Handler = {
new AbstractHandler {
def handle(target: String,
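With lift-json removed, JSON responders now return the standard library's JSONType, and the implicit conversion above turns them into Jetty handlers. A minimal sketch, assuming only the Responder alias and conversions shown in this hunk (the JSONObject payload is illustrative):

    import javax.servlet.http.HttpServletRequest
    import scala.util.parsing.json.{JSONObject, JSONType}
    import org.eclipse.jetty.server.Handler
    import spark.ui.JettyUtils._

    // Serialized with toString by jsonResponderToHandler and served as text/json.
    val jsonResponder: Responder[JSONType] =
      (request: HttpServletRequest) => JSONObject(Map("path" -> request.getPathInfo))

    val handler: Handler = jsonResponder  // implicit jsonResponderToHandler applies here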


@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
import spark.{Logging, SparkContext, Utils}
import spark.{Logging, SparkContext, SparkEnv, Utils}
import spark.ui.env.EnvironmentUI
import spark.ui.exec.ExecutorsUI
import spark.ui.storage.BlockManagerUI
@ -43,8 +43,12 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
val jobs = new JobProgressUI(sc)
val env = new EnvironmentUI(sc)
val exec = new ExecutorsUI(sc)
// Add MetricsServlet handlers by default
val metricsServletHandlers = SparkEnv.get.metricsSystem.getServletHandlers
val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
exec.getHandlers ++ handlers
exec.getHandlers ++ metricsServletHandlers ++ handlers
/** Bind the HTTP server which backs this web interface */
def bind() {
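Because the MetricsServlet handlers are appended to the driver UI's handler list above, the metrics snapshot becomes reachable on the same port as the web UI. A sketch under the sink.servlet.uri defaults asserted in the MetricsConfigSuite further down; the UI host and port are assumptions for illustration only:

    // Assumption: driver web UI on localhost:4040, default servlet URI /metrics/json.
    val snapshot = scala.io.Source.fromURL("http://localhost:4040/metrics/json").mkString
    println(snapshot)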


@ -125,9 +125,21 @@ private[spark] object UIUtils {
}
/** Returns an HTML table constructed by generating a row for each object in a sequence. */
def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
<thead>{headers.map(h => <th>{h}</th>)}</thead>
def listingTable[T](
headers: Seq[String],
makeRow: T => Seq[Node],
rows: Seq[T],
fixedWidth: Boolean = false): Seq[Node] = {
val colWidth = 100.toDouble / headers.size
val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
var tableClass = "table table-bordered table-striped table-condensed sortable"
if (fixedWidth) {
tableClass += " table-fixed"
}
<table class={tableClass}>
<thead>{headers.map(h => <th width={colWidthAttr}>{h}</th>)}</thead>
<tbody>
{rows.map(r => makeRow(r))}
</tbody>
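The widened listingTable signature is used with fixedWidth = true at the EnvironmentUI and StagePage call sites below. A minimal sketch of a call (the sample row is illustrative):

    import scala.xml.Node
    import spark.ui.UIUtils

    def propertyRow(kv: (String, String)): Seq[Node] = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>

    // fixedWidth = true gives every column an equal percentage width and adds the
    // "table-fixed" class, per the implementation above.
    val table: Seq[Node] = UIUtils.listingTable(
      Seq("Name", "Value"), propertyRow, Seq(("spark.app.name", "demo")), fixedWidth = true)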


@ -22,7 +22,8 @@ import scala.util.Random
import spark.SparkContext
import spark.SparkContext._
import spark.scheduler.cluster.SchedulingMode
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Continuously generates jobs that expose various features of the WebUI (internal testing tool).
*
@ -48,9 +49,9 @@ private[spark] object UIWorkloadGenerator {
def setProperties(s: String) = {
if(schedulingMode == SchedulingMode.FAIR) {
sc.addLocalProperty("spark.scheduler.cluster.fair.pool", s)
sc.setLocalProperty("spark.scheduler.cluster.fair.pool", s)
}
sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
}
val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
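addLocalProperty becomes setLocalProperty here and in the scheduler tests below. A minimal sketch of the renamed calls (the master string and property values are illustrative):

    import spark.SparkContext

    val sc = new SparkContext("local[4]", "ui-workload")
    sc.setLocalProperty("spark.scheduler.cluster.fair.pool", "production")
    sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, "nightly batch")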


@ -19,18 +19,17 @@ package spark.ui.env
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler
import scala.collection.JavaConversions._
import scala.util.Properties
import scala.xml.Node
import org.eclipse.jetty.server.Handler
import spark.ui.JettyUtils._
import spark.ui.UIUtils.headerSparkPage
import spark.ui.UIUtils
import spark.ui.Page.Environment
import spark.SparkContext
import spark.ui.UIUtils
import scala.xml.Node
private[spark] class EnvironmentUI(sc: SparkContext) {
@ -46,20 +45,22 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
("Scala Home", Properties.scalaHome)
).sorted
def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
def jvmTable = UIUtils.listingTable(Seq("Name", "Value"), jvmRow, jvmInformation)
def jvmTable =
UIUtils.listingTable(Seq("Name", "Value"), jvmRow, jvmInformation, fixedWidth = true)
val properties = System.getProperties.iterator.toSeq
val classPathProperty = properties
.filter{case (k, v) => k.contains("java.class.path")}
.headOption
.getOrElse("", "")
val classPathProperty = properties.find { case (k, v) =>
k.contains("java.class.path")
}.getOrElse(("", ""))
val sparkProperties = properties.filter(_._1.startsWith("spark")).sorted
val otherProperties = properties.diff(sparkProperties :+ classPathProperty).sorted
val propertyHeaders = Seq("Name", "Value")
def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
val sparkPropertyTable = UIUtils.listingTable(propertyHeaders, propertyRow, sparkProperties)
val otherPropertyTable = UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties)
val sparkPropertyTable =
UIUtils.listingTable(propertyHeaders, propertyRow, sparkProperties, fixedWidth = true)
val otherPropertyTable =
UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties, fixedWidth = true)
val classPathEntries = classPathProperty._2
.split(System.getProperty("path.separator", ":"))
@ -71,16 +72,23 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
val classPathHeaders = Seq("Resource", "Source")
def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
val classPathTable = UIUtils.listingTable(classPathHeaders, classPathRow, classPath)
val classPathTable =
UIUtils.listingTable(classPathHeaders, classPathRow, classPath, fixedWidth = true)
val content =
<span>
<h4>Runtime Information</h4> {jvmTable}
<h4>Spark Properties</h4> {sparkPropertyTable}
<h4>System Properties</h4> {otherPropertyTable}
<h4>Classpath Entries</h4> {classPathTable}
<hr/>
<h4>{sparkProperties.size} Spark Properties</h4>
{sparkPropertyTable}
<hr/>
<h4>{otherProperties.size} System Properties</h4>
{otherPropertyTable}
<hr/>
<h4>{classPath.size} Classpath Entries</h4>
{classPathTable}
</span>
headerSparkPage(content, sc, "Environment", Environment)
UIUtils.headerSparkPage(content, sc, "Environment", Environment)
}
}


@ -1,25 +1,20 @@
package spark.ui.exec
import javax.servlet.http.HttpServletRequest
import scala.collection.mutable.{HashMap, HashSet}
import scala.xml.Node
import org.eclipse.jetty.server.Handler
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Properties
import spark.{ExceptionFailure, Logging, SparkContext, Success, Utils}
import spark.{ExceptionFailure, Logging, Utils, SparkContext}
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo
import spark.scheduler._
import spark.SparkContext
import spark.storage.{StorageStatus, StorageUtils}
import spark.scheduler.{SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener}
import spark.ui.JettyUtils._
import spark.ui.Page.Executors
import spark.ui.UIUtils.headerSparkPage
import spark.ui.UIUtils
import scala.xml.{Node, XML}
private[spark] class ExecutorsUI(val sc: SparkContext) {
@ -38,32 +33,32 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = sc.getExecutorStorageStatus
val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
val memUsed = storageStatusList.map(_.memUsed()).reduce(_+_)
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
.reduceOption(_+_).getOrElse(0L)
val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_+_)
val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_+_)
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
"Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
def execRow(kv: Seq[String]) =
def execRow(kv: Seq[String]) = {
<tr>
<td>{kv(0)}</td>
<td>{kv(1)}</td>
<td>{kv(2)}</td>
<td sorttable_customkey={kv(3)}>
{Utils.memoryBytesToString(kv(3).toLong)} / {Utils.memoryBytesToString(kv(4).toLong)}
{Utils.bytesToString(kv(3).toLong)} / {Utils.bytesToString(kv(4).toLong)}
</td>
<td sorttable_customkey={kv(5)}>
{Utils.memoryBytesToString(kv(5).toLong)}
{Utils.bytesToString(kv(5).toLong)}
</td>
<td>{kv(6)}</td>
<td>{kv(7)}</td>
<td>{kv(8)}</td>
<td>{kv(9)}</td>
</tr>
val execInfo =
for (b <- 0 until storageStatusList.size)
yield getExecInfo(b)
}
val execInfo = for (b <- 0 until storageStatusList.size) yield getExecInfo(b)
val execTable = UIUtils.listingTable(execHead, execRow, execInfo)
val content =
@ -71,9 +66,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<div class="span12">
<ul class="unstyled">
<li><strong>Memory:</strong>
{Utils.memoryBytesToString(memUsed)} Used
({Utils.memoryBytesToString(maxMem)} Total) </li>
<li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
{Utils.bytesToString(memUsed)} Used
({Utils.bytesToString(maxMem)} Total) </li>
<li><strong>Disk:</strong> {Utils.bytesToString(diskSpaceUsed)} Used </li>
</ul>
</div>
</div>
@ -83,7 +78,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
</div>
</div>;
headerSparkPage(content, sc, "Executors", Executors)
UIUtils.headerSparkPage(content, sc, execInfo.size + " Executors", Executors)
}
def getExecInfo(a: Int): Seq[String] = {
@ -93,10 +88,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size)
.getOrElse(0).toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size).getOrElse(0)
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
Seq(
@ -106,10 +100,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
memUsed,
maxMem,
diskUsed,
activeTasks,
failedTasks,
completedTasks,
totalTasks
activeTasks.toString,
failedTasks.toString,
completedTasks.toString,
totalTasks.toString
)
}
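The switch from reduce to fold(0L) near the top of this file's hunks matters when no executor storage status has been reported yet; a one-line illustration of the difference:

    val sizes: Seq[Long] = Seq.empty
    sizes.fold(0L)(_ + _)   // 0L on an empty sequence
    // sizes.reduce(_ + _)  // would throw UnsupportedOperationException on an empty sequence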


@ -24,7 +24,7 @@ import scala.xml.{NodeSeq, Node}
import spark.scheduler.cluster.SchedulingMode
import spark.ui.Page._
import spark.ui.UIUtils._
import spark.Utils
/** Page showing list of all ongoing and recently finished stages and pools */
private[spark] class IndexPage(parent: JobProgressUI) {
@ -46,7 +46,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
val poolTable = new PoolTable(listener.sc.getAllPools, listener)
val pools = listener.sc.getAllPools
val poolTable = new PoolTable(pools, listener)
val summary: NodeSeq =
<div>
<ul class="unstyled">
@ -76,15 +77,15 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val content = summary ++
{if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) {
<h4>Pools</h4> ++ poolTable.toNodeSeq
<hr/><h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
} else {
Seq()
}} ++
<h4 id="active">Active Stages: {activeStages.size}</h4> ++
<hr/><h4 id="active">{activeStages.size} Active Stages</h4> ++
activeStagesTable.toNodeSeq++
<h4 id="completed">Completed Stages: {completedStages.size}</h4> ++
<hr/><h4 id="completed">{completedStages.size} Completed Stages</h4> ++
completedStagesTable.toNodeSeq++
<h4 id ="failed">Failed Stages: {failedStages.size}</h4> ++
<hr/><h4 id ="failed">{failedStages.size} Failed Stages</h4> ++
failedStagesTable.toNodeSeq
headerSparkPage(content, parent.sc, "Spark Stages", Jobs)


@ -23,10 +23,11 @@ private[spark] class PoolPage(parent: JobProgressUI) {
val pool = listener.sc.getPoolForName(poolName).get
val poolTable = new PoolTable(Seq(pool), listener)
val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++
<h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq()
val content = <h4>Summary </h4> ++ poolTable.toNodeSeq() ++
<hr/>
<h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq()
headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs)
headerSparkPage(content, parent.sc, "Fair Scheduler Pool: " + poolName, Jobs)
}
}
}


@ -1,8 +1,8 @@
package spark.ui.jobs
import scala.xml.Node
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.xml.Node
import spark.scheduler.Stage
import spark.scheduler.cluster.Schedulable
@ -21,14 +21,14 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
private def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
rows: Seq[Schedulable]
): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
<table class="table table-bordered table-striped table-condensed sortable table-fixed">
<thead>
<th>Pool Name</th>
<th>Minimum Share</th>
<th>Pool Weight</th>
<td>Active Stages</td>
<td>Running Tasks</td>
<td>SchedulingMode</td>
<th>Active Stages</th>
<th>Running Tasks</th>
<th>SchedulingMode</th>
</thead>
<tbody>
{rows.map(r => makeRow(r, poolToActiveStages))}
@ -36,7 +36,8 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
</table>
}
private def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = {
private def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]])
: Seq[Node] = {
val activeStages = poolToActiveStages.get(p.name) match {
case Some(stages) => stages.size
case None => 0


@ -46,11 +46,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
<h4>Summary Metrics</h4> No tasks have started yet
<h4>Tasks</h4> No tasks have started yet
</div>
return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
return headerSparkPage(content, parent.sc, "Details for Stage %s".format(stageId), Jobs)
}
val tasks = listener.stageToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime)
val numCompleted = tasks.count(_._1.finished)
val shuffleReadBytes = listener.stageToShuffleRead.getOrElse(stageId, 0L)
val hasShuffleRead = shuffleReadBytes > 0
val shuffleWriteBytes = listener.stageToShuffleWrite.getOrElse(stageId, 0L)
@ -69,25 +70,26 @@ private[spark] class StagePage(parent: JobProgressUI) {
{if (hasShuffleRead)
<li>
<strong>Shuffle read: </strong>
{Utils.memoryBytesToString(shuffleReadBytes)}
{Utils.bytesToString(shuffleReadBytes)}
</li>
}
{if (hasShuffleWrite)
<li>
<strong>Shuffle write: </strong>
{Utils.memoryBytesToString(shuffleWriteBytes)}
{Utils.bytesToString(shuffleWriteBytes)}
</li>
}
</ul>
</div>
val taskHeaders: Seq[String] =
Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
Seq("Details")
Seq("Task ID", "Status", "Locality Level", "Executor", "Launch Time", "Duration") ++
Seq("GC Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
Seq("Errors")
val taskTable = listingTable(taskHeaders, taskRow, tasks)
val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
@ -103,7 +105,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
ms => parent.formatDuration(ms.toLong))
def getQuantileCols(data: Seq[Double]) =
Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
val shuffleReadSizes = validTasks.map {
case(info, metrics, exception) =>
@ -121,21 +123,25 @@ private[spark] class StagePage(parent: JobProgressUI) {
if (hasShuffleRead) shuffleReadQuantiles else Nil,
if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
val quantileHeaders = Seq("Metric", "Min", "25th percentile",
"Median", "75th percentile", "Max")
def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
Some(listingTable(quantileHeaders, quantileRow, listings))
Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
}
val content =
summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++
<h2>Tasks</h2> ++ taskTable;
summary ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
<hr/><h4>Tasks</h4> ++ taskTable;
headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Jobs)
}
}
def taskRow(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean)
(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
trace.map(e => <span style="display:block;">{e.toString}</span>)
val (info, metrics, exception) = taskData
@ -144,20 +150,28 @@ private[spark] class StagePage(parent: JobProgressUI) {
else metrics.map(m => m.executorRunTime).getOrElse(1)
val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
<tr>
<td>{info.taskId}</td>
<td>{info.status}</td>
<td sorttable_customkey={duration.toString}>
{formatDuration}
</td>
<td>{info.taskLocality}</td>
<td>{info.hostPort}</td>
<td>{dateFmt.format(new Date(info.launchTime))}</td>
{metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
<td>{Utils.memoryBytesToString(s.remoteBytesRead)}</td>}.getOrElse("")}
{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
<td>{Utils.memoryBytesToString(s.shuffleBytesWritten)}</td>}.getOrElse("")}
<td sorttable_customkey={duration.toString}>
{formatDuration}
</td>
<td sorttable_customkey={gcTime.toString}>
{if (gcTime > 0) parent.formatDuration(gcTime) else ""}
</td>
{if (shuffleRead) {
<td>{metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
}}
{if (shuffleWrite) {
<td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
}}
<td>{exception.map(e =>
<span>
{e.className} ({e.description})<br/>


@ -1,21 +1,14 @@
package spark.ui.jobs
import java.util.Date
import java.text.SimpleDateFormat
import javax.servlet.http.HttpServletRequest
import scala.Some
import scala.xml.{NodeSeq, Node}
import scala.collection.mutable.HashMap
import scala.xml.Node
import scala.collection.mutable.HashSet
import spark.Utils
import spark.scheduler.cluster.{SchedulingMode, TaskInfo}
import spark.scheduler.Stage
import spark.ui.UIUtils._
import spark.ui.Page._
import spark.Utils
import spark.storage.StorageLevel
/** Page showing list of all ongoing and recently finished stages */
private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) {
@ -38,10 +31,10 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Description</th>
<th>Submitted</th>
<td>Duration</td>
<td>Tasks: Succeeded/Total</td>
<td>Shuffle Read</td>
<td>Shuffle Write</td>
<th>Duration</th>
<th>Tasks: Succeeded/Total</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
</thead>
<tbody>
{rows.map(r => makeRow(r))}
@ -49,13 +42,6 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
</table>
}
private def getElapsedTime(submitted: Option[Long], completed: Long): String = {
submitted match {
case Some(t) => parent.formatDuration(completed - t)
case _ => "Unknown"
}
}
private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
@ -78,11 +64,11 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
case 0 => ""
case b => Utils.memoryBytesToString(b)
case b => Utils.bytesToString(b)
}
val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
case 0 => ""
case b => Utils.memoryBytesToString(b)
case b => Utils.bytesToString(b)
}
val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
@ -98,6 +84,8 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
val description = listener.stageToDescription.get(s)
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
val finishTime = s.completionTime.getOrElse(System.currentTimeMillis())
val duration = s.submissionTime.map(t => finishTime - t)
<tr>
<td>{s.id}</td>
@ -106,8 +94,9 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
}
<td>{description}</td>
<td valign="middle">{submissionTime}</td>
<td>{getElapsedTime(s.submissionTime,
s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
<td sorttable_customkey={duration.getOrElse(-1).toString}>
{duration.map(d => parent.formatDuration(d)).getOrElse("Unknown")}
</td>
<td class="progress-cell">
{makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
</td>


@ -58,8 +58,8 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
</td>
<td>{rdd.numCachedPartitions}</td>
<td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td>
<td>{Utils.memoryBytesToString(rdd.memSize)}</td>
<td>{Utils.memoryBytesToString(rdd.diskSize)}</td>
<td>{Utils.bytesToString(rdd.memSize)}</td>
<td>{Utils.bytesToString(rdd.diskSize)}</td>
</tr>
}
}


@ -21,12 +21,13 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import spark.storage.{StorageStatus, StorageUtils}
import spark.ui.UIUtils._
import spark.Utils
import spark.storage.{StorageStatus, StorageUtils}
import spark.storage.BlockManagerMasterActor.BlockStatus
import spark.ui.UIUtils._
import spark.ui.Page._
/** Page showing storage details for a given RDD */
private[spark] class RDDPage(parent: BlockManagerUI) {
val sc = parent.sc
@ -44,7 +45,7 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
val workerTable = listingTable(workerHeaders, workerRow, workers)
val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk",
"Locations")
"Executors")
val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1)
val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
@ -71,11 +72,11 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
</li>
<li>
<strong>Memory Size:</strong>
{Utils.memoryBytesToString(rddInfo.memSize)}
{Utils.bytesToString(rddInfo.memSize)}
</li>
<li>
<strong>Disk Size:</strong>
{Utils.memoryBytesToString(rddInfo.diskSize)}
{Utils.bytesToString(rddInfo.diskSize)}
</li>
</ul>
</div>
@ -83,19 +84,19 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
<hr/>
<div class="row">
<div class="span12">
<h3> Data Distribution Summary </h3>
<h4> Data Distribution on {workers.size} Executors </h4>
{workerTable}
</div>
</div>
<hr/>
<div class="row">
<div class="span12">
<h4> Partitions </h4>
<h4> {blocks.size} Partitions </h4>
{blockTable}
</div>
</div>;
headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Storage)
headerSparkPage(content, parent.sc, "RDD Storage Info for " + rddInfo.name, Storage)
}
def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = {
@ -106,10 +107,10 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
{block.storageLevel.description}
</td>
<td sorttable_customkey={block.memSize.toString}>
{Utils.memoryBytesToString(block.memSize)}
{Utils.bytesToString(block.memSize)}
</td>
<td sorttable_customkey={block.diskSize.toString}>
{Utils.memoryBytesToString(block.diskSize)}
{Utils.bytesToString(block.diskSize)}
</td>
<td>
{locations.map(l => <span>{l}<br/></span>)}
@ -122,10 +123,10 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
<tr>
<td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
<td>
{Utils.memoryBytesToString(status.memUsed(prefix))}
({Utils.memoryBytesToString(status.memRemaining)} Total Available)
{Utils.bytesToString(status.memUsed(prefix))}
({Utils.bytesToString(status.memRemaining)} Remaining)
</td>
<td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td>
<td>{Utils.bytesToString(status.diskUsed(prefix))}</td>
</tr>
}
}


@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import com.google.common.base.Optional;
import scala.Tuple2;
import com.google.common.base.Charsets;
@ -197,6 +198,35 @@ public class JavaAPISuite implements Serializable {
cogrouped.collect();
}
@Test
public void leftOuterJoin() {
JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
new Tuple2<Integer, Integer>(1, 1),
new Tuple2<Integer, Integer>(1, 2),
new Tuple2<Integer, Integer>(2, 1),
new Tuple2<Integer, Integer>(3, 1)
));
JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
new Tuple2<Integer, Character>(1, 'x'),
new Tuple2<Integer, Character>(2, 'y'),
new Tuple2<Integer, Character>(2, 'z'),
new Tuple2<Integer, Character>(4, 'w')
));
List<Tuple2<Integer,Tuple2<Integer,Optional<Character>>>> joined =
rdd1.leftOuterJoin(rdd2).collect();
Assert.assertEquals(5, joined.size());
Tuple2<Integer,Tuple2<Integer,Optional<Character>>> firstUnmatched =
rdd1.leftOuterJoin(rdd2).filter(
new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
@Override
public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup)
throws Exception {
return !tup._2()._2().isPresent();
}
}).first();
Assert.assertEquals(3, firstUnmatched._1().intValue());
}
@Test
public void foldReduce() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
@ -718,7 +748,7 @@ public class JavaAPISuite implements Serializable {
}
};
JavaRDD<Integer> sizes = rdd1.zipPartitions(sizesFn, rdd2);
JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
}
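For reference, the Scala counterpart of the leftOuterJoin test added above surfaces the unmatched value as None instead of an absent Optional. A sketch assuming `sc` and `import spark.SparkContext._` are in scope, as in the Scala suites:

    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))

    val joined = rdd1.leftOuterJoin(rdd2).collect()            // 5 results, as in the Java test
    val unmatched = joined.find { case (_, (_, opt)) => opt.isEmpty }
    // unmatched.map(_._1) == Some(3): key 3 has no match in rdd2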


@ -17,17 +17,8 @@
package spark
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.prop.Checkers
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
import org.scalacheck.Prop._
import com.google.common.io.Files
import spark.rdd.ShuffledRDD
import spark.SparkContext._
@ -59,8 +50,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS),
classOf[spark.KryoSerializer].getName)
val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS))
.setSerializer(classOf[spark.KryoSerializer].getName)
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
assert(c.count === 10)
@ -81,7 +72,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
val c = new ShuffledRDD(b, new HashPartitioner(3), classOf[spark.KryoSerializer].getName)
val c = new ShuffledRDD(b, new HashPartitioner(3))
.setSerializer(classOf[spark.KryoSerializer].getName)
assert(c.count === 10)
}
@ -96,7 +88,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
// NOTE: The default Java serializer doesn't create zero-sized blocks.
// So, use Kryo
val c = new ShuffledRDD(b, new HashPartitioner(10), classOf[spark.KryoSerializer].getName)
val c = new ShuffledRDD(b, new HashPartitioner(10))
.setSerializer(classOf[spark.KryoSerializer].getName)
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
assert(c.count === 4)
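The serializer now attaches through a builder-style setter rather than a constructor argument. A minimal sketch mirroring the test updates above, where `b` is a pair RDD as in those tests:

    import spark.HashPartitioner
    import spark.rdd.ShuffledRDD

    val shuffled = new ShuffledRDD(b, new HashPartitioner(10))
      .setSerializer(classOf[spark.KryoSerializer].getName)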


@ -26,14 +26,14 @@ import scala.util.Random
class UtilsSuite extends FunSuite {
test("memoryBytesToString") {
assert(Utils.memoryBytesToString(10) === "10.0 B")
assert(Utils.memoryBytesToString(1500) === "1500.0 B")
assert(Utils.memoryBytesToString(2000000) === "1953.1 KB")
assert(Utils.memoryBytesToString(2097152) === "2.0 MB")
assert(Utils.memoryBytesToString(2306867) === "2.2 MB")
assert(Utils.memoryBytesToString(5368709120L) === "5.0 GB")
assert(Utils.memoryBytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
test("bytesToString") {
assert(Utils.bytesToString(10) === "10.0 B")
assert(Utils.bytesToString(1500) === "1500.0 B")
assert(Utils.bytesToString(2000000) === "1953.1 KB")
assert(Utils.bytesToString(2097152) === "2.0 MB")
assert(Utils.bytesToString(2306867) === "2.2 MB")
assert(Utils.bytesToString(5368709120L) === "5.0 GB")
assert(Utils.bytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
}
test("copyStream") {


@ -40,7 +40,7 @@ class ZippedPartitionsSuite extends FunSuite with SharedSparkContext {
val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
val zippedRDD = data1.zipPartitions(ZippedPartitionsSuite.procZippedData, data2, data3)
val zippedRDD = data1.zipPartitions(data2, data3)(ZippedPartitionsSuite.procZippedData)
val obtainedSizes = zippedRDD.collect()
val expectedSizes = Array(2, 3, 1, 2, 3, 1)
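zipPartitions now takes the other RDDs in the first parameter list and the function in a second, curried one. A sketch with an inline function that presumably matches what procZippedData does (per-partition sizes):

    val zipped = data1.zipPartitions(data2, data3) {
      (i1: Iterator[Int], i2: Iterator[String], i3: Iterator[Double]) =>
        Iterator(i1.size, i2.size, i3.size)
    }
    // zipped.collect() then yields the per-partition sizes, e.g. Array(2, 3, 1, 2, 3, 1) above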


@ -1,12 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.metrics
import java.util.Properties
import java.io.{File, FileOutputStream}
import org.scalatest.{BeforeAndAfter, FunSuite}
import spark.metrics._
class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
@ -18,11 +30,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
val conf = new MetricsConfig(Option("dummy-file"))
conf.initialize()
assert(conf.properties.size() === 0)
assert(conf.properties.size() === 5)
assert(conf.properties.getProperty("test-for-dummy") === null)
val property = conf.getInstance("random")
assert(property.size() === 0)
assert(property.size() === 3)
assert(property.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
assert(property.getProperty("sink.servlet.sample") === "false")
}
test("MetricsConfig with properties set") {
@ -30,16 +45,22 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
conf.initialize()
val masterProp = conf.getInstance("master")
assert(masterProp.size() === 3)
assert(masterProp.size() === 6)
assert(masterProp.getProperty("sink.console.period") === "20")
assert(masterProp.getProperty("sink.console.unit") === "minutes")
assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
assert(masterProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
assert(masterProp.getProperty("sink.servlet.sample") === "false")
val workerProp = conf.getInstance("worker")
assert(workerProp.size() === 3)
assert(workerProp.size() === 6)
assert(workerProp.getProperty("sink.console.period") === "10")
assert(workerProp.getProperty("sink.console.unit") === "seconds")
assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
assert(workerProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
assert(workerProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
assert(workerProp.getProperty("sink.servlet.sample") === "false")
}
test("MetricsConfig with subProperties") {
@ -47,7 +68,7 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
conf.initialize()
val propCategories = conf.propertyCategories
assert(propCategories.size === 2)
assert(propCategories.size === 3)
val masterProp = conf.getInstance("master")
val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
@ -55,10 +76,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
assert(sinkProps.size === 1)
assert(sinkProps.size === 2)
assert(sinkProps.contains("console"))
assert(sinkProps.contains("servlet"))
val consoleProps = sinkProps("console")
assert(consoleProps.size() === 2)
val servletProps = sinkProps("servlet")
assert(servletProps.size() === 3)
}
}


@ -1,12 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.metrics
import java.util.Properties
import java.io.{File, FileOutputStream}
import org.scalatest.{BeforeAndAfter, FunSuite}
import spark.metrics._
class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
@ -22,6 +34,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
assert(sources.length === 0)
assert(sinks.length === 0)
assert(!metricsSystem.getServletHandlers.isEmpty)
}
test("MetricsSystem with sources add") {
@ -31,6 +44,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
assert(sources.length === 0)
assert(sinks.length === 1)
assert(!metricsSystem.getServletHandlers.isEmpty)
val source = new spark.deploy.master.MasterSource(null)
metricsSystem.registerSource(source)


@ -57,23 +57,23 @@ object TaskThreadInfo {
* 1. each thread contains one job.
* 2. each job contains one stage.
* 3. each stage only contains one task.
* 4. each task(launched) must be launched orderly (using threadToStarted) to make sure
* it will get a cpu core resource, and will wait to finish after the user manually
* releases "Lock", after which the cluster will have another free cpu core.
* 5. each task(pending) must use "sleep" to make sure it has been added to the taskSetManager queue,
* thus it will be scheduled later when the cluster has free cpu cores.
*/
class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {
TaskThreadInfo.threadToRunning(threadIndex) = false
val nums = sc.parallelize(threadIndex to threadIndex, 1)
TaskThreadInfo.threadToLock(threadIndex) = new Lock()
TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
new Thread {
if (poolName != null) {
sc.addLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
sc.setLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
}
override def run() {
val ans = nums.map(number => {
@ -88,7 +88,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
}
}.start()
}
test("Local FIFO scheduler end-to-end test") {
System.setProperty("spark.cluster.schedulingmode", "FIFO")
sc = new SparkContext("local[4]", "test")
@ -103,8 +103,8 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
createThread(4,null,sc,sem)
TaskThreadInfo.threadToStarted(4).await()
// thread 5 and 6 (stage pending) must meet the following two points:
// 1. stages (taskSetManager) of jobs in thread 5 and 6 should be added to the taskSetManager
// queue before executing TaskThreadInfo.threadToLock(1).jobFinished()
// 2. the stage in thread 5 should come before the stage in thread 6 in priority
// So I just use "sleep" 1s here for each thread.
// TODO: any better solution?
@ -112,24 +112,24 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
Thread.sleep(1000)
createThread(6,null,sc,sem)
Thread.sleep(1000)
assert(TaskThreadInfo.threadToRunning(1) === true)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === false)
assert(TaskThreadInfo.threadToRunning(6) === false)
TaskThreadInfo.threadToLock(1).jobFinished()
TaskThreadInfo.threadToStarted(5).await()
assert(TaskThreadInfo.threadToRunning(1) === false)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === true)
assert(TaskThreadInfo.threadToRunning(6) === false)
TaskThreadInfo.threadToLock(3).jobFinished()
TaskThreadInfo.threadToStarted(6).await()
@ -139,7 +139,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === true)
assert(TaskThreadInfo.threadToRunning(6) === true)
TaskThreadInfo.threadToLock(2).jobFinished()
TaskThreadInfo.threadToLock(4).jobFinished()
TaskThreadInfo.threadToLock(5).jobFinished()
@ -160,18 +160,18 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToStarted(20).await()
createThread(30,"3",sc,sem)
TaskThreadInfo.threadToStarted(30).await()
assert(TaskThreadInfo.threadToRunning(10) === true)
assert(TaskThreadInfo.threadToRunning(20) === true)
assert(TaskThreadInfo.threadToRunning(30) === true)
createThread(11,"1",sc,sem)
TaskThreadInfo.threadToStarted(11).await()
createThread(21,"2",sc,sem)
TaskThreadInfo.threadToStarted(21).await()
createThread(31,"3",sc,sem)
TaskThreadInfo.threadToStarted(31).await()
assert(TaskThreadInfo.threadToRunning(11) === true)
assert(TaskThreadInfo.threadToRunning(21) === true)
assert(TaskThreadInfo.threadToRunning(31) === true)
@ -185,19 +185,19 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
assert(TaskThreadInfo.threadToRunning(12) === true)
assert(TaskThreadInfo.threadToRunning(22) === true)
assert(TaskThreadInfo.threadToRunning(32) === false)
TaskThreadInfo.threadToLock(10).jobFinished()
TaskThreadInfo.threadToStarted(32).await()
assert(TaskThreadInfo.threadToRunning(32) === true)
//1. Similar to the above scenario, sleep 1s for the stages of 23 and 33 to be added to the taskSetManager
// queue so that the cluster will assign a free cpu core to stage 23 after stage 11 finishes.
//2. The priorities of 23 and 33 are meaningless here since the fair scheduler is used.
createThread(23,"2",sc,sem)
createThread(33,"3",sc,sem)
Thread.sleep(1000)
TaskThreadInfo.threadToLock(11).jobFinished()
TaskThreadInfo.threadToStarted(23).await()
@ -206,7 +206,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToLock(12).jobFinished()
TaskThreadInfo.threadToStarted(33).await()
assert(TaskThreadInfo.threadToRunning(33) === true)
TaskThreadInfo.threadToLock(20).jobFinished()
@ -217,7 +217,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToLock(31).jobFinished()
TaskThreadInfo.threadToLock(32).jobFinished()
TaskThreadInfo.threadToLock(33).jobFinished()
sem.acquire(11)
}
}


@ -67,7 +67,12 @@ object LogisticRegressionSuite {
}
class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
val sc = new SparkContext("local", "test")
@transient private var sc: SparkContext = _
override def beforeAll() {
sc = new SparkContext("local", "test")
}
override def afterAll() {
sc.stop()


@ -62,7 +62,11 @@ object SVMSuite {
}
class SVMSuite extends FunSuite with BeforeAndAfterAll {
val sc = new SparkContext("local", "test")
@transient private var sc: SparkContext = _
override def beforeAll() {
sc = new SparkContext("local", "test")
}
override def afterAll() {
sc.stop()


@ -27,9 +27,12 @@ import spark.SparkContext._
import org.jblas._
class KMeansSuite extends FunSuite with BeforeAndAfterAll {
val sc = new SparkContext("local", "test")
@transient private var sc: SparkContext = _
override def beforeAll() {
sc = new SparkContext("local", "test")
}
override def afterAll() {
sc.stop()


@ -66,7 +66,11 @@ object ALSSuite {
class ALSSuite extends FunSuite with BeforeAndAfterAll {
val sc = new SparkContext("local", "test")
@transient private var sc: SparkContext = _
override def beforeAll() {
sc = new SparkContext("local", "test")
}
override def afterAll() {
sc.stop()


@ -57,7 +57,12 @@ object LassoSuite {
}
class LassoSuite extends FunSuite with BeforeAndAfterAll {
val sc = new SparkContext("local", "test")
@transient private var sc: SparkContext = _
override def beforeAll() {
sc = new SparkContext("local", "test")
}
override def afterAll() {
sc.stop()


@ -27,7 +27,11 @@ import spark.SparkContext._
class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll {
val sc = new SparkContext("local", "test")
@transient private var sc: SparkContext = _
override def beforeAll() {
sc = new SparkContext("local", "test")
}
override def afterAll() {
sc.stop()
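The same lifecycle change is applied to each MLlib suite above: the SparkContext moves from a constructor-time val into beforeAll/afterAll. A consolidated sketch of the pattern (the suite name and test body are illustrative):

    import org.scalatest.{BeforeAndAfterAll, FunSuite}
    import spark.SparkContext

    class ExampleMLSuite extends FunSuite with BeforeAndAfterAll {
      @transient private var sc: SparkContext = _

      override def beforeAll() {
        sc = new SparkContext("local", "test")
      }

      override def afterAll() {
        sc.stop()
      }

      test("uses the shared context") {
        assert(sc.parallelize(1 to 10).count() === 10)
      }
    }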

pom.xml

@ -70,7 +70,7 @@
<java.version>1.5</java.version>
<scala.version>2.9.3</scala.version>
<mesos.version>0.9.0-incubating</mesos.version>
<mesos.version>0.12.1</mesos.version>
<akka.version>2.0.3</akka.version>
<slf4j.version>1.7.2</slf4j.version>
<cdh.version>4.1.2</cdh.version>
@ -254,11 +254,6 @@
<version>10.4.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.liftweb</groupId>
<artifactId>lift-json_2.9.2</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
@ -269,6 +264,11 @@
<artifactId>metrics-jvm</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-json</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>


@ -180,12 +180,12 @@ object SparkBuild extends Build {
"com.typesafe.akka" % "akka-slf4j" % "2.0.5" excludeAll(excludeNetty),
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
"net.liftweb" % "lift-json_2.9.2" % "2.5",
"org.apache.mesos" % "mesos" % "0.9.0-incubating",
"org.apache.mesos" % "mesos" % "0.12.1",
"io.netty" % "netty-all" % "4.0.0.Beta2",
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
"com.codahale.metrics" % "metrics-json" % "3.0.0",
"com.twitter" % "chill_2.9.3" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
) ++ (

python/examples/pagerank.py (new executable file)

@ -0,0 +1,70 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#!/usr/bin/env python
import re, sys
from operator import add
from pyspark import SparkContext
def computeContribs(urls, rank):
"""Calculates URL contributions to the rank of other URLs."""
num_urls = len(urls)
for url in urls: yield (url, rank / num_urls)
def parseNeighbors(urls):
"""Parses a urls pair string into urls pair."""
parts = re.split(r'\s+', urls)
return parts[0], parts[1]
if __name__ == "__main__":
if len(sys.argv) < 3:
print >> sys.stderr, "Usage: pagerank <master> <file> <number_of_iterations>"
exit(-1)
# Initialize the spark context.
sc = SparkContext(sys.argv[1], "PythonPageRank")
# Loads the input file. It should be in the format:
# URL neighbor URL
# URL neighbor URL
# URL neighbor URL
# ...
lines = sc.textFile(sys.argv[2], 1)
# Loads all URLs from the input file and initializes their neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
# Initializes the rank of every URL to one.
ranks = links.map(lambda (url, neighbors): (url, 1.0))
# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in xrange(int(sys.argv[3])):
# Calculates URL contributions to the rank of other URLs.
contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)):
computeContribs(urls, rank))
# Re-calculates URL ranks based on neighbor contributions.
ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
# Collects all URL ranks and dumps them to the console.
for (link, rank) in ranks.collect():
print "%s has rank: %s." % (link, rank)


@ -24,10 +24,15 @@ import os
import pyspark
from pyspark.context import SparkContext
# this is the equivalent of ADD_JARS
add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") != None else None
sc = SparkContext(os.environ.get("MASTER", "local"), "PySparkShell")
sc = SparkContext(os.environ.get("MASTER", "local"), "PySparkShell", pyFiles=add_files)
print "Spark context avaiable as sc."
if add_files != None:
print "Adding files: [%s]" % ", ".join(add_files)
# The ./pyspark script stores the old PYTHONSTARTUP value in OLD_PYTHONSTARTUP,
# which allows us to execute the user's PYTHONSTARTUP file:
_pythonstartup = os.environ.get('OLD_PYTHONSTARTUP')


@ -64,7 +64,7 @@ class TestCheckpoint(PySparkTestCase):
flatMappedRDD = parCollection.flatMap(lambda x: range(1, x + 1))
self.assertFalse(flatMappedRDD.isCheckpointed())
self.assertIsNone(flatMappedRDD.getCheckpointFile())
self.assertTrue(flatMappedRDD.getCheckpointFile() is None)
flatMappedRDD.checkpoint()
result = flatMappedRDD.collect()
@ -79,13 +79,13 @@ class TestCheckpoint(PySparkTestCase):
flatMappedRDD = parCollection.flatMap(lambda x: [x])
self.assertFalse(flatMappedRDD.isCheckpointed())
self.assertIsNone(flatMappedRDD.getCheckpointFile())
self.assertTrue(flatMappedRDD.getCheckpointFile() is None)
flatMappedRDD.checkpoint()
flatMappedRDD.count() # forces a checkpoint to be computed
time.sleep(1) # 1 second
self.assertIsNotNone(flatMappedRDD.getCheckpointFile())
self.assertTrue(flatMappedRDD.getCheckpointFile() is not None)
recovered = self.sc._checkpointFile(flatMappedRDD.getCheckpointFile())
self.assertEquals([1, 2, 3, 4], recovered.collect())
@ -164,9 +164,12 @@ class TestDaemon(unittest.TestCase):
time.sleep(1)
# daemon should no longer accept connections
with self.assertRaises(EnvironmentError) as trap:
try:
self.connect(port)
self.assertEqual(trap.exception.errno, ECONNREFUSED)
except EnvironmentError as exception:
self.assertEqual(exception.errno, ECONNREFUSED)
else:
self.fail("Expected EnvironmentError to be raised")
def test_termination_stdin(self):
"""Ensure that daemon and workers terminate when stdin is closed."""


@ -26,20 +26,18 @@ cd "$FWDIR/python"
FAILED=0
$FWDIR/pyspark pyspark/rdd.py
FAILED=$(($?||$FAILED))
rm -f unit-tests.log
$FWDIR/pyspark pyspark/context.py
FAILED=$(($?||$FAILED))
function run_test() {
$FWDIR/pyspark $1 2>&1 | tee -a unit-tests.log
FAILED=$((PIPESTATUS[0]||$FAILED))
}
$FWDIR/pyspark -m doctest pyspark/broadcast.py
FAILED=$(($?||$FAILED))
$FWDIR/pyspark -m doctest pyspark/accumulators.py
FAILED=$(($?||$FAILED))
$FWDIR/pyspark -m unittest pyspark.tests
FAILED=$(($?||$FAILED))
run_test "pyspark/rdd.py"
run_test "pyspark/context.py"
run_test "-m doctest pyspark/broadcast.py"
run_test "-m doctest pyspark/accumulators.py"
run_test "pyspark/tests.py"
if [[ $FAILED != 0 ]]; then
echo -en "\033[31m" # Red


@ -73,6 +73,35 @@
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>test</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<exportAntProperties>true</exportAntProperties>
<tasks>
<property name="spark.classpath" refid="maven.test.classpath"/>
<property environment="env"/>
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
<condition>
<not>
<or>
<isset property="env.SCALA_HOME"/>
<isset property="env.SCALA_LIBRARY_PATH"/>
</or>
</not>
</condition>
</fail>
</tasks>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
@ -80,6 +109,7 @@
<environmentVariables>
<SPARK_HOME>${basedir}/..</SPARK_HOME>
<SPARK_TESTING>1</SPARK_TESTING>
<SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
</environmentVariables>
</configuration>
</plugin>


@ -29,7 +29,7 @@ import spark.{RDD, Partitioner}
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
import spark.api.java.{JavaRDD, JavaPairRDD}
import spark.api.java.{JavaUtils, JavaRDD, JavaPairRDD}
import spark.storage.StorageLevel
import com.google.common.base.Optional
import spark.RDD
@ -401,10 +401,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
(Seq[V], Option[S]) => Option[S] = {
val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => {
val list: JList[V] = values
val scalaState: Optional[S] = state match {
case Some(s) => Optional.of(s)
case _ => Optional.absent()
}
val scalaState: Optional[S] = JavaUtils.optionToOptional(state)
val result: Optional[S] = in.apply(list, scalaState)
result.isPresent match {
case true => Some(result.get())
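JavaUtils.optionToOptional presumably factors out the Option-to-Optional match it replaces above; a sketch of that conversion under that assumption:

    import com.google.common.base.Optional

    def optionToOptional[T](option: Option[T]): Optional[T] = option match {
      case Some(value) => Optional.of(value)
      case None        => Optional.absent()
    }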


@ -121,7 +121,7 @@ object JavaAPICompletenessChecker {
SparkMethod(name, returnType, parameters)
}
private def toJavaType(scalaType: SparkType): SparkType = {
private def toJavaType(scalaType: SparkType, isReturnType: Boolean): SparkType = {
val renameSubstitutions = Map(
"scala.collection.Map" -> "java.util.Map",
// TODO: the JavaStreamingContext API accepts Array arguments
@ -140,40 +140,43 @@ object JavaAPICompletenessChecker {
case "spark.RDD" =>
if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
val tupleParams =
parameters(0).asInstanceOf[ParameterizedType].parameters.map(toJavaType)
parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
ParameterizedType(classOf[JavaPairRDD[_, _]].getName, tupleParams)
} else {
ParameterizedType(classOf[JavaRDD[_]].getName, parameters.map(toJavaType))
ParameterizedType(classOf[JavaRDD[_]].getName, parameters.map(applySubs))
}
case "spark.streaming.DStream" =>
if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
val tupleParams =
parameters(0).asInstanceOf[ParameterizedType].parameters.map(toJavaType)
parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
ParameterizedType("spark.streaming.api.java.JavaPairDStream", tupleParams)
} else {
ParameterizedType("spark.streaming.api.java.JavaDStream",
parameters.map(toJavaType))
parameters.map(applySubs))
}
// TODO: Spark Streaming uses Guava's Optional in place of Option, leading to some
// false-positives here:
case "scala.Option" =>
toJavaType(parameters(0))
case "scala.Option" => {
if (isReturnType) {
ParameterizedType("com.google.common.base.Optional", parameters.map(applySubs))
} else {
applySubs(parameters(0))
}
}
case "scala.Function1" =>
val firstParamName = parameters.last.name
if (firstParamName.startsWith("scala.collection.Traversable") ||
firstParamName.startsWith("scala.collection.Iterator")) {
ParameterizedType("spark.api.java.function.FlatMapFunction",
Seq(parameters(0),
parameters.last.asInstanceOf[ParameterizedType].parameters(0)).map(toJavaType))
parameters.last.asInstanceOf[ParameterizedType].parameters(0)).map(applySubs))
} else if (firstParamName == "scala.runtime.BoxedUnit") {
ParameterizedType("spark.api.java.function.VoidFunction",
parameters.dropRight(1).map(toJavaType))
parameters.dropRight(1).map(applySubs))
} else {
ParameterizedType("spark.api.java.function.Function", parameters.map(toJavaType))
ParameterizedType("spark.api.java.function.Function", parameters.map(applySubs))
}
case _ =>
ParameterizedType(renameSubstitutions.getOrElse(name, name),
parameters.map(toJavaType))
parameters.map(applySubs))
}
case BaseType(name) =>
if (renameSubstitutions.contains(name)) {
@ -194,8 +197,9 @@ object JavaAPICompletenessChecker {
private def toJavaMethod(method: SparkMethod): SparkMethod = {
val params = method.parameters
.filterNot(_.name == "scala.reflect.ClassManifest").map(toJavaType)
SparkMethod(method.name, toJavaType(method.returnType), params)
.filterNot(_.name == "scala.reflect.ClassManifest")
.map(toJavaType(_, isReturnType = false))
SparkMethod(method.name, toJavaType(method.returnType, isReturnType = true), params)
}
private def isExcludedByName(method: Method): Boolean = {