diff --git a/core/src/main/resources/spark/ui/static/webui.css b/core/src/main/resources/spark/ui/static/webui.css
index b83f4109c0..9914a8ad2a 100644
--- a/core/src/main/resources/spark/ui/static/webui.css
+++ b/core/src/main/resources/spark/ui/static/webui.css
@@ -5,10 +5,6 @@
padding: 0;
}
-body {
- font-size: 15px !important;
-}
-
.version {
line-height: 30px;
vertical-align: bottom;
@@ -53,6 +49,10 @@ body {
line-height: 15px !important;
}
+.table-fixed {
+ table-layout:fixed;
+}
+
.table td {
vertical-align: middle !important;
}
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index d17e70a4fa..b1edaa06f8 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -39,7 +39,6 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
/**
* Represents a dependency on the output of a shuffle stage.
- * @param shuffleId the shuffle id
* @param rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializerClass class name of the serializer to use
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 28b46990f8..469e870409 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -85,17 +85,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
- self.mapPartitions(aggregator.combineValuesByKey(_), true)
+ self.mapPartitions(aggregator.combineValuesByKey, true)
} else if (mapSideCombine) {
- val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
- val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner, serializerClass)
- partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true)
+ val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey, true)
+ val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner)
+ .setSerializer(serializerClass)
+ partitioned.mapPartitions(aggregator.combineCombinersByKey, true)
} else {
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
- val values = new ShuffledRDD[K, V](self, partitioner, serializerClass)
- values.mapPartitions(aggregator.combineValuesByKey(_), true)
+ val values = new ShuffledRDD[K, V](self, partitioner).setSerializer(serializerClass)
+ values.mapPartitions(aggregator.combineValuesByKey, true)
}
}
@@ -233,31 +234,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
/**
- * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
- * is true, Spark will group values of the same key together on the map side before the
- * repartitioning, to only send each key over the network once. If a large number of
- * duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should
- * be set to true.
+ * Return a copy of the RDD partitioned using the specified partitioner.
*/
- def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = {
- if (getKeyClass().isArray) {
- if (mapSideCombine) {
- throw new SparkException("Cannot use map-side combining with array keys.")
- }
- if (partitioner.isInstanceOf[HashPartitioner]) {
- throw new SparkException("Default partitioner cannot partition array keys.")
- }
- }
- if (mapSideCombine) {
- def createCombiner(v: V) = ArrayBuffer(v)
- def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
- def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
- val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, partitioner)
- bufs.flatMapValues(buf => buf)
- } else {
- new ShuffledRDD[K, V](self, partitioner)
+ def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
+ if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
+ throw new SparkException("Default partitioner cannot partition array keys.")
}
+ new ShuffledRDD[K, V](self, partitioner)
}
/**
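
Note on the removed mapSideCombine path: callers who relied on it can recover the same behavior with public API. A sketch mirroring the deleted ArrayBuffer-based code (the helper name and wiring are illustrative, not part of this patch):

import scala.collection.mutable.ArrayBuffer
import spark.{Partitioner, RDD}
import spark.SparkContext._   // implicit conversion to PairRDDFunctions

def partitionByWithMapSideCombine[K: ClassManifest, V: ClassManifest](
    self: RDD[(K, V)], partitioner: Partitioner): RDD[(K, V)] = {
  // Buffer values per key on the map side, shuffle the buffers, then flatten.
  val bufs = self.combineByKey[ArrayBuffer[V]](
    (v: V) => ArrayBuffer(v),
    (buf: ArrayBuffer[V], v: V) => buf += v,
    (b1: ArrayBuffer[V], b2: ArrayBuffer[V]) => b1 ++= b2,
    partitioner)
  bufs.flatMapValues(buf => buf)
}
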
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 160b3e9d83..503ea6ccbf 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -515,22 +515,19 @@ abstract class RDD[T: ClassManifest](
* *same number of partitions*, but does *not* require them to have the same number
* of elements in each partition.
*/
- def zipPartitions[B: ClassManifest, V: ClassManifest](
- f: (Iterator[T], Iterator[B]) => Iterator[V],
- rdd2: RDD[B]): RDD[V] =
+ def zipPartitions[B: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B])
+ (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
- def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
- f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
- rdd2: RDD[B],
- rdd3: RDD[C]): RDD[V] =
+ def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B], rdd3: RDD[C])
+ (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
- def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
- f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
- rdd2: RDD[B],
- rdd3: RDD[C],
- rdd4: RDD[D]): RDD[V] =
+ def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
+ (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
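
The reordering is what makes the closure usable without type annotations: with the RDD arguments in the first parameter list, Scala infers B (and C, D) before typechecking f. A hypothetical call site under the new signature (sc, nums, names are illustrative):

val nums  = sc.parallelize(1 to 4, 2)
val names = sc.parallelize(Seq("a", "b", "c", "d"), 2)

// xs and ys need no type annotations; Int and String are already inferred.
val zipped = nums.zipPartitions(names) { (xs, ys) =>
  xs.zip(ys).map { case (x, y) => x + "-" + y }
}
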
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 1069e27513..80c65dfebd 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -267,16 +267,20 @@ class SparkContext(
localProperties.value = new Properties()
}
- def addLocalProperty(key: String, value: String) {
- if(localProperties.value == null) {
+ def setLocalProperty(key: String, value: String) {
+ if (localProperties.value == null) {
localProperties.value = new Properties()
}
- localProperties.value.setProperty(key,value)
+ if (value == null) {
+ localProperties.value.remove(key)
+ } else {
+ localProperties.value.setProperty(key, value)
+ }
}
/** Set a human readable description of the current job. */
- def setDescription(value: String) {
- addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
+ def setJobDescription(value: String) {
+ setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
}
// Post init
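
Two behavioral points worth calling out: local properties are per-thread, and a null value now removes the key (Properties.setProperty would throw a NullPointerException on null). A usage sketch, assuming an in-scope SparkContext sc and the fair scheduler pool key defined in SchedulableBuilder below:

// Route jobs submitted from this thread into a named fair scheduler pool.
sc.setLocalProperty("spark.scheduler.cluster.fair.pool", "production")
// ... run jobs ...
sc.setLocalProperty("spark.scheduler.cluster.fair.pool", null)  // unsets the key
sc.setJobDescription("nightly aggregation")  // renamed from setDescription
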
diff --git a/core/src/main/scala/spark/TaskState.scala b/core/src/main/scala/spark/TaskState.scala
index 9df7d8277b..bf75753056 100644
--- a/core/src/main/scala/spark/TaskState.scala
+++ b/core/src/main/scala/spark/TaskState.scala
@@ -24,9 +24,11 @@ private[spark] object TaskState
val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
+ val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)
+
type TaskState = Value
- def isFinished(state: TaskState) = Seq(FINISHED, FAILED, LOST).contains(state)
+ def isFinished(state: TaskState) = FINISHED_STATES.contains(state)
def toMesos(state: TaskState): MesosTaskState = state match {
case LAUNCHING => MesosTaskState.TASK_STARTING
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 673f9a810d..885a7391d6 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -521,9 +521,9 @@ private object Utils extends Logging {
}
/**
- * Convert a memory quantity in bytes to a human-readable string such as "4.0 MB".
+ * Convert a quantity in bytes to a human-readable string such as "4.0 MB".
*/
- def memoryBytesToString(size: Long): String = {
+ def bytesToString(size: Long): String = {
val TB = 1L << 40
val GB = 1L << 30
val MB = 1L << 20
@@ -566,10 +566,10 @@ private object Utils extends Logging {
}
/**
- * Convert a memory quantity in megabytes to a human-readable string such as "4.0 MB".
+ * Convert a quantity in megabytes to a human-readable string such as "4.0 MB".
*/
- def memoryMegabytesToString(megabytes: Long): String = {
- memoryBytesToString(megabytes * 1024L * 1024L)
+ def megabytesToString(megabytes: Long): String = {
+ bytesToString(megabytes * 1024L * 1024L)
}
/**
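
The renames are behavior-preserving. Sanity examples per the docstrings (Utils is private[spark], so this only compiles inside the spark package):

Utils.bytesToString(4L * 1024 * 1024)  // "4.0 MB"
Utils.megabytesToString(4)             // "4.0 MB" (scaled to bytes first)
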
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index ccc511dc5f..c2995b836a 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -23,6 +23,7 @@ import java.util.Comparator
import scala.Tuple2
import scala.collection.JavaConversions._
+import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputFormat
@@ -252,11 +253,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
fromRDD(rdd.subtract(other, p))
/**
- * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
- * is true, Spark will group values of the same key together on the map side before the
- * repartitioning, to only send each key over the network once. If a large number of
- * duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should
- * be set to true.
+ * Return a copy of the RDD partitioned using the specified partitioner.
*/
def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V] =
fromRDD(rdd.partitionBy(partitioner))
@@ -276,8 +273,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* partition the output RDD.
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
- : JavaPairRDD[K, (V, Option[W])] =
- fromRDD(rdd.leftOuterJoin(other, partitioner))
+ : JavaPairRDD[K, (V, Optional[W])] = {
+ val joinResult = rdd.leftOuterJoin(other, partitioner)
+ fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+ }
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
@@ -286,8 +285,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* partition the output RDD.
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
- : JavaPairRDD[K, (Option[V], W)] =
- fromRDD(rdd.rightOuterJoin(other, partitioner))
+ : JavaPairRDD[K, (Optional[V], W)] = {
+ val joinResult = rdd.rightOuterJoin(other, partitioner)
+ fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+ }
/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
@@ -340,8 +341,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* using the existing partitioner/parallelism level.
*/
- def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Option[W])] =
- fromRDD(rdd.leftOuterJoin(other))
+ def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = {
+ val joinResult = rdd.leftOuterJoin(other)
+ fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+ }
/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
@@ -349,8 +352,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* into `numPartitions` partitions.
*/
- def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Option[W])] =
- fromRDD(rdd.leftOuterJoin(other, numPartitions))
+ def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = {
+ val joinResult = rdd.leftOuterJoin(other, numPartitions)
+ fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+ }
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
@@ -358,8 +363,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD using the existing partitioner/parallelism level.
*/
- def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Option[V], W)] =
- fromRDD(rdd.rightOuterJoin(other))
+ def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = {
+ val joinResult = rdd.rightOuterJoin(other)
+ fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+ }
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
@@ -367,8 +374,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
- def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Option[V], W)] =
- fromRDD(rdd.rightOuterJoin(other, numPartitions))
+ def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
+ val joinResult = rdd.rightOuterJoin(other, numPartitions)
+ fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+ }
/**
* Return the key-value pairs in this RDD to the master as a Map.
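
Rationale for the return-type change: scala.Option is awkward to consume from Java, so the Java API now exposes Guava's Optional via the new JavaUtils.optionToOptional introduced below. Its mapping, for reference:

import com.google.common.base.Optional

JavaUtils.optionToOptional(Some("w"))  // Optional.of("w")
JavaUtils.optionToOptional(None)       // Optional.absent()
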
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 21b5abf053..2c2b138f16 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -207,12 +207,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* of elements in each partition.
*/
def zipPartitions[U, V](
- f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V],
- other: JavaRDDLike[U, _]): JavaRDD[V] = {
+ other: JavaRDDLike[U, _],
+ f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
f.apply(asJavaIterator(x), asJavaIterator(y)).iterator())
JavaRDD.fromRDD(
- rdd.zipPartitions(fn, other.rdd)(other.classManifest, f.elementType()))(f.elementType())
+ rdd.zipPartitions(other.rdd)(fn)(other.classManifest, f.elementType()))(f.elementType())
}
// Actions (launch a job to return a value to the user program)
@@ -366,10 +366,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Gets the name of the file to which this RDD was checkpointed
*/
def getCheckpointFile(): Optional[String] = {
- rdd.getCheckpointFile match {
- case Some(file) => Optional.of(file)
- case _ => Optional.absent()
- }
+ JavaUtils.optionToOptional(rdd.getCheckpointFile)
}
/** A description of this RDD and its recursive dependencies for debugging. */
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index fe182e7ab6..29d57004b5 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -32,6 +32,8 @@ import spark.SparkContext.IntAccumulatorParam
import spark.SparkContext.DoubleAccumulatorParam
import spark.broadcast.Broadcast
+import com.google.common.base.Optional
+
/**
* A Java-friendly version of [[spark.SparkContext]] that returns [[spark.api.java.JavaRDD]]s and
* works with Java collections instead of Scala ones.
@@ -337,7 +339,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* or the spark.home Java property, or the SPARK_HOME environment variable
* (in that order of preference). If neither of these is set, return None.
*/
- def getSparkHome(): Option[String] = sc.getSparkHome()
+ def getSparkHome(): Optional[String] = JavaUtils.optionToOptional(sc.getSparkHome())
/**
* Add a file to be downloaded with this Spark job on every node.
diff --git a/core/src/main/scala/spark/api/java/JavaUtils.scala b/core/src/main/scala/spark/api/java/JavaUtils.scala
new file mode 100644
index 0000000000..ffc131ac83
--- /dev/null
+++ b/core/src/main/scala/spark/api/java/JavaUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.api.java
+
+import com.google.common.base.Optional
+
+object JavaUtils {
+ def optionToOptional[T](option: Option[T]): Optional[T] =
+ option match {
+ case Some(value) => Optional.of(value)
+ case None => Optional.absent()
+ }
+}
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
index bd1db7c294..6b71b953dd 100644
--- a/core/src/main/scala/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -17,7 +17,7 @@
package spark.deploy
-import net.liftweb.json.JsonDSL._
+import scala.util.parsing.json.{JSONArray, JSONObject, JSONType}
import spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import spark.deploy.master.{ApplicationInfo, WorkerInfo}
@@ -25,61 +25,63 @@ import spark.deploy.worker.ExecutorRunner
private[spark] object JsonProtocol {
- def writeWorkerInfo(obj: WorkerInfo) = {
- ("id" -> obj.id) ~
- ("host" -> obj.host) ~
- ("port" -> obj.port) ~
- ("webuiaddress" -> obj.webUiAddress) ~
- ("cores" -> obj.cores) ~
- ("coresused" -> obj.coresUsed) ~
- ("memory" -> obj.memory) ~
- ("memoryused" -> obj.memoryUsed)
- }
- def writeApplicationInfo(obj: ApplicationInfo) = {
- ("starttime" -> obj.startTime) ~
- ("id" -> obj.id) ~
- ("name" -> obj.desc.name) ~
- ("cores" -> obj.desc.maxCores) ~
- ("user" -> obj.desc.user) ~
- ("memoryperslave" -> obj.desc.memoryPerSlave) ~
- ("submitdate" -> obj.submitDate.toString)
- }
+ def writeWorkerInfo(obj: WorkerInfo): JSONType = JSONObject(Map(
+ "id" -> obj.id,
+ "host" -> obj.host,
+ "port" -> obj.port,
+ "webuiaddress" -> obj.webUiAddress,
+ "cores" -> obj.cores,
+ "coresused" -> obj.coresUsed,
+ "memory" -> obj.memory,
+ "memoryused" -> obj.memoryUsed,
+ "state" -> obj.state.toString
+ ))
- def writeApplicationDescription(obj: ApplicationDescription) = {
- ("name" -> obj.name) ~
- ("cores" -> obj.maxCores) ~
- ("memoryperslave" -> obj.memoryPerSlave) ~
- ("user" -> obj.user)
- }
+ def writeApplicationInfo(obj: ApplicationInfo): JSONType = JSONObject(Map(
+ "starttime" -> obj.startTime,
+ "id" -> obj.id,
+ "name" -> obj.desc.name,
+ "cores" -> obj.desc.maxCores,
+ "user" -> obj.desc.user,
+ "memoryperslave" -> obj.desc.memoryPerSlave,
+ "submitdate" -> obj.submitDate.toString
+ ))
- def writeExecutorRunner(obj: ExecutorRunner) = {
- ("id" -> obj.execId) ~
- ("memory" -> obj.memory) ~
- ("appid" -> obj.appId) ~
- ("appdesc" -> writeApplicationDescription(obj.appDesc))
- }
+ def writeApplicationDescription(obj: ApplicationDescription): JSONType = JSONObject(Map(
+ "name" -> obj.name,
+ "cores" -> obj.maxCores,
+ "memoryperslave" -> obj.memoryPerSlave,
+ "user" -> obj.user
+ ))
- def writeMasterState(obj: MasterStateResponse) = {
- ("url" -> ("spark://" + obj.uri)) ~
- ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
- ("cores" -> obj.workers.map(_.cores).sum) ~
- ("coresused" -> obj.workers.map(_.coresUsed).sum) ~
- ("memory" -> obj.workers.map(_.memory).sum) ~
- ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
- ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
- ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
- }
+ def writeExecutorRunner(obj: ExecutorRunner): JSONType = JSONObject(Map(
+ "id" -> obj.execId,
+ "memory" -> obj.memory,
+ "appid" -> obj.appId,
+ "appdesc" -> writeApplicationDescription(obj.appDesc)
+ ))
- def writeWorkerState(obj: WorkerStateResponse) = {
- ("id" -> obj.workerId) ~
- ("masterurl" -> obj.masterUrl) ~
- ("masterwebuiurl" -> obj.masterWebUiUrl) ~
- ("cores" -> obj.cores) ~
- ("coresused" -> obj.coresUsed) ~
- ("memory" -> obj.memory) ~
- ("memoryused" -> obj.memoryUsed) ~
- ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
- ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
- }
+ def writeMasterState(obj: MasterStateResponse): JSONType = JSONObject(Map(
+ "url" -> ("spark://" + obj.uri),
+ "workers" -> obj.workers.toList.map(writeWorkerInfo),
+ "cores" -> obj.workers.map(_.cores).sum,
+ "coresused" -> obj.workers.map(_.coresUsed).sum,
+ "memory" -> obj.workers.map(_.memory).sum,
+ "memoryused" -> obj.workers.map(_.memoryUsed).sum,
+ "activeapps" -> JSONArray(obj.activeApps.toList.map(writeApplicationInfo)),
+ "completedapps" -> JSONArray(obj.completedApps.toList.map(writeApplicationInfo))
+ ))
+
+ def writeWorkerState(obj: WorkerStateResponse): JSONType = JSONObject(Map(
+ "id" -> obj.workerId,
+ "masterurl" -> obj.masterUrl,
+ "masterwebuiurl" -> obj.masterWebUiUrl,
+ "cores" -> obj.cores,
+ "coresused" -> obj.coresUsed,
+ "memory" -> obj.memory,
+ "memoryused" -> obj.memoryUsed,
+ "executors" -> JSONArray(obj.executors.toList.map(writeExecutorRunner)),
+ "finishedexecutors" -> JSONArray(obj.finishedExecutors.toList.map(writeExecutorRunner))
+ ))
}
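
The lift-json DSL is replaced by the standard library's scala.util.parsing.json, dropping an external dependency; JSONObject and JSONArray render through toString(). A small sketch with made-up values (output shown roughly):

import scala.util.parsing.json.{JSONArray, JSONObject}

val worker = JSONObject(Map("id" -> "worker-1", "cores" -> 8))
val state  = JSONObject(Map("workers" -> JSONArray(List(worker))))
state.toString()  // {"workers" : [{"id" : "worker-1", "cores" : 8}]}
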
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 4a4d9908a0..04af5e149c 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -57,14 +57,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
var firstApp: Option[ApplicationInfo] = None
- val webUi = new MasterWebUI(self, webUiPort)
-
Utils.checkHost(host, "Expected hostname")
val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
val masterSource = new MasterSource(this)
+ val webUi = new MasterWebUI(this, webUiPort)
+
val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
@@ -96,7 +96,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
override def receive = {
case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
- host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
+ host, workerPort, cores, Utils.megabytesToString(memory)))
if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
index 36a1e91b24..494a9b914d 100644
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -17,6 +17,7 @@
package spark.deploy.master.ui
+import scala.util.parsing.json.JSONType
import scala.xml.Node
import akka.dispatch.Await
@@ -25,19 +26,17 @@ import akka.util.duration._
import javax.servlet.http.HttpServletRequest
-import net.liftweb.json.JsonAST.JValue
-
import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import spark.deploy.JsonProtocol
import spark.deploy.master.ExecutorInfo
import spark.ui.UIUtils
private[spark] class ApplicationPage(parent: MasterWebUI) {
- val master = parent.master
+ val master = parent.masterActorRef
implicit val timeout = parent.timeout
/** Executor details for a particular application */
- def renderJson(request: HttpServletRequest): JValue = {
+ def renderJson(request: HttpServletRequest): JSONType = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
index d3b10f197b..28e421e3bc 100644
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -19,14 +19,13 @@ package spark.deploy.master.ui
import javax.servlet.http.HttpServletRequest
+import scala.util.parsing.json.JSONType
import scala.xml.Node
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
-import net.liftweb.json.JsonAST.JValue
-
import spark.Utils
import spark.deploy.DeployWebUI
import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
@@ -35,10 +34,10 @@ import spark.deploy.master.{ApplicationInfo, WorkerInfo}
import spark.ui.UIUtils
private[spark] class IndexPage(parent: MasterWebUI) {
- val master = parent.master
+ val master = parent.masterActorRef
implicit val timeout = parent.timeout
- def renderJson(request: HttpServletRequest): JValue = {
+ def renderJson(request: HttpServletRequest): JSONType = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
JsonProtocol.writeMasterState(state)
@@ -53,7 +52,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
val workers = state.workers.sortBy(_.id)
val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
- val appHeaders = Seq("ID", "Description", "Cores", "Memory per Node", "Submit Time", "User",
+ val appHeaders = Seq("ID", "Name", "Cores", "Memory per Node", "Submitted Time", "User",
"State", "Duration")
val activeApps = state.activeApps.sortBy(_.startTime).reverse
val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
@@ -70,8 +69,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
Cores: {state.workers.map(_.cores).sum} Total,
{state.workers.map(_.coresUsed).sum} Used
Memory:
- {Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
- {Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used
+ {Utils.megabytesToString(state.workers.map(_.memory).sum)} Total,
+ {Utils.megabytesToString(state.workers.map(_.memoryUsed).sum)} Used
Applications:
{state.activeApps.size} Running,
{state.completedApps.size} Completed
@@ -116,8 +115,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
{worker.state} |
{worker.cores} ({worker.coresUsed} Used) |
- {Utils.memoryMegabytesToString(worker.memory)}
- ({Utils.memoryMegabytesToString(worker.memoryUsed)} Used)
+ {Utils.megabytesToString(worker.memory)}
+ ({Utils.megabytesToString(worker.memoryUsed)} Used)
|
}
@@ -135,7 +134,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
{app.coresGranted}
- {Utils.memoryMegabytesToString(app.desc.memoryPerSlave)}
+ {Utils.megabytesToString(app.desc.memoryPerSlave)}
|
{DeployWebUI.formatDate(app.submitDate)} |
{app.desc.user} |
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
index 31bdb7854e..c91e1db9f2 100644
--- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,7 +17,6 @@
package spark.deploy.master.ui
-import akka.actor.ActorRef
import akka.util.Duration
import javax.servlet.http.HttpServletRequest
@@ -25,6 +24,7 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
import spark.{Logging, Utils}
+import spark.deploy.master.Master
import spark.ui.JettyUtils
import spark.ui.JettyUtils._
@@ -32,12 +32,14 @@ import spark.ui.JettyUtils._
* Web UI server for the standalone master.
*/
private[spark]
-class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
+class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
implicit val timeout = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val host = Utils.localHostName()
val port = requestedPort
+ val masterActorRef = master.self
+
var server: Option[Server] = None
var boundPort: Option[Int] = None
@@ -57,7 +59,10 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
}
}
- val handlers = Array[(String, Handler)](
+ val metricsHandlers = master.masterMetricsSystem.getServletHandlers ++
+ master.applicationMetricsSystem.getServletHandlers
+
+ val handlers = metricsHandlers ++ Array[(String, Handler)](
("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
("/app", (request: HttpServletRequest) => applicationPage.render(request)),
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 4537c8305c..f661accd2f 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -170,10 +170,10 @@ private[spark] class ExecutorRunner(
// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
- Files.write(header, stdout, Charsets.UTF_8)
redirectStream(process.getInputStream, stdout)
val stderr = new File(executorDir, "stderr")
+ Files.write(header, stderr, Charsets.UTF_8)
redirectStream(process.getErrorStream, stderr)
// Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 0e46fa281e..d4b58fc34e 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -96,11 +96,12 @@ private[spark] class Worker(
override def preStart() {
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
- host, port, cores, Utils.memoryMegabytesToString(memory)))
+ host, port, cores, Utils.megabytesToString(memory)))
sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
logInfo("Spark home: " + sparkHome)
createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
+
webUi.start()
connectToMaster()
diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
index 700eb22d96..02993d58a0 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
@@ -19,14 +19,13 @@ package spark.deploy.worker.ui
import javax.servlet.http.HttpServletRequest
+import scala.util.parsing.json.JSONType
import scala.xml.Node
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
-import net.liftweb.json.JsonAST.JValue
-
import spark.Utils
import spark.deploy.JsonProtocol
import spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
@@ -39,7 +38,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
val worker = parent.worker
val timeout = parent.timeout
- def renderJson(request: HttpServletRequest): JValue = {
+ def renderJson(request: HttpServletRequest): JSONType = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, 30 seconds)
JsonProtocol.writeWorkerState(workerState)
@@ -65,8 +64,8 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
Master URL: {workerState.masterUrl}
Cores: {workerState.cores} ({workerState.coresUsed} Used)
- Memory: {Utils.memoryMegabytesToString(workerState.memory)}
- ({Utils.memoryMegabytesToString(workerState.memoryUsed)} Used)
+ Memory: {Utils.megabytesToString(workerState.memory)}
+ ({Utils.megabytesToString(workerState.memoryUsed)} Used)
Back to Master
@@ -97,7 +96,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
{executor.execId} |
{executor.cores} |
- {Utils.memoryMegabytesToString(executor.memory)}
+ {Utils.megabytesToString(executor.memory)}
|
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index 742e0a5fb6..717619f80d 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -17,7 +17,6 @@
package spark.deploy.worker.ui
-import akka.actor.ActorRef
import akka.util.{Duration, Timeout}
import java.io.{FileInputStream, File}
@@ -49,7 +48,9 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val indexPage = new IndexPage(this)
- val handlers = Array[(String, Handler)](
+ val metricsHandlers = worker.metricsSystem.getServletHandlers
+
+ val handlers = metricsHandlers ++ Array[(String, Handler)](
("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
("/log", (request: HttpServletRequest) => log(request)),
("/logPage", (request: HttpServletRequest) => logPage(request)),
@@ -113,7 +114,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
       <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
         .format(appId, executorId, logType, math.max(startByte-byteLength, 0),
         byteLength)}>
-        Previous {Utils.memoryBytesToString(math.min(byteLength, startByte))}
+        Previous {Utils.bytesToString(math.min(byteLength, startByte))}
       </a>
     }
@@ -128,7 +129,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
       <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
         .format(appId, executorId, logType, endByte, byteLength)}>
-        Next {Utils.memoryBytesToString(math.min(byteLength, logLength - endByte))}
+        Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
       </a>
     }
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 8a74a8d853..05a960d7c5 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -17,18 +17,17 @@
package spark.executor
-import java.io.{File, FileOutputStream}
-import java.net.{URI, URL, URLClassLoader}
+import java.io.File
+import java.lang.management.ManagementFactory
+import java.nio.ByteBuffer
import java.util.concurrent._
-import org.apache.hadoop.fs.FileUtil
+import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
-import scala.collection.mutable.{ArrayBuffer, Map, HashMap}
-
-import spark.broadcast._
import spark.scheduler._
import spark._
-import java.nio.ByteBuffer
+
/**
* The Mesos executor for Spark.
@@ -116,6 +115,9 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var attemptedTask: Option[Task[Any]] = None
var taskStart: Long = 0
+ def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
+ val startGCTime = getTotalGCTime
+
try {
SparkEnv.set(env)
Accumulators.clear()
@@ -128,10 +130,11 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
taskStart = System.currentTimeMillis()
val value = task.run(taskId.toInt)
val taskFinish = System.currentTimeMillis()
- task.metrics.foreach{ m =>
+ for (m <- task.metrics) {
m.hostname = Utils.localHostName
m.executorDeserializeTime = (taskStart - startTime).toInt
m.executorRunTime = (taskFinish - taskStart).toInt
+ m.jvmGCTime = getTotalGCTime - startGCTime
}
//TODO I'd also like to track the time it takes to serialize the task results, but that is huge headache, b/c
// we need to serialize the task metrics first. If TaskMetrics had a custom serialized format, we could
@@ -155,7 +158,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
case t: Throwable => {
val serviceTime = (System.currentTimeMillis() - taskStart).toInt
val metrics = attemptedTask.flatMap(t => t.metrics)
- metrics.foreach{m => m.executorRunTime = serviceTime}
+ for (m <- metrics) {
+ m.executorRunTime = serviceTime
+ m.jvmGCTime = getTotalGCTime - startGCTime
+ }
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
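
The GC accounting pattern used here (and duplicated in LocalScheduler below), isolated as a sketch: snapshot total collection time across all collector beans before the task, again after, and record the delta as TaskMetrics.jvmGCTime.

import java.lang.management.ManagementFactory
import scala.collection.JavaConversions._

def totalGCTime: Long =
  ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum

val startGCTime = totalGCTime
// ... run the task ...
val jvmGCTime = totalGCTime - startGCTime  // milliseconds
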
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index 3151627839..47b8890bee 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -31,13 +31,18 @@ class TaskMetrics extends Serializable {
/**
* Time the executor spends actually running the task (including fetching shuffle data)
*/
- var executorRunTime:Int = _
+ var executorRunTime: Int = _
/**
* The number of bytes this task transmitted back to the driver as the TaskResult
*/
var resultSize: Long = _
+ /**
+ * Amount of time the JVM spent in garbage collection while executing this task
+ */
+ var jvmGCTime: Long = _
+
/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index 3e32e9c82f..d7fb5378a4 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -36,7 +36,11 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
var propertyCategories: mutable.HashMap[String, Properties] = null
private def setDefaultProperties(prop: Properties) {
- // empty function, any default property can be set here
+ prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet")
+ prop.setProperty("*.sink.servlet.uri", "/metrics/json")
+ prop.setProperty("*.sink.servlet.sample", "false")
+ prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
+ prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
}
def initialize() {
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index 1dacafa135..4e6c6b26c8 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable
import spark.Logging
-import spark.metrics.sink.Sink
+import spark.metrics.sink.{MetricsServlet, Sink}
import spark.metrics.source.Source
/**
@@ -35,7 +35,7 @@ import spark.metrics.source.Source
* "instance" specify "who" (the role) use metrics system. In spark there are several roles
* like master, worker, executor, client driver, these roles will create metrics system
* for monitoring. So instance represents these roles. Currently in Spark, several instances
- * have already implemented: master, worker, executor, driver.
+ * have already been implemented: master, worker, executor, driver, applications.
*
* "source" specify "where" (source) to collect metrics data. In metrics system, there exists
* two kinds of source:
@@ -51,8 +51,8 @@ import spark.metrics.source.Source
* Metrics configuration format is like below:
* [instance].[sink|source].[name].[options] = xxxx
*
- * [instance] can be "master", "worker", "executor", "driver", which means only the specified
- * instance has this property.
+ * [instance] can be "master", "worker", "executor", "driver", or "applications", which means only
+ * the specified instance has this property.
* wild card "*" can be used to replace instance name, which means all the instances will have
* this property.
*
@@ -72,6 +72,12 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sources = new mutable.ArrayBuffer[Source]
val registry = new MetricRegistry()
+  // Treat MetricsServlet as a special sink because its handlers need to be exposed to the web UI
+ private var metricsServlet: Option[MetricsServlet] = None
+
+ /** Get any UI handlers used by this metrics system. */
+ def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())
+
metricsConfig.initialize()
registerSources()
registerSinks()
@@ -126,7 +132,11 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sink = Class.forName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry])
.newInstance(kv._2, registry)
- sinks += sink.asInstanceOf[Sink]
+ if (kv._1 == "servlet") {
+ metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+ } else {
+ sinks += sink.asInstanceOf[Sink]
+ }
} catch {
      case e: Exception => logError("Sink class " + classPath + " cannot be instantiated", e)
}
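
How a web UI consumes this, mirroring the MasterWebUI and WorkerWebUI changes elsewhere in this patch: the servlet sink surfaces as (uri, Handler) pairs that are simply concatenated with the UI's own handlers. A sketch (staticDir is illustrative):

import org.eclipse.jetty.server.Handler
import spark.ui.JettyUtils

val metricsSystem = MetricsSystem.createMetricsSystem("worker")
val handlers = metricsSystem.getServletHandlers ++ Array[(String, Handler)](
  ("/static", JettyUtils.createStaticHandler(staticDir))
)
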
diff --git a/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
new file mode 100644
index 0000000000..17432b1ed1
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.sink
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.json.MetricsModule
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import spark.ui.JettyUtils
+
+class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val SERVLET_KEY_URI = "uri"
+ val SERVLET_KEY_SAMPLE = "sample"
+
+ val servletURI = property.getProperty(SERVLET_KEY_URI)
+
+ val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
+
+ val mapper = new ObjectMapper().registerModule(
+ new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
+
+ def getHandlers = Array[(String, Handler)](
+ (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
+ )
+
+ def getMetricsSnapshot(request: HttpServletRequest): String = {
+ mapper.writeValueAsString(registry)
+ }
+
+ override def start() { }
+
+ override def stop() { }
+}
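
A standalone sketch of the sink's moving parts; in Spark it is instantiated by MetricsSystem from the "*.sink.servlet.*" defaults added in MetricsConfig above, so none of this wiring is done by hand:

import java.util.Properties
import com.codahale.metrics.MetricRegistry

val registry = new MetricRegistry()
registry.counter("tasks.complete").inc()

val props = new Properties()
props.setProperty("uri", "/metrics/json")
props.setProperty("sample", "false")

val servlet = new MetricsServlet(props, registry)
servlet.getHandlers  // Array(("/metrics/json", <handler serving the JSON snapshot>))
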
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index c540cd36eb..c2d95dc060 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -23,7 +23,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions
import scala.collection.mutable.ArrayBuffer
-import spark.{Aggregator, Partition, Partitioner, RDD, SparkEnv, TaskContext}
+import spark.{Partition, Partitioner, RDD, SparkEnv, TaskContext}
import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@@ -52,13 +52,6 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
override def hashCode(): Int = idx
}
-private[spark] class CoGroupAggregator
- extends Aggregator[Any, Any, ArrayBuffer[Any]](
- { x => ArrayBuffer(x) },
- { (b, x) => b += x },
- { (b1, b2) => b1 ++ b2 })
- with Serializable
-
/**
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
@@ -66,34 +59,25 @@ private[spark] class CoGroupAggregator
*
* @param rdds parent RDDs.
* @param part partitioner used to partition the shuffle output.
- * @param mapSideCombine flag indicating whether to merge values before shuffle step. If the flag
- * is on, Spark does an extra pass over the data on the map side to merge
- * all values belonging to the same key together. This can reduce the amount
- * of data shuffled if and only if the number of distinct keys is very small,
- * and the ratio of key size to value size is also very small.
*/
-class CoGroupedRDD[K](
- @transient var rdds: Seq[RDD[(K, _)]],
- part: Partitioner,
- val mapSideCombine: Boolean = false,
- val serializerClass: String = null)
+class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
- private val aggr = new CoGroupAggregator
+ private var serializerClass: String = null
+
+ def setSerializer(cls: String): CoGroupedRDD[K] = {
+ serializerClass = cls
+ this
+ }
override def getDependencies: Seq[Dependency[_]] = {
- rdds.map { rdd =>
+ rdds.map { rdd: RDD[(K, _)] =>
if (rdd.partitioner == Some(part)) {
logInfo("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logInfo("Adding shuffle dependency with " + rdd)
- if (mapSideCombine) {
- val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
- new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part, serializerClass)
- } else {
- new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass)
- }
+ new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass)
}
}
}
@@ -145,16 +129,8 @@ class CoGroupedRDD[K](
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
- if (mapSideCombine) {
- // With map side combine on, for each key, the shuffle fetcher returns a list of values.
- fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics, ser).foreach {
- case (key, values) => getSeq(key)(depNum) ++= values
- }
- } else {
- // With map side combine off, for each key the shuffle fetcher returns a single value.
- fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach {
- case (key, value) => getSeq(key)(depNum) += value
- }
+ fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach {
+ case (key, value) => getSeq(key)(depNum) += value
}
}
}
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 0137f80953..bcf7d0d89c 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -17,8 +17,9 @@
package spark.rdd
-import spark.{Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext}
-import spark.SparkContext._
+import spark._
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -30,15 +31,24 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
* The resulting RDD from a shuffle (e.g. repartitioning of data).
* @param prev the parent RDD.
* @param part the partitioner used to partition the RDD
- * @param serializerClass class name of the serializer to use.
* @tparam K the key class.
* @tparam V the value class.
*/
class ShuffledRDD[K, V](
- @transient prev: RDD[(K, V)],
- part: Partitioner,
- serializerClass: String = null)
- extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part, serializerClass))) {
+ @transient var prev: RDD[(K, V)],
+ part: Partitioner)
+ extends RDD[(K, V)](prev.context, Nil) {
+
+ private var serializerClass: String = null
+
+ def setSerializer(cls: String): ShuffledRDD[K, V] = {
+ serializerClass = cls
+ this
+ }
+
+ override def getDependencies: Seq[Dependency[_]] = {
+ List(new ShuffleDependency(prev, part, serializerClass))
+ }
override val partitioner = Some(part)
@@ -51,4 +61,9 @@ class ShuffledRDD[K, V](
SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index, context.taskMetrics,
SparkEnv.get.serializerManager.get(serializerClass))
}
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ prev = null
+ }
}
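
The constructor-to-setter move gives ShuffledRDD, CoGroupedRDD, and SubtractedRDD a uniform shape, and building the ShuffleDependency in getDependencies rather than in the constructor is what allows clearDependencies() to null out prev once the RDD is checkpointed. Hypothetical usage (pairs is an illustrative RDD[(String, Int)]):

val shuffled = new ShuffledRDD[String, Int](pairs, new HashPartitioner(8))
  .setSerializer(classOf[spark.KryoSerializer].getName)
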
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 0402b9f250..46b8cafaac 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -49,10 +49,16 @@ import spark.OneToOneDependency
private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest](
@transient var rdd1: RDD[(K, V)],
@transient var rdd2: RDD[(K, W)],
- part: Partitioner,
- val serializerClass: String = null)
+ part: Partitioner)
extends RDD[(K, V)](rdd1.context, Nil) {
+ private var serializerClass: String = null
+
+ def setSerializer(cls: String): SubtractedRDD[K, V, W] = {
+ serializerClass = cls
+ this
+ }
+
override def getDependencies: Seq[Dependency[_]] = {
Seq(rdd1, rdd2).map { rdd =>
if (rdd.partitioner == Some(part)) {
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 2a09a956ad..e5531011c2 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -153,7 +153,7 @@ object StatsReportListener extends Logging {
}
def showBytesDistribution(heading: String, dist: Distribution) {
- showDistribution(heading, dist, (d => Utils.memoryBytesToString(d.toLong)): Double => String)
+ showDistribution(heading, dist, (d => Utils.bytesToString(d.toLong)): Double => String)
}
def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
index b2d089f31d..2fc8a76a05 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -17,19 +17,14 @@
package spark.scheduler.cluster
-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
+import java.util.Properties
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.util.control.Breaks._
-import scala.xml._
+import scala.xml.XML
import spark.Logging
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
-import java.util.Properties
/**
* An interface to build Schedulable tree
@@ -56,7 +51,7 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
+ val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file")
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
@@ -69,39 +64,44 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val DEFAULT_WEIGHT = 1
override def buildPools() {
+ if (schedulerAllocFile != null) {
val file = new File(schedulerAllocFile)
- if (file.exists()) {
- val xml = XML.loadFile(file)
- for (poolNode <- (xml \\ POOLS_PROPERTY)) {
+ if (file.exists()) {
+ val xml = XML.loadFile(file)
+ for (poolNode <- (xml \\ POOLS_PROPERTY)) {
- val poolName = (poolNode \ POOL_NAME_PROPERTY).text
- var schedulingMode = DEFAULT_SCHEDULING_MODE
- var minShare = DEFAULT_MINIMUM_SHARE
- var weight = DEFAULT_WEIGHT
+ val poolName = (poolNode \ POOL_NAME_PROPERTY).text
+ var schedulingMode = DEFAULT_SCHEDULING_MODE
+ var minShare = DEFAULT_MINIMUM_SHARE
+ var weight = DEFAULT_WEIGHT
- val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
- if (xmlSchedulingMode != "") {
- try {
- schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
- } catch {
- case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
+ val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
+ if (xmlSchedulingMode != "") {
+ try {
+ schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+ } catch {
+              case e: Exception => logInfo("Invalid schedulingMode in XML, using default schedulingMode")
+ }
}
- }
- val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
- if (xmlMinShare != "") {
- minShare = xmlMinShare.toInt
- }
+ val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
+ if (xmlMinShare != "") {
+ minShare = xmlMinShare.toInt
+ }
- val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
- if (xmlWeight != "") {
- weight = xmlWeight.toInt
- }
+ val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
+ if (xmlWeight != "") {
+ weight = xmlWeight.toInt
+ }
- val pool = new Pool(poolName, schedulingMode, minShare, weight)
- rootPool.addSchedulable(pool)
- logInfo("Create new pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
- poolName, schedulingMode, minShare, weight))
+ val pool = new Pool(poolName, schedulingMode, minShare, weight)
+ rootPool.addSchedulable(pool)
+ logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+ poolName, schedulingMode, minShare, weight))
+ }
+ } else {
+      throw new FileNotFoundException(
+ "Fair scheduler allocation file not found: " + schedulerAllocFile)
}
}
@@ -110,7 +110,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
- logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
+ logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
@@ -127,7 +127,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
- logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
+ logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
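
For reference, the allocation file this now expects when spark.fairscheduler.allocation.file is set; the element names below follow the fair scheduler docs of this era rather than anything in the diff, so treat the sketch as an assumption. With the property unset, only the default pool is built.

// A fairscheduler.xml of roughly this shape (path is hypothetical):
//
//   <allocations>
//     <pool name="production">
//       <schedulingMode>FAIR</schedulingMode>
//       <minShare>2</minShare>
//       <weight>1</weight>
//     </pool>
//   </allocations>
//
System.setProperty("spark.fairscheduler.allocation.file",
  "/path/to/fairscheduler.xml")
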
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 55d6c0a47e..42c3b4a6cf 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -77,7 +77,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def executorAdded(executorId: String, workerId: String, hostPort: String, cores: Int, memory: Int) {
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
- executorId, hostPort, cores, Utils.memoryMegabytesToString(memory)))
+ executorId, hostPort, cores, Utils.megabytesToString(memory)))
}
override def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index f274b1a767..6c43928bc8 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -18,8 +18,11 @@
package spark.scheduler.local
import java.io.File
+import java.lang.management.ManagementFactory
import java.util.concurrent.atomic.AtomicInteger
import java.nio.ByteBuffer
+
+import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
@@ -173,6 +176,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var attemptedTask: Option[Task[_]] = None
val start = System.currentTimeMillis()
var taskStart: Long = 0
+ def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
+ val startGCTime = getTotalGCTime
+
try {
Accumulators.clear()
Thread.currentThread().setContextClassLoader(classLoader)
@@ -202,6 +208,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
val serviceTime = System.currentTimeMillis() - taskStart
logInfo("Finished " + taskId)
deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
+ deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime
deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
val serializedResult = ser.serialize(taskResult)
@@ -210,7 +217,10 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
case t: Throwable => {
val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
- metrics.foreach{m => m.executorRunTime = serviceTime.toInt}
+ for (m <- metrics) {
+ m.executorRunTime = serviceTime.toInt
+ m.jvmGCTime = getTotalGCTime - startGCTime
+ }
val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
}
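
Aside: the jvmGCTime bookkeeping above works by sampling the cumulative collection time of all garbage collector MXBeans before and after the task. The same technique in a standalone sketch, independent of Spark:

    import java.lang.management.ManagementFactory
    import scala.collection.JavaConversions._

    def totalGCTime: Long =
      ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum

    val startGCTime = totalGCTime
    val junk = (1 to 500000).map(_ => new Array[Byte](128))  // allocate to provoke GC
    System.gc()
    println("GC time during work: %d ms".format(totalGCTime - startGCTime))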
diff --git a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
index 07e3db30fe..568783d893 100644
--- a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
@@ -111,7 +111,7 @@ object BlockFetcherIterator {
protected def sendRequest(req: FetchRequest) {
logDebug("Sending request for %d blocks (%s) from %s".format(
- req.blocks.size, Utils.memoryBytesToString(req.size), req.address.hostPort))
+ req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
val cmId = new ConnectionManagerId(req.address.host, req.address.port)
val blockMessageArray = new BlockMessageArray(req.blocks.map {
case (blockId, size) => BlockMessage.fromGetBlock(GetBlock(blockId))
@@ -310,7 +310,7 @@ object BlockFetcherIterator {
}
logDebug("Sending request for %d blocks (%s) from %s".format(
- req.blocks.size, Utils.memoryBytesToString(req.size), req.address.host))
+ req.blocks.size, Utils.bytesToString(req.size), req.address.host))
val cmId = new ConnectionManagerId(req.address.host, req.address.nettyPort)
val cpier = new ShuffleCopier
cpier.getBlocks(cmId, req.blocks, putResult)
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 011bb6b83d..2a2e178550 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -332,7 +332,7 @@ object BlockManagerMasterActor {
private val _blocks = new JHashMap[String, BlockStatus]
logInfo("Registering block manager %s with %s RAM".format(
- blockManagerId.hostPort, Utils.memoryBytesToString(maxMem)))
+ blockManagerId.hostPort, Utils.bytesToString(maxMem)))
def updateLastSeenMs() {
_lastSeenMs = System.currentTimeMillis()
@@ -358,12 +358,12 @@ object BlockManagerMasterActor {
if (storageLevel.useMemory) {
_remainingMem -= memSize
logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(_remainingMem)))
+ blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
+ Utils.bytesToString(_remainingMem)))
}
if (storageLevel.useDisk) {
logInfo("Added %s on disk on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.memoryBytesToString(diskSize)))
+ blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
@@ -372,12 +372,12 @@ object BlockManagerMasterActor {
if (blockStatus.storageLevel.useMemory) {
_remainingMem += blockStatus.memSize
logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(_remainingMem)))
+ blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
+ Utils.bytesToString(_remainingMem)))
}
if (blockStatus.storageLevel.useDisk) {
logInfo("Removed %s on %s on disk (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.memoryBytesToString(diskSize)))
+ blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
}
}
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 3ebfe173b1..b14497157e 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -147,7 +147,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
channel.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
- blockId, Utils.memoryBytesToString(bytes.limit), (finishTime - startTime)))
+ blockId, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
}
private def getFileBytes(file: File): ByteBuffer = {
@@ -181,7 +181,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
val timeTaken = System.currentTimeMillis - startTime
logDebug("Block %s stored as %s file on disk in %d ms".format(
- blockId, Utils.memoryBytesToString(length), timeTaken))
+ blockId, Utils.bytesToString(length), timeTaken))
if (returnValues) {
// Return a byte buffer for the contents of the file
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index b5a86b85a7..5a51f5cf31 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -38,7 +38,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// blocks from the memory store.
private val putLock = new Object()
- logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
+ logInfo("MemoryStore started with capacity %s.".format(Utils.bytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
@@ -164,10 +164,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
currentMemory += size
if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
+ blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
} else {
logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
+ blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
}
true
} else {
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index 2aeed4ea3c..123b8f6345 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -42,9 +42,9 @@ case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long)
extends Ordered[RDDInfo] {
override def toString = {
- import Utils.memoryBytesToString
+ import Utils.bytesToString
"RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
- storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize))
+ storageLevel.toString, numCachedPartitions, numPartitions, bytesToString(memSize), bytesToString(diskSize))
}
override def compare(that: RDDInfo) = {
diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala
index ca6088ad93..ba58f35729 100644
--- a/core/src/main/scala/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/spark/ui/JettyUtils.scala
@@ -17,21 +17,20 @@
package spark.ui
-import annotation.tailrec
-
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
-import net.liftweb.json.{JValue, pretty, render}
+import scala.annotation.tailrec
+import scala.util.{Try, Success, Failure}
+import scala.util.parsing.json.JSONType
+import scala.xml.Node
import org.eclipse.jetty.server.{Server, Request, Handler}
import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler}
import org.eclipse.jetty.util.thread.QueuedThreadPool
-import scala.util.{Try, Success, Failure}
-import scala.xml.Node
-
import spark.Logging
+
/** Utilities for launching a web server using Jetty's HTTP Server class */
private[spark] object JettyUtils extends Logging {
// Base type for a function that returns something based on an HTTP request. Allows for
@@ -39,8 +38,8 @@ private[spark] object JettyUtils extends Logging {
type Responder[T] = HttpServletRequest => T
// Conversions from various types of Responder's to jetty Handlers
- implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler =
- createHandler(responder, "text/json", (in: JValue) => pretty(render(in)))
+ implicit def jsonResponderToHandler(responder: Responder[JSONType]): Handler =
+ createHandler(responder, "text/json", (in: JSONType) => in.toString)
implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler =
createHandler(responder, "text/html", (in: Seq[Node]) => "<!DOCTYPE html>" + in.toString)
@@ -48,7 +47,7 @@ private[spark] object JettyUtils extends Logging {
implicit def textResponderToHandler(responder: Responder[String]): Handler =
createHandler(responder, "text/plain")
- private def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
+ def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
extractFn: T => String = (in: Any) => in.toString): Handler = {
new AbstractHandler {
def handle(target: String,
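
Aside: with lift-json gone, a JSON page is just a function returning scala.util.parsing.json.JSONType, which the implicit conversion above wraps in a Jetty Handler serving text/json. A usage sketch (the responder body here is illustrative):

    import javax.servlet.http.HttpServletRequest
    import scala.util.parsing.json.{JSONObject, JSONType}

    val statusResponder: HttpServletRequest => JSONType =
      (req: HttpServletRequest) => JSONObject(Map("status" -> "ok"))
    // jsonResponderToHandler(statusResponder) yields a Handler that serves
    // the JSONType's toString with content type text/json.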
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 7599f82a94..1fd5a0989e 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
-import spark.{Logging, SparkContext, Utils}
+import spark.{Logging, SparkContext, SparkEnv, Utils}
import spark.ui.env.EnvironmentUI
import spark.ui.exec.ExecutorsUI
import spark.ui.storage.BlockManagerUI
@@ -43,8 +43,12 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
val jobs = new JobProgressUI(sc)
val env = new EnvironmentUI(sc)
val exec = new ExecutorsUI(sc)
+
+ // Add MetricsServlet handlers by default
+ val metricsServletHandlers = SparkEnv.get.metricsSystem.getServletHandlers
+
val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
- exec.getHandlers ++ handlers
+ exec.getHandlers ++ metricsServletHandlers ++ handlers
/** Bind the HTTP server which backs this web interface */
def bind() {
diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala
index ee7a8b482e..fe2afc1129 100644
--- a/core/src/main/scala/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/spark/ui/UIUtils.scala
@@ -125,9 +125,21 @@ private[spark] object UIUtils {
}
/** Returns an HTML table constructed by generating a row for each object in a sequence. */
- def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>{headers.map(h => <th>{h}</th>)}</thead>
+ def listingTable[T](
+ headers: Seq[String],
+ makeRow: T => Seq[Node],
+ rows: Seq[T],
+ fixedWidth: Boolean = false): Seq[Node] = {
+
+ val colWidth = 100.toDouble / headers.size
+ val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
+ var tableClass = "table table-bordered table-striped table-condensed sortable"
+ if (fixedWidth) {
+ tableClass += " table-fixed"
+ }
+
+ <table class={tableClass}>
+ <thead>{headers.map(h => <th width={colWidthAttr}>{h}</th>)}</thead>
<tbody>
{rows.map(r => makeRow(r))}
</tbody>
</table>
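
Aside: a usage sketch of the extended listingTable, assuming the signature above; fixedWidth = true adds the table-fixed CSS class and gives each header cell an equal width attribute:

    import scala.xml.Node
    val headers = Seq("Name", "Value")
    def row(kv: (String, String)): Seq[Node] = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
    val table = UIUtils.listingTable(headers, row, Seq("a" -> "1", "b" -> "2"), fixedWidth = true)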
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index 97ea644021..f96419520f 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -22,7 +22,8 @@ import scala.util.Random
import spark.SparkContext
import spark.SparkContext._
import spark.scheduler.cluster.SchedulingMode
-import spark.scheduler.cluster.SchedulingMode.SchedulingMode
+
+
/**
* Continuously generates jobs that expose various features of the WebUI (internal testing tool).
*
@@ -48,9 +49,9 @@ private[spark] object UIWorkloadGenerator {
def setProperties(s: String) = {
if(schedulingMode == SchedulingMode.FAIR) {
- sc.addLocalProperty("spark.scheduler.cluster.fair.pool", s)
+ sc.setLocalProperty("spark.scheduler.cluster.fair.pool", s)
}
- sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
+ sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
}
val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
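
Aside: the rename from addLocalProperty to setLocalProperty reflects set-per-thread semantics: local properties attach to the submitting thread, so concurrent threads can target different fair scheduler pools. A sketch (the pool name "pool1" is illustrative):

    import spark.SparkContext

    val sc = new SparkContext("local[4]", "sketch")
    new Thread {
      override def run() {
        // Only jobs submitted from this thread land in pool1.
        sc.setLocalProperty("spark.scheduler.cluster.fair.pool", "pool1")
        sc.parallelize(1 to 100, 4).count()
      }
    }.start()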
diff --git a/core/src/main/scala/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/spark/ui/env/EnvironmentUI.scala
index dc39b91648..b3e28ce317 100644
--- a/core/src/main/scala/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/spark/ui/env/EnvironmentUI.scala
@@ -19,18 +19,17 @@ package spark.ui.env
import javax.servlet.http.HttpServletRequest
-import org.eclipse.jetty.server.Handler
-
import scala.collection.JavaConversions._
import scala.util.Properties
+import scala.xml.Node
+
+import org.eclipse.jetty.server.Handler
import spark.ui.JettyUtils._
-import spark.ui.UIUtils.headerSparkPage
+import spark.ui.UIUtils
import spark.ui.Page.Environment
import spark.SparkContext
-import spark.ui.UIUtils
-import scala.xml.Node
private[spark] class EnvironmentUI(sc: SparkContext) {
@@ -46,20 +45,22 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
("Scala Home", Properties.scalaHome)
).sorted
def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
- def jvmTable = UIUtils.listingTable(Seq("Name", "Value"), jvmRow, jvmInformation)
+ def jvmTable =
+ UIUtils.listingTable(Seq("Name", "Value"), jvmRow, jvmInformation, fixedWidth = true)
val properties = System.getProperties.iterator.toSeq
- val classPathProperty = properties
- .filter{case (k, v) => k.contains("java.class.path")}
- .headOption
- .getOrElse("", "")
+ val classPathProperty = properties.find { case (k, v) =>
+ k.contains("java.class.path")
+ }.getOrElse(("", ""))
val sparkProperties = properties.filter(_._1.startsWith("spark")).sorted
val otherProperties = properties.diff(sparkProperties :+ classPathProperty).sorted
val propertyHeaders = Seq("Name", "Value")
def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
- val sparkPropertyTable = UIUtils.listingTable(propertyHeaders, propertyRow, sparkProperties)
- val otherPropertyTable = UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties)
+ val sparkPropertyTable =
+ UIUtils.listingTable(propertyHeaders, propertyRow, sparkProperties, fixedWidth = true)
+ val otherPropertyTable =
+ UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties, fixedWidth = true)
val classPathEntries = classPathProperty._2
.split(System.getProperty("path.separator", ":"))
@@ -71,16 +72,23 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
val classPathHeaders = Seq("Resource", "Source")
def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
- val classPathTable = UIUtils.listingTable(classPathHeaders, classPathRow, classPath)
+ val classPathTable =
+ UIUtils.listingTable(classPathHeaders, classPathRow, classPath, fixedWidth = true)
val content =
Runtime Information {jvmTable}
- Spark Properties {sparkPropertyTable}
- System Properties {otherPropertyTable}
- Classpath Entries {classPathTable}
+
+ {sparkProperties.size} Spark Properties
+ {sparkPropertyTable}
+
+ {otherProperties.size} System Properties
+ {otherPropertyTable}
+
+ {classPath.size} Classpath Entries
+ {classPathTable}
- headerSparkPage(content, sc, "Environment", Environment)
+ UIUtils.headerSparkPage(content, sc, "Environment", Environment)
}
}
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 6ec48f70a4..f97860013e 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -1,25 +1,20 @@
package spark.ui.exec
-
import javax.servlet.http.HttpServletRequest
+import scala.collection.mutable.{HashMap, HashSet}
+import scala.xml.Node
+
import org.eclipse.jetty.server.Handler
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.Properties
-
-import spark.{ExceptionFailure, Logging, SparkContext, Success, Utils}
+import spark.{ExceptionFailure, Logging, Utils, SparkContext}
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo
-import spark.scheduler._
-import spark.SparkContext
-import spark.storage.{StorageStatus, StorageUtils}
+import spark.scheduler.{SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener}
import spark.ui.JettyUtils._
import spark.ui.Page.Executors
-import spark.ui.UIUtils.headerSparkPage
import spark.ui.UIUtils
-import scala.xml.{Node, XML}
private[spark] class ExecutorsUI(val sc: SparkContext) {
@@ -38,32 +33,32 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = sc.getExecutorStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
- val memUsed = storageStatusList.map(_.memUsed()).reduce(_+_)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_+_).getOrElse(0L)
+ val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_+_)
+ val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_+_)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
"Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
- def execRow(kv: Seq[String]) =
+
+ def execRow(kv: Seq[String]) = {
<tr>
<td>{kv(0)}</td>
<td>{kv(1)}</td>
<td>{kv(2)}</td>
<td>
- {Utils.memoryBytesToString(kv(3).toLong)} / {Utils.memoryBytesToString(kv(4).toLong)}
+ {Utils.bytesToString(kv(3).toLong)} / {Utils.bytesToString(kv(4).toLong)}
</td>
<td>
- {Utils.memoryBytesToString(kv(5).toLong)}
+ {Utils.bytesToString(kv(5).toLong)}
</td>
<td>{kv(6)}</td>
<td>{kv(7)}</td>
<td>{kv(8)}</td>
<td>{kv(9)}</td>
- val execInfo =
- for (b <- 0 until storageStatusList.size)
- yield getExecInfo(b)
+ }
+
+ val execInfo = for (b <- 0 until storageStatusList.size) yield getExecInfo(b)
val execTable = UIUtils.listingTable(execHead, execRow, execInfo)
val content =
@@ -71,9 +66,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
- Memory:
- {Utils.memoryBytesToString(memUsed)} Used
- ({Utils.memoryBytesToString(maxMem)} Total)
- - Disk: {Utils.memoryBytesToString(diskSpaceUsed)} Used
+ {Utils.bytesToString(memUsed)} Used
+ ({Utils.bytesToString(maxMem)} Total)
+ - Disk: {Utils.bytesToString(diskSpaceUsed)} Used
@@ -83,7 +78,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
;
- headerSparkPage(content, sc, "Executors", Executors)
+ UIUtils.headerSparkPage(content, sc, execInfo.size + " Executors", Executors)
}
def getExecInfo(a: Int): Seq[String] = {
@@ -93,10 +88,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
- val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size)
- .getOrElse(0).toString
- val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
- val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
+ val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size).getOrElse(0)
+ val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0)
+ val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
Seq(
@@ -106,10 +100,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
memUsed,
maxMem,
diskUsed,
- activeTasks,
- failedTasks,
- completedTasks,
- totalTasks
+ activeTasks.toString,
+ failedTasks.toString,
+ completedTasks.toString,
+ totalTasks.toString
)
}
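
Aside: the switch from reduce(_+_) and reduceOption to fold(0L)(_+_) above is presumably what keeps the executors page from failing when no executors have registered yet, since reduce throws on an empty collection while fold returns its zero element:

    val empty = Seq.empty[Long]
    // empty.reduce(_ + _)                     // throws UnsupportedOperationException
    println(empty.fold(0L)(_ + _))             // 0
    println(Seq(1L, 2L, 3L).fold(0L)(_ + _))   // 6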
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index 9724671a03..cda6addd22 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -24,7 +24,7 @@ import scala.xml.{NodeSeq, Node}
import spark.scheduler.cluster.SchedulingMode
import spark.ui.Page._
import spark.ui.UIUtils._
-import spark.Utils
+
/** Page showing list of all ongoing and recently finished stages and pools*/
private[spark] class IndexPage(parent: JobProgressUI) {
@@ -46,7 +46,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
- val poolTable = new PoolTable(listener.sc.getAllPools, listener)
+ val pools = listener.sc.getAllPools
+ val poolTable = new PoolTable(pools, listener)
val summary: NodeSeq =
@@ -76,15 +77,15 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val content = summary ++
{if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) {
- Pools ++ poolTable.toNodeSeq
+
{pools.size} Fair Scheduler Pools ++ poolTable.toNodeSeq
} else {
Seq()
}} ++
- Active Stages: {activeStages.size} ++
+
{activeStages.size} Active Stages ++
activeStagesTable.toNodeSeq++
- Completed Stages: {completedStages.size} ++
+
{completedStages.size} Completed Stages ++
completedStagesTable.toNodeSeq++
- Failed Stages: {failedStages.size} ++
+
{failedStages.size} Failed Stages ++
failedStagesTable.toNodeSeq
headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
index 04ef35c800..e8f80ebfce 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
@@ -23,10 +23,11 @@ private[spark] class PoolPage(parent: JobProgressUI) {
val pool = listener.sc.getPoolForName(poolName).get
val poolTable = new PoolTable(Seq(pool), listener)
- val content = Pool ++ poolTable.toNodeSeq() ++
- Active Stages : {activeStages.size} ++ activeStagesTable.toNodeSeq()
+ val content = Summary ++ poolTable.toNodeSeq() ++
+
+ {activeStages.size} Active Stages ++ activeStagesTable.toNodeSeq()
- headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs)
+ headerSparkPage(content, parent.sc, "Fair Scheduler Pool: " + poolName, Jobs)
}
}
}
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
index 21ebcef63a..621828f9c3 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
@@ -1,8 +1,8 @@
package spark.ui.jobs
-import scala.xml.Node
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
+import scala.xml.Node
import spark.scheduler.Stage
import spark.scheduler.cluster.Schedulable
@@ -21,14 +21,14 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
private def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
rows: Seq[Schedulable]
): Seq[Node] = {
-
+
<th>Pool Name</th>
<th>Minimum Share</th>
<th>Pool Weight</th>
- <th>Active Stages</th>
- <th>Running Tasks</th>
- <th>SchedulingMode</th>
+ <th>Active Stages</th>
+ <th>Running Tasks</th>
+ <th>SchedulingMode</th>
{rows.map(r => makeRow(r, poolToActiveStages))}
@@ -36,7 +36,8 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
}
- private def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = {
+ private def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]])
+ : Seq[Node] = {
val activeStages = poolToActiveStages.get(p.name) match {
case Some(stages) => stages.size
case None => 0
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 1b071a91e5..797513f266 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -46,11 +46,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
Summary Metrics No tasks have started yet
Tasks No tasks have started yet
- return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+ return headerSparkPage(content, parent.sc, "Details for Stage %s".format(stageId), Jobs)
}
val tasks = listener.stageToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime)
+ val numCompleted = tasks.count(_._1.finished)
val shuffleReadBytes = listener.stageToShuffleRead.getOrElse(stageId, 0L)
val hasShuffleRead = shuffleReadBytes > 0
val shuffleWriteBytes = listener.stageToShuffleWrite.getOrElse(stageId, 0L)
@@ -69,25 +70,26 @@ private[spark] class StagePage(parent: JobProgressUI) {
{if (hasShuffleRead)
-
Shuffle read:
- {Utils.memoryBytesToString(shuffleReadBytes)}
+ {Utils.bytesToString(shuffleReadBytes)}
}
{if (hasShuffleWrite)
-
Shuffle write:
- {Utils.memoryBytesToString(shuffleWriteBytes)}
+ {Utils.bytesToString(shuffleWriteBytes)}
}
val taskHeaders: Seq[String] =
- Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
- {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
- {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
- Seq("Details")
+ Seq("Task ID", "Status", "Locality Level", "Executor", "Launch Time", "Duration") ++
+ Seq("GC Time") ++
+ {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
+ {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
+ Seq("Errors")
- val taskTable = listingTable(taskHeaders, taskRow, tasks)
+ val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (t._2.isDefined))
@@ -103,7 +105,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
ms => parent.formatDuration(ms.toLong))
def getQuantileCols(data: Seq[Double]) =
- Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
+ Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
val shuffleReadSizes = validTasks.map {
case(info, metrics, exception) =>
@@ -121,21 +123,25 @@ private[spark] class StagePage(parent: JobProgressUI) {
if (hasShuffleRead) shuffleReadQuantiles else Nil,
if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
- val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
+ val quantileHeaders = Seq("Metric", "Min", "25th percentile",
+ "Median", "75th percentile", "Max")
def quantileRow(data: Seq[String]): Seq[Node] = <tr>{data.map(d => <td>{d}</td>)}</tr>
- Some(listingTable(quantileHeaders, quantileRow, listings))
+ Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
}
val content =
- summary ++ Summary Metrics ++ summaryTable.getOrElse(Nil) ++
- Tasks ++ taskTable;
+ summary ++
+ Summary Metrics for {numCompleted} Completed Tasks ++
+ {summaryTable.getOrElse("No tasks have reported metrics yet.")} ++
+
Tasks ++ taskTable;
- headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+ headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Jobs)
}
}
- def taskRow(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
+ def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean)
+ (taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
trace.map(e => {e.toString})
val (info, metrics, exception) = taskData
@@ -144,20 +150,28 @@ private[spark] class StagePage(parent: JobProgressUI) {
else metrics.map(m => m.executorRunTime).getOrElse(1)
val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+ val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
{info.taskId} |
{info.status} |
-
- {formatDuration}
- |
{info.taskLocality} |
{info.hostPort} |
{dateFmt.format(new Date(info.launchTime))} |
- {metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
- {Utils.memoryBytesToString(s.remoteBytesRead)} | }.getOrElse("")}
- {metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
- {Utils.memoryBytesToString(s.shuffleBytesWritten)} | }.getOrElse("")}
+
+ {formatDuration}
+ |
+
+ {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
+ |
+ {if (shuffleRead) {
+ {metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
+ Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")} |
+ }}
+ {if (shuffleWrite) {
+ {metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
+ Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")} |
+ }}
{exception.map(e =>
{e.className} ({e.description})
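
Aside: taskRow now takes the shuffle flags in a first parameter list, so taskRow(hasShuffleRead, hasShuffleWrite) partially applies to exactly the one-argument row renderer that listingTable expects. The same currying pattern in miniature:

    def render(bold: Boolean)(value: String): String =
      if (bold) "**" + value + "**" else value

    val boldRenderer: String => String = render(bold = true)  // partially applied
    println(Seq("a", "b").map(boldRenderer))                  // List(**a**, **b**)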
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
index 5068a025fa..b31f4abc26 100644
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -1,21 +1,14 @@
package spark.ui.jobs
import java.util.Date
-import java.text.SimpleDateFormat
-import javax.servlet.http.HttpServletRequest
-
-import scala.Some
-import scala.xml.{NodeSeq, Node}
-import scala.collection.mutable.HashMap
+import scala.xml.Node
import scala.collection.mutable.HashSet
+import spark.Utils
import spark.scheduler.cluster.{SchedulingMode, TaskInfo}
import spark.scheduler.Stage
-import spark.ui.UIUtils._
-import spark.ui.Page._
-import spark.Utils
-import spark.storage.StorageLevel
+
/** Page showing list of all ongoing and recently finished stages */
private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) {
@@ -38,10 +31,10 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Description</th>
<th>Submitted</th>
- <th>Duration</th>
- <th>Tasks: Succeeded/Total</th>
- <th>Shuffle Read</th>
- <th>Shuffle Write</th>
+ <th>Duration</th>
+ <th>Tasks: Succeeded/Total</th>
+ <th>Shuffle Read</th>
+ <th>Shuffle Write</th>
{rows.map(r => makeRow(r))}
@@ -49,13 +42,6 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
}
- private def getElapsedTime(submitted: Option[Long], completed: Long): String = {
- submitted match {
- case Some(t) => parent.formatDuration(completed - t)
- case _ => "Unknown"
- }
- }
-
private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
@@ -78,11 +64,11 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
case 0 => ""
- case b => Utils.memoryBytesToString(b)
+ case b => Utils.bytesToString(b)
}
val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
case 0 => ""
- case b => Utils.memoryBytesToString(b)
+ case b => Utils.bytesToString(b)
}
val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
@@ -98,6 +84,8 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
val nameLink = {s.name}
val description = listener.stageToDescription.get(s)
.map(d => {d} {nameLink} ).getOrElse(nameLink)
+ val finishTime = s.completionTime.getOrElse(System.currentTimeMillis())
+ val duration = s.submissionTime.map(t => finishTime - t)
{s.id} |
@@ -106,8 +94,9 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
}
{description} |
{submissionTime} |
- {getElapsedTime(s.submissionTime,
- s.completionTime.getOrElse(System.currentTimeMillis()))} |
+
+ {duration.map(d => parent.formatDuration(d)).getOrElse("Unknown")}
+ |
{makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
|
diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala
index f76192eba8..0751f9e8f9 100644
--- a/core/src/main/scala/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/storage/IndexPage.scala
@@ -58,8 +58,8 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
<td>{rdd.numCachedPartitions}</td>
<td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td>
- <td>{Utils.memoryBytesToString(rdd.memSize)}</td>
- <td>{Utils.memoryBytesToString(rdd.diskSize)}</td>
+ <td>{Utils.bytesToString(rdd.memSize)}</td>
+ <td>{Utils.bytesToString(rdd.diskSize)}</td>
}
}
diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala
index 4c3ee12c98..f0b711e6ec 100644
--- a/core/src/main/scala/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala
@@ -21,12 +21,13 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import spark.storage.{StorageStatus, StorageUtils}
-import spark.ui.UIUtils._
import spark.Utils
+import spark.storage.{StorageStatus, StorageUtils}
import spark.storage.BlockManagerMasterActor.BlockStatus
+import spark.ui.UIUtils._
import spark.ui.Page._
+
/** Page showing storage details for a given RDD */
private[spark] class RDDPage(parent: BlockManagerUI) {
val sc = parent.sc
@@ -44,7 +45,7 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
val workerTable = listingTable(workerHeaders, workerRow, workers)
val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk",
- "Locations")
+ "Executors")
val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1)
val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
@@ -71,11 +72,11 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
-
Memory Size:
- {Utils.memoryBytesToString(rddInfo.memSize)}
+ {Utils.bytesToString(rddInfo.memSize)}
-
Disk Size:
- {Utils.memoryBytesToString(rddInfo.diskSize)}
+ {Utils.bytesToString(rddInfo.diskSize)}
@@ -83,19 +84,19 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
- Data Distribution Summary
+ Data Distribution on {workers.size} Executors
{workerTable}
- Partitions
+ {blocks.size} Partitions
{blockTable}
;
- headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Storage)
+ headerSparkPage(content, parent.sc, "RDD Storage Info for " + rddInfo.name, Storage)
}
def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = {
@@ -106,10 +107,10 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
{block.storageLevel.description}
- {Utils.memoryBytesToString(block.memSize)}
+ {Utils.bytesToString(block.memSize)}
|
- {Utils.memoryBytesToString(block.diskSize)}
+ {Utils.bytesToString(block.diskSize)}
|
{locations.map(l => {l} )}
@@ -122,10 +123,10 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
|
{status.blockManagerId.host + ":" + status.blockManagerId.port} |
- {Utils.memoryBytesToString(status.memUsed(prefix))}
- ({Utils.memoryBytesToString(status.memRemaining)} Total Available)
+ {Utils.bytesToString(status.memUsed(prefix))}
+ ({Utils.bytesToString(status.memRemaining)} Remaining)
|
- {Utils.memoryBytesToString(status.diskUsed(prefix))} |
+ {Utils.bytesToString(status.diskUsed(prefix))} |
}
}
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 5e2bf2d231..c337c49268 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.*;
+import com.google.common.base.Optional;
import scala.Tuple2;
import com.google.common.base.Charsets;
@@ -197,6 +198,35 @@ public class JavaAPISuite implements Serializable {
cogrouped.collect();
}
+ @Test
+ public void leftOuterJoin() {
+ JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(1, 2),
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(3, 1)
+ ));
+ JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<Integer, Character>(1, 'x'),
+ new Tuple2<Integer, Character>(2, 'y'),
+ new Tuple2<Integer, Character>(2, 'z'),
+ new Tuple2<Integer, Character>(4, 'w')
+ ));
+ List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
+ rdd1.leftOuterJoin(rdd2).collect();
+ Assert.assertEquals(5, joined.size());
+ Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
+ rdd1.leftOuterJoin(rdd2).filter(
+ new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
+ @Override
+ public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup)
+ throws Exception {
+ return !tup._2()._2().isPresent();
+ }
+ }).first();
+ Assert.assertEquals(3, firstUnmatched._1().intValue());
+ }
+
@Test
public void foldReduce() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
@@ -718,7 +748,7 @@ public class JavaAPISuite implements Serializable {
}
};
- JavaRDD<Integer> sizes = rdd1.zipPartitions(sizesFn, rdd2);
+ JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
}
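
Aside: the semantics the new leftOuterJoin test pins down, sketched on plain Scala collections: every left-side pair survives, duplicate right-side keys multiply matches, and a missing match becomes an absent Optional (None here):

    val left  = Seq(1 -> 1, 1 -> 2, 2 -> 1, 3 -> 1)
    val right = Seq(1 -> 'x', 2 -> 'y', 2 -> 'z', 4 -> 'w')
    val joined = left.flatMap { case (k, v) =>
      val matches = right.collect { case (rk, rv) if rk == k => rv }
      if (matches.isEmpty) Seq((k, (v, None)))
      else matches.map(c => (k, (v, Some(c))))
    }
    println(joined.size)  // 5, as the assertion above expects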
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 752e4b85e6..c686b8cc5a 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -17,17 +17,8 @@
package spark
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashSet
-
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
-
-import com.google.common.io.Files
import spark.rdd.ShuffledRDD
import spark.SparkContext._
@@ -59,8 +50,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS),
- classOf[spark.KryoSerializer].getName)
+ val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS))
+ .setSerializer(classOf[spark.KryoSerializer].getName)
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
assert(c.count === 10)
@@ -81,7 +72,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD(b, new HashPartitioner(3), classOf[spark.KryoSerializer].getName)
+ val c = new ShuffledRDD(b, new HashPartitioner(3))
+ .setSerializer(classOf[spark.KryoSerializer].getName)
assert(c.count === 10)
}
@@ -96,7 +88,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
// NOTE: The default Java serializer doesn't create zero-sized blocks.
// So, use Kryo
- val c = new ShuffledRDD(b, new HashPartitioner(10), classOf[spark.KryoSerializer].getName)
+ val c = new ShuffledRDD(b, new HashPartitioner(10))
+ .setSerializer(classOf[spark.KryoSerializer].getName)
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
assert(c.count === 4)
diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala
index 31c3b25c50..98a6c1a1c9 100644
--- a/core/src/test/scala/spark/UtilsSuite.scala
+++ b/core/src/test/scala/spark/UtilsSuite.scala
@@ -26,14 +26,14 @@ import scala.util.Random
class UtilsSuite extends FunSuite {
- test("memoryBytesToString") {
- assert(Utils.memoryBytesToString(10) === "10.0 B")
- assert(Utils.memoryBytesToString(1500) === "1500.0 B")
- assert(Utils.memoryBytesToString(2000000) === "1953.1 KB")
- assert(Utils.memoryBytesToString(2097152) === "2.0 MB")
- assert(Utils.memoryBytesToString(2306867) === "2.2 MB")
- assert(Utils.memoryBytesToString(5368709120L) === "5.0 GB")
- assert(Utils.memoryBytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
+ test("bytesToString") {
+ assert(Utils.bytesToString(10) === "10.0 B")
+ assert(Utils.bytesToString(1500) === "1500.0 B")
+ assert(Utils.bytesToString(2000000) === "1953.1 KB")
+ assert(Utils.bytesToString(2097152) === "2.0 MB")
+ assert(Utils.bytesToString(2306867) === "2.2 MB")
+ assert(Utils.bytesToString(5368709120L) === "5.0 GB")
+ assert(Utils.bytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
}
test("copyStream") {
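
Aside: a minimal sketch of a formatter consistent with the renamed test's assertions (the real Utils.bytesToString may differ in detail); each unit only kicks in at twice its size, which is why 2000000 bytes reads as 1953.1 KB rather than 1.9 MB:

    def bytesToString(size: Long): String = {
      val KB = 1L << 10; val MB = 1L << 20; val GB = 1L << 30; val TB = 1L << 40
      val (value, unit) =
        if (size >= 2 * TB) (size.toDouble / TB, "TB")
        else if (size >= 2 * GB) (size.toDouble / GB, "GB")
        else if (size >= 2 * MB) (size.toDouble / MB, "MB")
        else if (size >= 2 * KB) (size.toDouble / KB, "KB")
        else (size.toDouble, "B")
      "%.1f %s".format(value, unit)
    }
    assert(bytesToString(2306867) == "2.2 MB")  // matches the assertion above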
diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
index 5e6d7b09d8..bb5d379273 100644
--- a/core/src/test/scala/spark/ZippedPartitionsSuite.scala
+++ b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
@@ -40,7 +40,7 @@ class ZippedPartitionsSuite extends FunSuite with SharedSparkContext {
val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
- val zippedRDD = data1.zipPartitions(ZippedPartitionsSuite.procZippedData, data2, data3)
+ val zippedRDD = data1.zipPartitions(data2, data3)(ZippedPartitionsSuite.procZippedData)
val obtainedSizes = zippedRDD.collect()
val expectedSizes = Array(2, 3, 1, 2, 3, 1)
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
index 87cd2ffad2..b0213b62d9 100644
--- a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
@@ -1,12 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.metrics
-import java.util.Properties
-import java.io.{File, FileOutputStream}
-
import org.scalatest.{BeforeAndAfter, FunSuite}
-import spark.metrics._
-
class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
@@ -18,11 +30,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
val conf = new MetricsConfig(Option("dummy-file"))
conf.initialize()
- assert(conf.properties.size() === 0)
+ assert(conf.properties.size() === 5)
assert(conf.properties.getProperty("test-for-dummy") === null)
val property = conf.getInstance("random")
- assert(property.size() === 0)
+ assert(property.size() === 3)
+ assert(property.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+ assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
+ assert(property.getProperty("sink.servlet.sample") === "false")
}
test("MetricsConfig with properties set") {
@@ -30,16 +45,22 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
conf.initialize()
val masterProp = conf.getInstance("master")
- assert(masterProp.size() === 3)
+ assert(masterProp.size() === 6)
assert(masterProp.getProperty("sink.console.period") === "20")
assert(masterProp.getProperty("sink.console.unit") === "minutes")
assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+ assert(masterProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+ assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
+ assert(masterProp.getProperty("sink.servlet.sample") === "false")
val workerProp = conf.getInstance("worker")
- assert(workerProp.size() === 3)
+ assert(workerProp.size() === 6)
assert(workerProp.getProperty("sink.console.period") === "10")
assert(workerProp.getProperty("sink.console.unit") === "seconds")
- assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+ assert(workerProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+ assert(workerProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+ assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
+ assert(workerProp.getProperty("sink.servlet.sample") === "false")
}
test("MetricsConfig with subProperties") {
@@ -47,7 +68,7 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
conf.initialize()
val propCategories = conf.propertyCategories
- assert(propCategories.size === 2)
+ assert(propCategories.size === 3)
val masterProp = conf.getInstance("master")
val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
@@ -55,10 +76,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
- assert(sinkProps.size === 1)
+ assert(sinkProps.size === 2)
assert(sinkProps.contains("console"))
+ assert(sinkProps.contains("servlet"))
val consoleProps = sinkProps("console")
assert(consoleProps.size() === 2)
+
+ val servletProps = sinkProps("servlet")
+ assert(servletProps.size() === 3)
}
}
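
Aside: the jump in the expected property counts comes from a default MetricsServlet sink that is now injected for every instance. The key convention the assertions imply, sketched with plain java.util.Properties (the merge logic itself lives in MetricsConfig):

    import java.util.Properties

    val props = new Properties()
    // "*" supplies defaults that every instance inherits.
    props.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet")
    props.setProperty("*.sink.servlet.uri", "/metrics/json")
    props.setProperty("*.sink.servlet.sample", "false")
    // Instance-specific keys override or extend the defaults.
    props.setProperty("master.sink.servlet.uri", "/metrics/master/json")
    props.setProperty("master.sink.console.period", "20")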
diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
index c189996417..dc65ac6994 100644
--- a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
@@ -1,12 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.metrics
-import java.util.Properties
-import java.io.{File, FileOutputStream}
-
import org.scalatest.{BeforeAndAfter, FunSuite}
-import spark.metrics._
-
class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
@@ -22,6 +34,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
assert(sources.length === 0)
assert(sinks.length === 0)
+ assert(!metricsSystem.getServletHandlers.isEmpty)
}
test("MetricsSystem with sources add") {
@@ -31,6 +44,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
assert(sources.length === 0)
assert(sinks.length === 1)
+ assert(!metricsSystem.getServletHandlers.isEmpty)
val source = new spark.deploy.master.MasterSource(null)
metricsSystem.registerSource(source)
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
index 66fd59e8bb..a79b8bf256 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
@@ -57,23 +57,23 @@ object TaskThreadInfo {
* 1. each thread contains one job.
* 2. each job contains one stage.
* 3. each stage only contains one task.
- * 4. each task(launched) must be lanched orderly(using threadToStarted) to make sure
- * it will get cpu core resource, and will wait to finished after user manually
- * release "Lock" and then cluster will contain another free cpu cores.
- * 5. each task(pending) must use "sleep" to make sure it has been added to taskSetManager queue,
+ * 4. each task (launched) must be launched in order (using threadToStarted) to make sure
+ * it will get a cpu core resource, and will wait to finish until the user manually
+ * releases "Lock", after which the cluster will have another free cpu core.
+ * 5. each task (pending) must use "sleep" to make sure it has been added to the taskSetManager queue,
* so that it will be scheduled later, once the cluster has free cpu cores.
*/
class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {
-
+
TaskThreadInfo.threadToRunning(threadIndex) = false
val nums = sc.parallelize(threadIndex to threadIndex, 1)
TaskThreadInfo.threadToLock(threadIndex) = new Lock()
TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
new Thread {
if (poolName != null) {
- sc.addLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
+ sc.setLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
}
override def run() {
val ans = nums.map(number => {
@@ -88,7 +88,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
}
}.start()
}
-
+
test("Local FIFO scheduler end-to-end test") {
System.setProperty("spark.cluster.schedulingmode", "FIFO")
sc = new SparkContext("local[4]", "test")
@@ -103,8 +103,8 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
createThread(4,null,sc,sem)
TaskThreadInfo.threadToStarted(4).await()
// threads 5 and 6 (stages pending) must meet the following two points:
- // 1. stages (taskSetManager) of jobs in thread 5 and 6 should be add to taskSetManager
- // queue before executing TaskThreadInfo.threadToLock(1).jobFinished()
+ // 1. stages (taskSetManagers) of the jobs in threads 5 and 6 should be added to the taskSetManager
+ // queue before executing TaskThreadInfo.threadToLock(1).jobFinished()
// 2. the priority of the stage in thread 5 should be higher than that of the stage in thread 6
// So I just use "sleep" 1s here for each thread.
// TODO: any better solution?
@@ -112,24 +112,24 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
Thread.sleep(1000)
createThread(6,null,sc,sem)
Thread.sleep(1000)
-
+
assert(TaskThreadInfo.threadToRunning(1) === true)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === false)
assert(TaskThreadInfo.threadToRunning(6) === false)
-
+
TaskThreadInfo.threadToLock(1).jobFinished()
TaskThreadInfo.threadToStarted(5).await()
-
+
assert(TaskThreadInfo.threadToRunning(1) === false)
assert(TaskThreadInfo.threadToRunning(2) === true)
assert(TaskThreadInfo.threadToRunning(3) === true)
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === true)
assert(TaskThreadInfo.threadToRunning(6) === false)
-
+
TaskThreadInfo.threadToLock(3).jobFinished()
TaskThreadInfo.threadToStarted(6).await()
@@ -139,7 +139,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
assert(TaskThreadInfo.threadToRunning(4) === true)
assert(TaskThreadInfo.threadToRunning(5) === true)
assert(TaskThreadInfo.threadToRunning(6) === true)
-
+
TaskThreadInfo.threadToLock(2).jobFinished()
TaskThreadInfo.threadToLock(4).jobFinished()
TaskThreadInfo.threadToLock(5).jobFinished()
@@ -160,18 +160,18 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToStarted(20).await()
createThread(30,"3",sc,sem)
TaskThreadInfo.threadToStarted(30).await()
-
+
assert(TaskThreadInfo.threadToRunning(10) === true)
assert(TaskThreadInfo.threadToRunning(20) === true)
assert(TaskThreadInfo.threadToRunning(30) === true)
-
+
createThread(11,"1",sc,sem)
TaskThreadInfo.threadToStarted(11).await()
createThread(21,"2",sc,sem)
TaskThreadInfo.threadToStarted(21).await()
createThread(31,"3",sc,sem)
TaskThreadInfo.threadToStarted(31).await()
-
+
assert(TaskThreadInfo.threadToRunning(11) === true)
assert(TaskThreadInfo.threadToRunning(21) === true)
assert(TaskThreadInfo.threadToRunning(31) === true)
@@ -185,19 +185,19 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
assert(TaskThreadInfo.threadToRunning(12) === true)
assert(TaskThreadInfo.threadToRunning(22) === true)
assert(TaskThreadInfo.threadToRunning(32) === false)
-
+
TaskThreadInfo.threadToLock(10).jobFinished()
TaskThreadInfo.threadToStarted(32).await()
-
+
assert(TaskThreadInfo.threadToRunning(32) === true)
- //1. Similar with above scenario, sleep 1s for stage of 23 and 33 to be added to taskSetManager
+ //1. As in the scenario above, sleep 1s so the stages of 23 and 33 are added to the taskSetManager
// queue, and the cluster will assign a free cpu core to stage 23 after stage 11 finishes.
//2. the priorities of 23 and 33 are meaningless here, since the fair scheduler is in use.
createThread(23,"2",sc,sem)
createThread(33,"3",sc,sem)
Thread.sleep(1000)
-
+
TaskThreadInfo.threadToLock(11).jobFinished()
TaskThreadInfo.threadToStarted(23).await()
@@ -206,7 +206,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToLock(12).jobFinished()
TaskThreadInfo.threadToStarted(33).await()
-
+
assert(TaskThreadInfo.threadToRunning(33) === true)
TaskThreadInfo.threadToLock(20).jobFinished()
@@ -217,7 +217,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToLock(31).jobFinished()
TaskThreadInfo.threadToLock(32).jobFinished()
TaskThreadInfo.threadToLock(33).jobFinished()
-
- sem.acquire(11)
+
+ sem.acquire(11)
}
}
diff --git a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
index 16bd2c6b38..bd87c528c3 100644
--- a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -67,7 +67,12 @@ object LogisticRegressionSuite {
}
class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
- val sc = new SparkContext("local", "test")
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
override def afterAll() {
sc.stop()
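
Aside: the same change is applied to every MLlib suite below: construct the SparkContext in beforeAll rather than at field initialization, and mark the field @transient so the suite itself stays serializable if it is ever captured by a closure. In miniature:

    import org.scalatest.{BeforeAndAfterAll, FunSuite}
    import spark.SparkContext

    class ExampleSuite extends FunSuite with BeforeAndAfterAll {
      @transient private var sc: SparkContext = _

      override def beforeAll() {
        sc = new SparkContext("local", "test")
      }

      override def afterAll() {
        sc.stop()
      }

      test("count") {
        assert(sc.parallelize(1 to 3).count() === 3)
      }
    }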
diff --git a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
index 9e0970812d..04f631d80f 100644
--- a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala
@@ -62,7 +62,11 @@ object SVMSuite {
}
class SVMSuite extends FunSuite with BeforeAndAfterAll {
- val sc = new SparkContext("local", "test")
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
override def afterAll() {
sc.stop()
diff --git a/mllib/src/test/scala/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/spark/mllib/clustering/KMeansSuite.scala
index bebade9afb..d5d95c8639 100644
--- a/mllib/src/test/scala/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/clustering/KMeansSuite.scala
@@ -27,9 +27,12 @@ import spark.SparkContext._
import org.jblas._
-
class KMeansSuite extends FunSuite with BeforeAndAfterAll {
- val sc = new SparkContext("local", "test")
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
override def afterAll() {
sc.stop()
diff --git a/mllib/src/test/scala/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/spark/mllib/recommendation/ALSSuite.scala
index 3a556fdc29..15a60efda6 100644
--- a/mllib/src/test/scala/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/recommendation/ALSSuite.scala
@@ -66,7 +66,11 @@ object ALSSuite {
class ALSSuite extends FunSuite with BeforeAndAfterAll {
- val sc = new SparkContext("local", "test")
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
override def afterAll() {
sc.stop()
diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
index b9ada2b1ec..55a738f1e4 100644
--- a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala
@@ -57,7 +57,12 @@ object LassoSuite {
}
class LassoSuite extends FunSuite with BeforeAndAfterAll {
- val sc = new SparkContext("local", "test")
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
override def afterAll() {
sc.stop()
diff --git a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
index 4c4900658f..e2b244894d 100644
--- a/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -27,7 +27,11 @@ import spark.SparkContext._
class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll {
- val sc = new SparkContext("local", "test")
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
override def afterAll() {
sc.stop()
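
Reviewer note: all six MLlib suites above repeat the same beforeAll/afterAll boilerplate. A minimal sketch of how it could be factored into a shared fixture (hypothetical trait, not part of this patch; the spark.driver.port cleanup is an assumption borrowed from the core tests' LocalSparkContext helper):

// Hypothetical shared fixture; each suite would mix this in instead of
// repeating the beforeAll/afterAll pair.
import org.scalatest.{BeforeAndAfterAll, Suite}
import spark.SparkContext

trait SharedLocalContext extends BeforeAndAfterAll { self: Suite =>
  // @transient keeps the non-serializable context out of any closure that
  // accidentally captures the suite instance.
  @transient protected var sc: SparkContext = _

  override def beforeAll() {
    sc = new SparkContext("local", "test")
  }

  override def afterAll() {
    sc.stop()
    sc = null
    // Assumption borrowed from the core tests: clear the driver port so the
    // next suite's actor system can rebind.
    System.clearProperty("spark.driver.port")
  }
}
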
diff --git a/pom.xml b/pom.xml
index 1d0cb6a2f9..1811c62b55 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
    <java.version>1.5</java.version>
    <scala.version>2.9.3</scala.version>
-    <mesos.version>0.9.0-incubating</mesos.version>
+    <mesos.version>0.12.1</mesos.version>
    <akka.version>2.0.3</akka.version>
    <slf4j.version>1.7.2</slf4j.version>
    <cdh.version>4.1.2</cdh.version>
@@ -254,11 +254,6 @@
        <version>10.4.2.0</version>
        <scope>test</scope>
      </dependency>
-      <dependency>
-        <groupId>net.liftweb</groupId>
-        <artifactId>lift-json_2.9.2</artifactId>
-        <version>2.5</version>
-      </dependency>
      <dependency>
        <groupId>com.codahale.metrics</groupId>
        <artifactId>metrics-core</artifactId>
@@ -269,6 +264,11 @@
        <artifactId>metrics-jvm</artifactId>
        <version>3.0.0</version>
      </dependency>
+      <dependency>
+        <groupId>com.codahale.metrics</groupId>
+        <artifactId>metrics-json</artifactId>
+        <version>3.0.0</version>
+      </dependency>
      <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f860925650..852f40d3fd 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -180,12 +180,12 @@ object SparkBuild extends Build {
"com.typesafe.akka" % "akka-slf4j" % "2.0.5" excludeAll(excludeNetty),
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
- "net.liftweb" % "lift-json_2.9.2" % "2.5",
- "org.apache.mesos" % "mesos" % "0.9.0-incubating",
+ "org.apache.mesos" % "mesos" % "0.12.1",
"io.netty" % "netty-all" % "4.0.0.Beta2",
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
+ "com.codahale.metrics" % "metrics-json" % "3.0.0",
"com.twitter" % "chill_2.9.3" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
) ++ (
diff --git a/python/examples/pagerank.py b/python/examples/pagerank.py
new file mode 100755
index 0000000000..cd774cf3a3
--- /dev/null
+++ b/python/examples/pagerank.py
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+import re, sys
+from operator import add
+
+from pyspark import SparkContext
+
+
+def computeContribs(urls, rank):
+ """Calculates URL contributions to the rank of other URLs."""
+ num_urls = len(urls)
+ for url in urls: yield (url, rank / num_urls)
+
+
+def parseNeighbors(urls):
+ """Parses a urls pair string into urls pair."""
+ parts = re.split(r'\s+', urls)
+ return parts[0], parts[1]
+
+
+if __name__ == "__main__":
+ if len(sys.argv) < 4:
+ print >> sys.stderr, "Usage: pagerank <master> <file> <iterations>"
+ exit(-1)
+
+ # Initialize the spark context.
+ sc = SparkContext(sys.argv[1], "PythonPageRank")
+
+ # Loads the input file, which should be in the format:
+ # URL neighbor URL
+ # URL neighbor URL
+ # URL neighbor URL
+ # ...
+ lines = sc.textFile(sys.argv[2], 1)
+
+ # Loads all URLs from the input file and initializes their neighbor lists.
+ links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
+
+ # Initializes the rank of each URL (each link source) to one.
+ ranks = links.map(lambda (url, neighbors): (url, 1.0))
+
+ # Iteratively updates URL ranks using the PageRank algorithm.
+ for iteration in xrange(int(sys.argv[3])):
+ # Calculates URL contributions to the rank of other URLs.
+ contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)):
+ computeContribs(urls, rank))
+
+ # Re-calculates URL ranks based on neighbor contributions.
+ ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
+
+ # Collects all URL ranks and dumps them to the console.
+ for (link, rank) in ranks.collect():
+ print "%s has rank: %s." % (link, rank)
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index cc8cd9e3c4..9b4b4e78cb 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -24,10 +24,15 @@ import os
import pyspark
from pyspark.context import SparkContext
+# this is the equivalent of ADD_JARS
+add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") is not None else None
-sc = SparkContext(os.environ.get("MASTER", "local"), "PySparkShell")
+sc = SparkContext(os.environ.get("MASTER", "local"), "PySparkShell", pyFiles=add_files)
print "Spark context avaiable as sc."
+if add_files is not None:
+ print "Adding files: [%s]" % ", ".join(add_files)
+
# The ./pyspark script stores the old PYTHONSTARTUP value in OLD_PYTHONSTARTUP,
# which allows us to execute the user's PYTHONSTARTUP file:
_pythonstartup = os.environ.get('OLD_PYTHONSTARTUP')
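
Aside: the comment in the hunk above says ADD_FILES mirrors the Scala shell's ADD_JARS. A sketch of that pattern for comparison (hypothetical; ShellInitSketch is an illustration, not the repl's real entry point, and the four-argument SparkContext constructor shape is assumed from this era's API):

// Hypothetical sketch: parse a comma-separated env var, hand it to the context.
import spark.SparkContext

object ShellInitSketch {
  def main(args: Array[String]) {
    val addJars: Seq[String] =
      Option(System.getenv("ADD_JARS")).map(_.split(',').toSeq).getOrElse(Nil)
    val sc = new SparkContext(
      Option(System.getenv("MASTER")).getOrElse("local"), // master URL
      "Shell",                                            // app name
      null,                                               // sparkHome (assumed optional)
      addJars)                                            // jars shipped to executors
    if (!addJars.isEmpty)
      println("Adding jars: [%s]".format(addJars.mkString(", ")))
    sc.stop()
  }
}
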
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index dfd841b10a..f75215a781 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -64,7 +64,7 @@ class TestCheckpoint(PySparkTestCase):
flatMappedRDD = parCollection.flatMap(lambda x: range(1, x + 1))
self.assertFalse(flatMappedRDD.isCheckpointed())
- self.assertIsNone(flatMappedRDD.getCheckpointFile())
+ self.assertTrue(flatMappedRDD.getCheckpointFile() is None)
flatMappedRDD.checkpoint()
result = flatMappedRDD.collect()
@@ -79,13 +79,13 @@ class TestCheckpoint(PySparkTestCase):
flatMappedRDD = parCollection.flatMap(lambda x: [x])
self.assertFalse(flatMappedRDD.isCheckpointed())
- self.assertIsNone(flatMappedRDD.getCheckpointFile())
+ self.assertTrue(flatMappedRDD.getCheckpointFile() is None)
flatMappedRDD.checkpoint()
flatMappedRDD.count() # forces a checkpoint to be computed
time.sleep(1) # 1 second
- self.assertIsNotNone(flatMappedRDD.getCheckpointFile())
+ self.assertTrue(flatMappedRDD.getCheckpointFile() is not None)
recovered = self.sc._checkpointFile(flatMappedRDD.getCheckpointFile())
self.assertEquals([1, 2, 3, 4], recovered.collect())
@@ -164,9 +164,12 @@ class TestDaemon(unittest.TestCase):
time.sleep(1)
# daemon should no longer accept connections
- with self.assertRaises(EnvironmentError) as trap:
+ try:
self.connect(port)
- self.assertEqual(trap.exception.errno, ECONNREFUSED)
+ except EnvironmentError as exception:
+ self.assertEqual(exception.errno, ECONNREFUSED)
+ else:
+ self.fail("Expected EnvironmentError to be raised")
def test_termination_stdin(self):
"""Ensure that daemon and workers terminate when stdin is closed."""
diff --git a/python/run-tests b/python/run-tests
index 6643faa2e0..cbc554ea9d 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -26,20 +26,18 @@ cd "$FWDIR/python"
FAILED=0
-$FWDIR/pyspark pyspark/rdd.py
-FAILED=$(($?||$FAILED))
+rm -f unit-tests.log
-$FWDIR/pyspark pyspark/context.py
-FAILED=$(($?||$FAILED))
+function run_test() {
+ $FWDIR/pyspark $1 2>&1 | tee -a unit-tests.log
+ FAILED=$((PIPESTATUS[0]||$FAILED))
+}
-$FWDIR/pyspark -m doctest pyspark/broadcast.py
-FAILED=$(($?||$FAILED))
-
-$FWDIR/pyspark -m doctest pyspark/accumulators.py
-FAILED=$(($?||$FAILED))
-
-$FWDIR/pyspark -m unittest pyspark.tests
-FAILED=$(($?||$FAILED))
+run_test "pyspark/rdd.py"
+run_test "pyspark/context.py"
+run_test "-m doctest pyspark/broadcast.py"
+run_test "-m doctest pyspark/accumulators.py"
+run_test "pyspark/tests.py"
if [[ $FAILED != 0 ]]; then
echo -en "\033[31m" # Red
diff --git a/repl/pom.xml b/repl/pom.xml
index 7d8da03254..862595b9f9 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -73,6 +73,35 @@
    <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>test</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <exportAntProperties>true</exportAntProperties>
+              <tasks>
+                <property name="spark.classpath" refid="maven.test.classpath" />
+                <property environment="env" />
+                <fail message="Please define the SCALA_HOME (or SCALA_LIBRARY_PATH) environment variables and retry.">
+                  <condition>
+                    <not>
+                      <or>
+                        <isset property="env.SCALA_HOME" />
+                        <isset property="env.SCALA_LIBRARY_PATH" />
+                      </or>
+                    </not>
+                  </condition>
+                </fail>
+              </tasks>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
@@ -80,6 +109,7 @@
            <SPARK_HOME>${basedir}/..</SPARK_HOME>
            <SPARK_TESTING>1</SPARK_TESTING>
+            <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index ccd15563b0..ea08fb3826 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -29,7 +29,7 @@ import spark.{RDD, Partitioner}
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
-import spark.api.java.{JavaRDD, JavaPairRDD}
+import spark.api.java.{JavaUtils, JavaRDD, JavaPairRDD}
import spark.storage.StorageLevel
import com.google.common.base.Optional
import spark.RDD
@@ -401,10 +401,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
(Seq[V], Option[S]) => Option[S] = {
val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => {
val list: JList[V] = values
- val scalaState: Optional[S] = state match {
- case Some(s) => Optional.of(s)
- case _ => Optional.absent()
- }
+ val scalaState: Optional[S] = JavaUtils.optionToOptional(state)
val result: Optional[S] = in.apply(list, scalaState)
result.isPresent match {
case true => Some(result.get())
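
Note: optionToOptional presumably wraps exactly the match that was inlined here before. A minimal sketch reconstructed from the removed lines (the real helper lives in spark.api.java.JavaUtils):

// Minimal sketch of the helper delegated to above, based on the removed match.
import com.google.common.base.Optional

object JavaUtilsSketch {
  def optionToOptional[T](option: Option[T]): Optional[T] = option match {
    case Some(value) => Optional.of(value)
    case None        => Optional.absent()
  }
}
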
diff --git a/tools/src/main/scala/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/spark/tools/JavaAPICompletenessChecker.scala
index 3a55f50812..30fded12f0 100644
--- a/tools/src/main/scala/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/spark/tools/JavaAPICompletenessChecker.scala
@@ -121,7 +121,7 @@ object JavaAPICompletenessChecker {
SparkMethod(name, returnType, parameters)
}
- private def toJavaType(scalaType: SparkType): SparkType = {
+ private def toJavaType(scalaType: SparkType, isReturnType: Boolean): SparkType = {
val renameSubstitutions = Map(
"scala.collection.Map" -> "java.util.Map",
// TODO: the JavaStreamingContext API accepts Array arguments
@@ -140,40 +140,43 @@ object JavaAPICompletenessChecker {
case "spark.RDD" =>
if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
val tupleParams =
- parameters(0).asInstanceOf[ParameterizedType].parameters.map(toJavaType)
+ parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
ParameterizedType(classOf[JavaPairRDD[_, _]].getName, tupleParams)
} else {
- ParameterizedType(classOf[JavaRDD[_]].getName, parameters.map(toJavaType))
+ ParameterizedType(classOf[JavaRDD[_]].getName, parameters.map(applySubs))
}
case "spark.streaming.DStream" =>
if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
val tupleParams =
- parameters(0).asInstanceOf[ParameterizedType].parameters.map(toJavaType)
+ parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
ParameterizedType("spark.streaming.api.java.JavaPairDStream", tupleParams)
} else {
ParameterizedType("spark.streaming.api.java.JavaDStream",
- parameters.map(toJavaType))
+ parameters.map(applySubs))
}
- // TODO: Spark Streaming uses Guava's Optional in place of Option, leading to some
- // false-positives here:
- case "scala.Option" =>
- toJavaType(parameters(0))
+ case "scala.Option" => {
+ if (isReturnType) {
+ ParameterizedType("com.google.common.base.Optional", parameters.map(applySubs))
+ } else {
+ applySubs(parameters(0))
+ }
+ }
case "scala.Function1" =>
val firstParamName = parameters.last.name
if (firstParamName.startsWith("scala.collection.Traversable") ||
firstParamName.startsWith("scala.collection.Iterator")) {
ParameterizedType("spark.api.java.function.FlatMapFunction",
Seq(parameters(0),
- parameters.last.asInstanceOf[ParameterizedType].parameters(0)).map(toJavaType))
+ parameters.last.asInstanceOf[ParameterizedType].parameters(0)).map(applySubs))
} else if (firstParamName == "scala.runtime.BoxedUnit") {
ParameterizedType("spark.api.java.function.VoidFunction",
- parameters.dropRight(1).map(toJavaType))
+ parameters.dropRight(1).map(applySubs))
} else {
- ParameterizedType("spark.api.java.function.Function", parameters.map(toJavaType))
+ ParameterizedType("spark.api.java.function.Function", parameters.map(applySubs))
}
case _ =>
ParameterizedType(renameSubstitutions.getOrElse(name, name),
- parameters.map(toJavaType))
+ parameters.map(applySubs))
}
case BaseType(name) =>
if (renameSubstitutions.contains(name)) {
@@ -194,8 +197,9 @@ object JavaAPICompletenessChecker {
private def toJavaMethod(method: SparkMethod): SparkMethod = {
val params = method.parameters
- .filterNot(_.name == "scala.reflect.ClassManifest").map(toJavaType)
- SparkMethod(method.name, toJavaType(method.returnType), params)
+ .filterNot(_.name == "scala.reflect.ClassManifest")
+ .map(toJavaType(_, isReturnType = false))
+ SparkMethod(method.name, toJavaType(method.returnType, isReturnType = true), params)
}
private def isExcludedByName(method: Method): Boolean = {
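
Note: the hunks above thread an isReturnType flag through the conversion and route recursive calls through applySubs, whose definition is outside this diff. A self-contained toy model of the new scala.Option rule (names simplified; an illustration, not the checker's real SparkType algebra):

// Toy model of the isReturnType rule introduced above.
object OptionRuleSketch {
  sealed trait Type
  case class Parameterized(name: String, params: Seq[Type]) extends Type
  case class Base(name: String) extends Type

  def toJava(t: Type, isReturnType: Boolean): Type = {
    // Nested type parameters are never in return position.
    def sub(x: Type): Type = toJava(x, isReturnType = false)
    t match {
      case Parameterized("scala.Option", ps) =>
        if (isReturnType) Parameterized("com.google.common.base.Optional", ps.map(sub))
        else sub(ps(0)) // argument position: the Java overload takes the bare value
      case Parameterized(n, ps) => Parameterized(n, ps.map(sub))
      case Base(n) => Base(n)
    }
  }

  def main(args: Array[String]) {
    val opt = Parameterized("scala.Option", Seq(Base("java.lang.String")))
    println(toJava(opt, isReturnType = true))  // ...Optional of java.lang.String
    println(toJava(opt, isReturnType = false)) // java.lang.String
  }
}

Under this rule only a top-level Option in return position becomes Guava's Optional; in argument position the bare value is used, which removes the false positives the deleted TODO complained about.
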
|