From 48e4f2ad141492d7dee579a1b7fb1ec49fefa2ae Mon Sep 17 00:00:00 2001 From: "wangda.tan" Date: Mon, 9 Dec 2013 00:02:59 +0800 Subject: [PATCH 01/23] SPARK-968, In stage UI, add an overview section that shows task stats grouped by executor id --- .../spark/ui/jobs/ExecutorSummary.scala | 27 ++++++ .../apache/spark/ui/jobs/ExecutorTable.scala | 73 +++++++++++++++ .../org/apache/spark/ui/jobs/IndexPage.scala | 7 ++ .../spark/ui/jobs/JobProgressListener.scala | 38 ++++++++ .../ui/jobs/JobProgressListenerSuite.scala | 89 +++++++++++++++++++ 5 files changed, 234 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala create mode 100644 core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala create mode 100644 core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala new file mode 100644 index 0000000000..f2ee12081c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.jobs + +private[spark] class ExecutorSummary() { + var duration : Long = 0 + var totalTasks : Int = 0 + var failedTasks : Int = 0 + var succeedTasks : Int = 0 + var shuffleRead : Long = 0 + var shuffleWrite : Long = 0 +} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala new file mode 100644 index 0000000000..c6823cd823 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ui.jobs + + +import scala.xml.Node + +import org.apache.spark.scheduler.SchedulingMode + + +/** Page showing executor summary */ +private[spark] class ExecutorTable(val parent: JobProgressUI) { + + val listener = parent.listener + val dateFmt = parent.dateFmt + val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR + + def toNodeSeq(): Seq[Node] = { + listener.synchronized { + executorTable() + } + } + + /** Special table which merges two header cells. */ + private def executorTable[T](): Seq[Node] = { + + + + + + + + + + + + {createExecutorTable()} + +
        <th>Executor ID</th>
        <th>Duration</th>
        <th>#Tasks</th>
        <th>#Failed Tasks</th>
        <th>#Succeed Tasks</th>
        <th>Shuffle Read</th>
        <th>Shuffle Write</th>
+ } + + private def createExecutorTable() : Seq[Node] = { + val executorIdToSummary = listener.executorIdToSummary + executorIdToSummary.toSeq.sortBy(_._1).map{ + case (k,v) => { + + {k} + {v.duration} ms + {v.totalTasks} + {v.failedTasks} + {v.succeedTasks} + {v.shuffleRead} + {v.shuffleWrite} + + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index ca5a28625b..653a84b60f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -45,6 +45,7 @@ private[spark] class IndexPage(parent: JobProgressUI) { val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent) val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent) + val executorTable = new ExecutorTable(parent) val pools = listener.sc.getAllPools val poolTable = new PoolTable(pools, listener) @@ -56,6 +57,10 @@ private[spark] class IndexPage(parent: JobProgressUI) { {parent.formatDuration(now - listener.sc.startTime)}
  • Scheduling Mode: {parent.sc.getSchedulingMode}
  • +
+                Executor Summary:
+                {listener.executorIdToSummary.size}
+
  • Active Stages: {activeStages.size} @@ -77,6 +82,8 @@ private[spark] class IndexPage(parent: JobProgressUI) { } else { Seq() }} ++ +

          <h4>Executor Summary</h4> ++
+          executorTable.toNodeSeq++
           <h4>Active Stages ({activeStages.size})</h4> ++
           activeStagesTable.toNodeSeq++
           <h4>Completed Stages ({completedStages.size})</h4>

    ++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 6b854740d6..2635478592 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -57,6 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList val stageIdToTasksFailed = HashMap[Int, Int]() val stageIdToTaskInfos = HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]() + val executorIdToSummary = HashMap[String, ExecutorSummary]() override def onJobStart(jobStart: SparkListenerJobStart) {} @@ -114,6 +115,9 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) taskList += ((taskStart.taskInfo, None, None)) stageIdToTaskInfos(sid) = taskList + val executorSummary = executorIdToSummary.getOrElseUpdate(key = taskStart.taskInfo.executorId, + op = new ExecutorSummary()) + executorSummary.totalTasks += 1 } override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) @@ -123,9 +127,43 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList } override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { + // update executor summary + val executorSummary = executorIdToSummary.get(taskEnd.taskInfo.executorId) + executorSummary match { + case Some(x) => { + // first update failed-task, succeed-task + taskEnd.reason match { + case e: ExceptionFailure => + x.failedTasks += 1 + case _ => + x.succeedTasks += 1 + } + + // update duration + x.duration += taskEnd.taskInfo.duration + + // update shuffle read/write + val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics + shuffleRead match { + case Some(s) => + x.shuffleRead += s.remoteBytesRead + case _ => {} + } + val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics + shuffleWrite match { + case Some(s) => { + x.shuffleWrite += s.shuffleBytesWritten + } + case _ => {} + } + } + case _ => {} + } + val sid = taskEnd.task.stageId val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) tasksActive -= taskEnd.taskInfo + val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = taskEnd.reason match { case e: ExceptionFailure => diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala new file mode 100644 index 0000000000..90a58978c7 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.jobs + +import org.scalatest.FunSuite +import org.apache.spark.scheduler._ +import org.apache.spark.SparkContext +import org.apache.spark.Success +import org.apache.spark.scheduler.SparkListenerTaskStart +import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics} + +class JobProgressListenerSuite extends FunSuite { + test("test executor id to summary") { + val sc = new SparkContext("local", "joblogger") + val listener = new JobProgressListener(sc) + val taskMetrics = new TaskMetrics() + val shuffleReadMetrics = new ShuffleReadMetrics() + + // nothing in it + assert(listener.executorIdToSummary.size == 0) + + // launched a task, should get an item in map + listener.onTaskStart(new SparkListenerTaskStart( + new ShuffleMapTask(0, null, null, 0, null), + new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL))) + assert(listener.executorIdToSummary.size == 1) + + // finish this task, should get updated shuffleRead + shuffleReadMetrics.remoteBytesRead = 1000 + taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics) + var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL) + taskInfo.finishTime = 1 + listener.onTaskEnd(new SparkListenerTaskEnd( + new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) + assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 1000) + + // finish a task with unknown executor-id, nothing should happen + taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL) + taskInfo.finishTime = 1 + listener.onTaskEnd(new SparkListenerTaskEnd( + new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) + assert(listener.executorIdToSummary.size == 1) + + // launched a task + listener.onTaskStart(new SparkListenerTaskStart( + new ShuffleMapTask(0, null, null, 0, null), + new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL))) + assert(listener.executorIdToSummary.size == 1) + + // finish this task, should get updated duration + shuffleReadMetrics.remoteBytesRead = 1000 + taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics) + taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL) + taskInfo.finishTime = 1 + listener.onTaskEnd(new SparkListenerTaskEnd( + new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) + assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 2000) + + // launched a task in another exec + listener.onTaskStart(new SparkListenerTaskStart( + new ShuffleMapTask(0, null, null, 0, null), + new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL))) + assert(listener.executorIdToSummary.size == 2) + + // finish this task, should get updated duration + shuffleReadMetrics.remoteBytesRead = 1000 + taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics) + taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL) + taskInfo.finishTime = 1 + listener.onTaskEnd(new SparkListenerTaskEnd( + new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) + assert(listener.executorIdToSummary.getOrElse("exe-2", fail()).shuffleRead == 1000) + } +} From ee68a85cff499c7aa5d448cc72a93e4de3c23c41 Mon Sep 17 00:00:00 2001 From: "wangda.tan" Date: Mon, 9 Dec 2013 09:38:58 +0800 Subject: [PATCH 02/23] SPARK-968, added sc finalize code to avoid akka rebinding 
to the same port --- .../apache/spark/ui/jobs/JobProgressListenerSuite.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 90a58978c7..861d37a862 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -85,5 +85,12 @@ class JobProgressListenerSuite extends FunSuite { listener.onTaskEnd(new SparkListenerTaskEnd( new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) assert(listener.executorIdToSummary.getOrElse("exe-2", fail()).shuffleRead == 1000) + + // do finalize + sc.stop() + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") } } From f8ba89da217a1f1fd5c856a95a27a3e535017643 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 15 Dec 2013 18:39:30 -0800 Subject: [PATCH 03/23] Fix Cygwin support in several scripts. This allows the spark-shell, spark-class, run-example, make-distribution.sh, and ./bin/start-* scripts to work under Cygwin. Note that this doesn't support PySpark under Cygwin, since that requires many additional `cygpath` calls from within Python and will be non-trivial to implement. This PR was inspired by, and subsumes, #253 (so close #253 after this is merged). --- run-example | 10 ++++++++++ sbt/sbt | 21 ++++++++++++++++++--- spark-class | 10 ++++++++++ spark-shell | 19 +++++++++++++++++-- 4 files changed, 55 insertions(+), 5 deletions(-) diff --git a/run-example b/run-example index feade6589a..a78192d31d 100755 --- a/run-example +++ b/run-example @@ -17,6 +17,11 @@ # limitations under the License. # +cygwin=false +case "`uname`" in + CYGWIN*) cygwin=true;; +esac + SCALA_VERSION=2.10 # Figure out where the Scala framework is installed @@ -59,6 +64,11 @@ fi CLASSPATH=`$FWDIR/bin/compute-classpath.sh` CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH" +if $cygwin; then + CLASSPATH=`cygpath -wp $CLASSPATH` + export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR` +fi + # Find java binary if [ -n "${JAVA_HOME}" ]; then RUNNER="${JAVA_HOME}/bin/java" diff --git a/sbt/sbt b/sbt/sbt index c31a0280ff..5942280585 100755 --- a/sbt/sbt +++ b/sbt/sbt @@ -17,12 +17,27 @@ # limitations under the License. # -EXTRA_ARGS="" +cygwin=false +case "`uname`" in + CYGWIN*) cygwin=true;; +esac + +EXTRA_ARGS="-Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m" if [ "$MESOS_HOME" != "" ]; then - EXTRA_ARGS="-Djava.library.path=$MESOS_HOME/lib/java" + EXTRA_ARGS="$EXTRA_ARGS -Djava.library.path=$MESOS_HOME/lib/java" fi export SPARK_HOME=$(cd "$(dirname $0)/.." 
2>&1 >/dev/null ; pwd) export SPARK_TESTING=1 # To put test classes on classpath -java -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m $EXTRA_ARGS $SBT_OPTS -jar "$SPARK_HOME"/sbt/sbt-launch-*.jar "$@" +SBT_JAR="$SPARK_HOME"/sbt/sbt-launch-*.jar +if $cygwin; then + SBT_JAR=`cygpath -w $SBT_JAR` + export SPARK_HOME=`cygpath -w $SPARK_HOME` + EXTRA_ARGS="$EXTRA_ARGS -Djline.terminal=jline.UnixTerminal -Dsbt.cygwin=true" + stty -icanon min 1 -echo > /dev/null 2>&1 + java $EXTRA_ARGS $SBT_OPTS -jar $SBT_JAR "$@" + stty icanon echo > /dev/null 2>&1 +else + java $EXTRA_ARGS $SBT_OPTS -jar $SBT_JAR "$@" +fi \ No newline at end of file diff --git a/spark-class b/spark-class index 4fa6fb864e..4eb95a9ba2 100755 --- a/spark-class +++ b/spark-class @@ -17,6 +17,11 @@ # limitations under the License. # +cygwin=false +case "`uname`" in + CYGWIN*) cygwin=true;; +esac + SCALA_VERSION=2.10 # Figure out where the Scala framework is installed @@ -125,6 +130,11 @@ fi # Compute classpath using external script CLASSPATH=`$FWDIR/bin/compute-classpath.sh` CLASSPATH="$SPARK_TOOLS_JAR:$CLASSPATH" + +if $cygwin; then + CLASSPATH=`cygpath -wp $CLASSPATH` + export SPARK_TOOLS_JAR=`cygpath -w $SPARK_TOOLS_JAR` +fi export CLASSPATH if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then diff --git a/spark-shell b/spark-shell index 9608bd3f30..d20af0fb39 100755 --- a/spark-shell +++ b/spark-shell @@ -23,7 +23,11 @@ # if those two env vars are set in spark-env.sh but MASTER is not. # Options: # -c Set the number of cores for REPL to use -# + +cygwin=false +case "`uname`" in + CYGWIN*) cygwin=true;; +esac # Enter posix mode for bash set -o posix @@ -79,7 +83,18 @@ if [[ ! $? ]]; then saved_stty="" fi -$FWDIR/spark-class $OPTIONS org.apache.spark.repl.Main "$@" +if $cygwin; then + # Workaround for issue involving JLine and Cygwin + # (see http://sourceforge.net/p/jline/bugs/40/). + # If you're using the Mintty terminal emulator in Cygwin, may need to set the + # "Backspace sends ^H" setting in "Keys" section of the Mintty options + # (see https://github.com/sbt/sbt/issues/562). + stty -icanon min 1 -echo > /dev/null 2>&1 + $FWDIR/spark-class -Djline.terminal=unix $OPTIONS org.apache.spark.repl.Main "$@" + stty icanon echo > /dev/null 2>&1 +else + $FWDIR/spark-class $OPTIONS org.apache.spark.repl.Main "$@" +fi # record the exit status lest it be overwritten: # then reenable echo and propagate the code. From 8a397a959bf0b68f7d10fa57665225e0c2b5d03a Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Mon, 16 Dec 2013 12:07:39 -0500 Subject: [PATCH 04/23] Track task value serialisation time in TaskMetrics. 
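
The idea, roughly: serialize the task's return value once in Executor.run, time that call, and hand the already-serialized bytes to DirectTaskResult instead of serializing inside writeExternal. A minimal Scala sketch of the timing pattern (the Serializer trait below is a stand-in, not Spark's SerializerInstance):

    import java.nio.ByteBuffer

    object ResultSerializationTiming {
      // Stand-in for SparkEnv.get.serializer.newInstance(); only the one call used here.
      trait Serializer { def serialize[T](value: T): ByteBuffer }

      // Serialize the task's return value once, time it, and return the bytes so the
      // caller can build the task result from them instead of serializing again later.
      def serializeAndTime[T](ser: Serializer, value: T): (ByteBuffer, Long) = {
        val before = System.currentTimeMillis()
        val bytes  = ser.serialize(value)
        val after  = System.currentTimeMillis()
        (bytes, after - before)  // elapsed ms is what the patch records as serializationTime
      }
    }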
--- .../org/apache/spark/executor/Executor.scala | 12 +++++++---- .../apache/spark/executor/TaskMetrics.scala | 5 +++++ .../apache/spark/scheduler/TaskResult.scala | 20 +++++++++---------- .../cluster/ClusterTaskSetManagerSuite.scala | 4 +++- 4 files changed, 26 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 0b0a60ee60..02ad64d070 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -222,18 +222,22 @@ private[spark] class Executor( return } + val objectSer = SparkEnv.get.serializer.newInstance() + val beforeSerialization = System.currentTimeMillis() + val valueBytes = objectSer.serialize(value) + val afterSerialization = System.currentTimeMillis() + for (m <- task.metrics) { m.hostname = Utils.localHostName() m.executorDeserializeTime = (taskStart - startTime).toInt m.executorRunTime = (taskFinish - taskStart).toInt m.jvmGCTime = gcTime - startGCTime + m.serializationTime = (afterSerialization - beforeSerialization).toInt } - // TODO I'd also like to track the time it takes to serialize the task results, but that is - // huge headache, b/c we need to serialize the task metrics first. If TaskMetrics had a - // custom serialized format, we could just change the relevants bytes in the byte buffer + val accumUpdates = Accumulators.values - val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null)) + val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null)) val serializedDirectResult = ser.serialize(directResult) logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit) val serializedResult = { diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index c0ce46e379..c036866afd 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -43,6 +43,11 @@ class TaskMetrics extends Serializable { */ var jvmGCTime: Long = _ + /** + * Amount of time spent serializing the result of the task + */ + var serializationTime: Long = _ + /** * If this task reads from shuffle output, metrics on getting shuffle data will be collected here */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index 7e468d0d67..4e00bc8271 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -35,18 +35,15 @@ case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Se /** A TaskResult that contains the task's return value and accumulator updates. 
*/ private[spark] -class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics) +class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics) extends TaskResult[T] with Externalizable { - def this() = this(null.asInstanceOf[T], null, null) + def this() = this(null.asInstanceOf[ByteBuffer], null, null) override def writeExternal(out: ObjectOutput) { - val objectSer = SparkEnv.get.serializer.newInstance() - val bb = objectSer.serialize(value) - - out.writeInt(bb.remaining()) - Utils.writeByteBuffer(bb, out) + out.writeInt(valueBytes.remaining); + Utils.writeByteBuffer(valueBytes, out) out.writeInt(accumUpdates.size) for ((key, value) <- accumUpdates) { @@ -58,12 +55,10 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me override def readExternal(in: ObjectInput) { - val objectSer = SparkEnv.get.serializer.newInstance() - val blen = in.readInt() val byteVal = new Array[Byte](blen) in.readFully(byteVal) - value = objectSer.deserialize(ByteBuffer.wrap(byteVal)) + valueBytes = ByteBuffer.wrap(byteVal) val numUpdates = in.readInt if (numUpdates == 0) { @@ -76,4 +71,9 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me } metrics = in.readObject().asInstanceOf[TaskMetrics] } + + def value(): T = { + val objectSer = SparkEnv.get.serializer.newInstance() + return objectSer.deserialize(valueBytes) + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala index b97f2b19b5..788cbb81bf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala @@ -313,6 +313,8 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo } def createTaskResult(id: Int): DirectTaskResult[Int] = { - new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics) + val objectSer = SparkEnv.get.serializer.newInstance() + new DirectTaskResult[Int](objectSer.serialize(id), mutable.Map.empty, + new TaskMetrics) } } From 882d544856c61573cdd6124e921f700d580d170d Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Mon, 16 Dec 2013 13:27:03 -0500 Subject: [PATCH 05/23] UI to display serialisation time of a stage. 
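
The new row is just a quantile summary of each task's serializationTime metric; in the patch, Spark's util.Distribution computes the quantiles. A rough, self-contained sketch of that computation (names here are illustrative, not Spark API):

    object SerializationTimeQuantiles {
      // Approximates Distribution(...).getQuantiles(): pick the values at the given
      // probabilities from the sorted per-task serialization times (in ms).
      def quantiles(timesMs: Seq[Double],
                    probs: Seq[Double] = Seq(0.0, 0.25, 0.5, 0.75, 1.0)): Seq[Double] = {
        require(timesMs.nonEmpty, "need at least one task that reported metrics")
        val sorted = timesMs.sorted
        probs.map(p => sorted(math.min((p * sorted.length).toInt, sorted.length - 1)))
      }
    }

For example, quantiles(Seq(3.0, 1.0, 2.0, 4.0)) returns the min, quartiles and max, which the UI then formats as durations.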
--- .../src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 69f9446bab..81651bdd20 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -101,6 +101,11 @@ private[spark] class StagePage(parent: JobProgressUI) { None } else { + val serializationTimes = validTasks.map{case (info, metrics, exception) => + metrics.get.serializationTime.toDouble} + val serializationQuantiles = "Serialization Time" +: Distribution(serializationTimes).get.getQuantiles().map( + ms => parent.formatDuration(ms.toLong)) + val serviceTimes = validTasks.map{case (info, metrics, exception) => metrics.get.executorRunTime.toDouble} val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map( @@ -149,6 +154,7 @@ private[spark] class StagePage(parent: JobProgressUI) { val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) val listings: Seq[Seq[String]] = Seq( + serializationQuantiles, serviceQuantiles, gettingResultQuantiles, schedulerDelayQuantiles, From 963d6f065a763c2b94529bbd3ac4326e190bb2d7 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Mon, 16 Dec 2013 23:14:52 -0500 Subject: [PATCH 06/23] Incorporate pwendell's code review suggestions. --- .../src/main/scala/org/apache/spark/executor/Executor.scala | 6 +++--- .../main/scala/org/apache/spark/executor/TaskMetrics.scala | 4 ++-- .../src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 4 ++-- .../scheduler/cluster/ClusterTaskSetManagerSuite.scala | 3 +-- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 02ad64d070..0f19d7a96b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -222,9 +222,9 @@ private[spark] class Executor( return } - val objectSer = SparkEnv.get.serializer.newInstance() + val resultSer = SparkEnv.get.serializer.newInstance() val beforeSerialization = System.currentTimeMillis() - val valueBytes = objectSer.serialize(value) + val valueBytes = resultSer.serialize(value) val afterSerialization = System.currentTimeMillis() for (m <- task.metrics) { @@ -232,7 +232,7 @@ private[spark] class Executor( m.executorDeserializeTime = (taskStart - startTime).toInt m.executorRunTime = (taskFinish - taskStart).toInt m.jvmGCTime = gcTime - startGCTime - m.serializationTime = (afterSerialization - beforeSerialization).toInt + m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt } val accumUpdates = Accumulators.values diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index c036866afd..bb1471d9ee 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -44,9 +44,9 @@ class TaskMetrics extends Serializable { var jvmGCTime: Long = _ /** - * Amount of time spent serializing the result of the task + * Amount of time spent serializing the task result */ - var serializationTime: Long = _ + var resultSerializationTime: Long = _ /** * If this task reads from shuffle output, metrics on getting shuffle data 
will be collected here diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 81651bdd20..2f06efa66e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -102,8 +102,8 @@ private[spark] class StagePage(parent: JobProgressUI) { } else { val serializationTimes = validTasks.map{case (info, metrics, exception) => - metrics.get.serializationTime.toDouble} - val serializationQuantiles = "Serialization Time" +: Distribution(serializationTimes).get.getQuantiles().map( + metrics.get.resultSerializationTime.toDouble} + val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map( ms => parent.formatDuration(ms.toLong)) val serviceTimes = validTasks.map{case (info, metrics, exception) => diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala index 788cbb81bf..2476ab5c19 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala @@ -314,7 +314,6 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo def createTaskResult(id: Int): DirectTaskResult[Int] = { val objectSer = SparkEnv.get.serializer.newInstance() - new DirectTaskResult[Int](objectSer.serialize(id), mutable.Map.empty, - new TaskMetrics) + new DirectTaskResult[Int](objectSer.serialize(id), mutable.Map.empty, new TaskMetrics) } } From b2f0329511f3caaf473cf300792690703a300a22 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Tue, 17 Dec 2013 00:18:46 -0500 Subject: [PATCH 07/23] Missed a spot; had an objectSer here too. 
--- .../main/scala/org/apache/spark/scheduler/TaskResult.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index 4e00bc8271..e80cc6b0f6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -73,7 +73,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long } def value(): T = { - val objectSer = SparkEnv.get.serializer.newInstance() - return objectSer.deserialize(valueBytes) + val resultSer = SparkEnv.get.serializer.newInstance() + return resultSer.deserialize(valueBytes) } } From 36060f4f50ead2632117bb12e8c5bc1fb4f91f1e Mon Sep 17 00:00:00 2001 From: "wangda.tan" Date: Tue, 17 Dec 2013 17:55:38 +0800 Subject: [PATCH 08/23] spark-898, changes according to review comments --- .../apache/spark/ui/exec/ExecutorsUI.scala | 39 ++++++++++++++-- .../spark/ui/jobs/ExecutorSummary.scala | 3 +- .../apache/spark/ui/jobs/ExecutorTable.scala | 40 +++++++++-------- .../org/apache/spark/ui/jobs/IndexPage.scala | 5 +-- .../spark/ui/jobs/JobProgressListener.scala | 31 +++++++------ .../org/apache/spark/ui/jobs/StagePage.scala | 3 +- .../ui/jobs/JobProgressListenerSuite.scala | 45 +++++-------------- 7 files changed, 90 insertions(+), 76 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index e596690bc3..808bbe8c8f 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -56,7 +56,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_) val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used", - "Active tasks", "Failed tasks", "Complete tasks", "Total tasks") + "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Duration", "Shuffle Read", + "Shuffle Write") def execRow(kv: Seq[String]) = { @@ -73,6 +74,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { {kv(7)} {kv(8)} {kv(9)} + {Utils.msDurationToString(kv(10).toLong)} + {Utils.bytesToString(kv(11).toLong)} + {Utils.bytesToString(kv(12).toLong)} } @@ -111,6 +115,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0) val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0) val totalTasks = activeTasks + failedTasks + completedTasks + val totalDuration = listener.executorToDuration.getOrElse(execId, 0) + val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0) + val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0) Seq( execId, @@ -122,7 +129,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { activeTasks.toString, failedTasks.toString, completedTasks.toString, - totalTasks.toString + totalTasks.toString, + totalDuration.toString, + totalShuffleRead.toString, + totalShuffleWrite.toString ) } @@ -130,6 +140,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]() val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() + val executorToDuration = HashMap[String, Long]() + val 
executorToShuffleRead = HashMap[String, Long]() + val executorToShuffleWrite = HashMap[String, Long]() override def onTaskStart(taskStart: SparkListenerTaskStart) { val eid = taskStart.taskInfo.executorId @@ -137,9 +150,12 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { activeTasks += taskStart.taskInfo } - override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val eid = taskEnd.taskInfo.executorId val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]()) + val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration + executorToDuration.put(eid, newDuration) + activeTasks -= taskEnd.taskInfo val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = taskEnd.reason match { @@ -150,6 +166,23 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1 (None, Option(taskEnd.taskMetrics)) } + + // update shuffle read/write + val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics + shuffleRead match { + case Some(s) => + val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead + executorToShuffleRead.put(eid, newShuffleRead) + case _ => {} + } + val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics + shuffleWrite match { + case Some(s) => { + val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten + executorToShuffleWrite.put(eid, newShuffleWrite) + } + case _ => {} + } } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala index f2ee12081c..75c0dd2c7f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -19,9 +19,8 @@ package org.apache.spark.ui.jobs private[spark] class ExecutorSummary() { var duration : Long = 0 - var totalTasks : Int = 0 var failedTasks : Int = 0 - var succeedTasks : Int = 0 + var succeededTasks : Int = 0 var shuffleRead : Long = 0 var shuffleWrite : Long = 0 } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index c6823cd823..763d5a344b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -17,14 +17,13 @@ package org.apache.spark.ui.jobs - import scala.xml.Node import org.apache.spark.scheduler.SchedulingMode - +import org.apache.spark.util.Utils /** Page showing executor summary */ -private[spark] class ExecutorTable(val parent: JobProgressUI) { +private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) { val listener = parent.listener val dateFmt = parent.dateFmt @@ -42,9 +41,9 @@ private[spark] class ExecutorTable(val parent: JobProgressUI) { Executor ID Duration - #Tasks - #Failed Tasks - #Succeed Tasks + Total Tasks + Failed Tasks + Succeeded Tasks Shuffle Read Shuffle Write @@ -55,19 +54,24 @@ private[spark] class ExecutorTable(val parent: JobProgressUI) { } private def createExecutorTable() : Seq[Node] = { - val executorIdToSummary = listener.executorIdToSummary - executorIdToSummary.toSeq.sortBy(_._1).map{ - case (k,v) => { - - {k} - {v.duration} ms - {v.totalTasks} - {v.failedTasks} - {v.succeedTasks} - {v.shuffleRead} - {v.shuffleWrite} - + val 
executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId) + executorIdToSummary match { + case Some(x) => { + x.toSeq.sortBy(_._1).map{ + case (k,v) => { + + {k} + {parent.formatDuration(v.duration)} + {v.failedTasks + v.succeededTasks} + {v.failedTasks} + {v.succeededTasks} + {Utils.bytesToString(v.shuffleRead)} + {Utils.bytesToString(v.shuffleWrite)} + + } + } } + case _ => { Seq[Node]() } } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 653a84b60f..854afb665a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -45,7 +45,6 @@ private[spark] class IndexPage(parent: JobProgressUI) { val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent) val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent) val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent) - val executorTable = new ExecutorTable(parent) val pools = listener.sc.getAllPools val poolTable = new PoolTable(pools, listener) @@ -59,7 +58,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
  • Scheduling Mode: {parent.sc.getSchedulingMode}
  • Executor Summary:
-                {listener.executorIdToSummary.size}
+                {listener.stageIdToExecutorSummaries.size}
  • Active Stages: @@ -82,8 +81,6 @@ private[spark] class IndexPage(parent: JobProgressUI) { } else { Seq() }} ++ -

          <h4>Executor Summary</h4> ++
-          executorTable.toNodeSeq++
           <h4>Active Stages ({activeStages.size})</h4> ++
           activeStagesTable.toNodeSeq++
           <h4>Completed Stages ({completedStages.size})</h4>

    ++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 2635478592..8c92ff19a6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -57,7 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList val stageIdToTasksFailed = HashMap[Int, Int]() val stageIdToTaskInfos = HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]() - val executorIdToSummary = HashMap[String, ExecutorSummary]() + val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]() override def onJobStart(jobStart: SparkListenerJobStart) {} @@ -115,9 +115,6 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) taskList += ((taskStart.taskInfo, None, None)) stageIdToTaskInfos(sid) = taskList - val executorSummary = executorIdToSummary.getOrElseUpdate(key = taskStart.taskInfo.executorId, - op = new ExecutorSummary()) - executorSummary.totalTasks += 1 } override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) @@ -127,32 +124,39 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList } override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { - // update executor summary - val executorSummary = executorIdToSummary.get(taskEnd.taskInfo.executorId) + val sid = taskEnd.task.stageId + + // create executor summary map if necessary + val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid, + op = new HashMap[String, ExecutorSummary]()) + executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId, + op = new ExecutorSummary()) + + val executorSummary = executorSummaryMap.get(taskEnd.taskInfo.executorId) executorSummary match { - case Some(x) => { + case Some(y) => { // first update failed-task, succeed-task taskEnd.reason match { - case e: ExceptionFailure => - x.failedTasks += 1 + case Success => + y.succeededTasks += 1 case _ => - x.succeedTasks += 1 + y.failedTasks += 1 } // update duration - x.duration += taskEnd.taskInfo.duration + y.duration += taskEnd.taskInfo.duration // update shuffle read/write val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics shuffleRead match { case Some(s) => - x.shuffleRead += s.remoteBytesRead + y.shuffleRead += s.remoteBytesRead case _ => {} } val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics shuffleWrite match { case Some(s) => { - x.shuffleWrite += s.shuffleBytesWritten + y.shuffleWrite += s.shuffleBytesWritten } case _ => {} } @@ -160,7 +164,6 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList case _ => {} } - val sid = taskEnd.task.stageId val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) tasksActive -= taskEnd.taskInfo diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 69f9446bab..c077613b1d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -160,9 +160,10 @@ private[spark] class StagePage(parent: JobProgressUI) { def quantileRow(data: Seq[String]): Seq[Node] = {data.map(d => {d})} Some(listingTable(quantileHeaders, quantileRow, 
listings, fixedWidth = true)) } - + val executorTable = new ExecutorTable(parent, stageId) val content = summary ++ +

      <h4>Summary Metrics for Executors</h4> ++ executorTable.toNodeSeq() ++
       <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
       <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
       <h4>Tasks</h4>

    ++ taskTable diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 861d37a862..67a57a0e7f 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -19,26 +19,19 @@ package org.apache.spark.ui.jobs import org.scalatest.FunSuite import org.apache.spark.scheduler._ -import org.apache.spark.SparkContext -import org.apache.spark.Success +import org.apache.spark.{LocalSparkContext, SparkContext, Success} import org.apache.spark.scheduler.SparkListenerTaskStart import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics} -class JobProgressListenerSuite extends FunSuite { +class JobProgressListenerSuite extends FunSuite with LocalSparkContext { test("test executor id to summary") { - val sc = new SparkContext("local", "joblogger") + val sc = new SparkContext("local", "test") val listener = new JobProgressListener(sc) val taskMetrics = new TaskMetrics() val shuffleReadMetrics = new ShuffleReadMetrics() // nothing in it - assert(listener.executorIdToSummary.size == 0) - - // launched a task, should get an item in map - listener.onTaskStart(new SparkListenerTaskStart( - new ShuffleMapTask(0, null, null, 0, null), - new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL))) - assert(listener.executorIdToSummary.size == 1) + assert(listener.stageIdToExecutorSummaries.size == 0) // finish this task, should get updated shuffleRead shuffleReadMetrics.remoteBytesRead = 1000 @@ -47,20 +40,15 @@ class JobProgressListenerSuite extends FunSuite { taskInfo.finishTime = 1 listener.onTaskEnd(new SparkListenerTaskEnd( new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) - assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 1000) + assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail()) + .shuffleRead == 1000) // finish a task with unknown executor-id, nothing should happen taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL) taskInfo.finishTime = 1 listener.onTaskEnd(new SparkListenerTaskEnd( new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) - assert(listener.executorIdToSummary.size == 1) - - // launched a task - listener.onTaskStart(new SparkListenerTaskStart( - new ShuffleMapTask(0, null, null, 0, null), - new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL))) - assert(listener.executorIdToSummary.size == 1) + assert(listener.stageIdToExecutorSummaries.size == 1) // finish this task, should get updated duration shuffleReadMetrics.remoteBytesRead = 1000 @@ -69,13 +57,8 @@ class JobProgressListenerSuite extends FunSuite { taskInfo.finishTime = 1 listener.onTaskEnd(new SparkListenerTaskEnd( new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) - assert(listener.executorIdToSummary.getOrElse("exe-1", fail()).shuffleRead == 2000) - - // launched a task in another exec - listener.onTaskStart(new SparkListenerTaskStart( - new ShuffleMapTask(0, null, null, 0, null), - new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL))) - assert(listener.executorIdToSummary.size == 2) + assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail()) + .shuffleRead == 2000) // finish this task, should get updated duration 
shuffleReadMetrics.remoteBytesRead = 1000 @@ -84,13 +67,7 @@ class JobProgressListenerSuite extends FunSuite { taskInfo.finishTime = 1 listener.onTaskEnd(new SparkListenerTaskEnd( new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics)) - assert(listener.executorIdToSummary.getOrElse("exe-2", fail()).shuffleRead == 1000) - - // do finalize - sc.stop() - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") + assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail()) + .shuffleRead == 1000) } } From 59e53fa21caa202a57093c74ada128fca2be5bac Mon Sep 17 00:00:00 2001 From: "wangda.tan" Date: Tue, 17 Dec 2013 17:57:27 +0800 Subject: [PATCH 09/23] spark-968, changes for avoid a NPE --- .../apache/spark/ui/exec/ExecutorsUI.scala | 30 ++++++++++--------- .../spark/ui/jobs/JobProgressListener.scala | 24 ++++++++------- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index 808bbe8c8f..f62ae37466 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -150,7 +150,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { activeTasks += taskStart.taskInfo } - override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { val eid = taskEnd.taskInfo.executorId val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]()) val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration @@ -168,20 +168,22 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { } // update shuffle read/write - val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics - shuffleRead match { - case Some(s) => - val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead - executorToShuffleRead.put(eid, newShuffleRead) - case _ => {} - } - val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics - shuffleWrite match { - case Some(s) => { - val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten - executorToShuffleWrite.put(eid, newShuffleWrite) + if (null != taskEnd.taskMetrics) { + val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics + shuffleRead match { + case Some(s) => + val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead + executorToShuffleRead.put(eid, newShuffleRead) + case _ => {} + } + val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics + shuffleWrite match { + case Some(s) => { + val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten + executorToShuffleWrite.put(eid, newShuffleWrite) + } + case _ => {} } - case _ => {} } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 8c92ff19a6..64ce715993 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -147,18 +147,20 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList y.duration += taskEnd.taskInfo.duration // update shuffle read/write - val shuffleRead = 
taskEnd.taskMetrics.shuffleReadMetrics - shuffleRead match { - case Some(s) => - y.shuffleRead += s.remoteBytesRead - case _ => {} - } - val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics - shuffleWrite match { - case Some(s) => { - y.shuffleWrite += s.shuffleBytesWritten + if (null != taskEnd.taskMetrics) { + val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics + shuffleRead match { + case Some(s) => + y.shuffleRead += s.remoteBytesRead + case _ => {} + } + val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics + shuffleWrite match { + case Some(s) => { + y.shuffleWrite += s.shuffleBytesWritten + } + case _ => {} } - case _ => {} } } case _ => {} From 717c7fddb27a3ec8732a760c000bbfa7060d76c1 Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Tue, 17 Dec 2013 23:02:21 -0500 Subject: [PATCH 10/23] objectSer -> valueSer in a test. --- .../spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala index cb719d7ab9..bb28a31a99 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala @@ -313,7 +313,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo } def createTaskResult(id: Int): DirectTaskResult[Int] = { - val objectSer = SparkEnv.get.serializer.newInstance() - new DirectTaskResult[Int](objectSer.serialize(id), mutable.Map.empty, new TaskMetrics) + val valueSer = SparkEnv.get.serializer.newInstance() + new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics) } } From d3b1af4b6c7766bbf7a09ee6d5c1b13eda6b098f Mon Sep 17 00:00:00 2001 From: Tor Myklebust Date: Wed, 18 Dec 2013 14:25:56 -0500 Subject: [PATCH 11/23] Add a serialisation time column to the StagePage. 
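
The new cell follows the same pattern as the existing GC Time column: look the metric up if it is present and leave the cell blank rather than printing "0 ms". A small illustrative helper (not Spark API) showing that logic:

    object ResultSerTimeCell {
      // Render the metric only when it was actually recorded; otherwise an empty cell.
      def cellText(resultSerializationTime: Option[Long], formatDuration: Long => String): String =
        resultSerializationTime.filter(_ > 0).map(formatDuration).getOrElse("")
    }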
--- .../src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 2f06efa66e..996e1b4d1a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) { val taskHeaders: Seq[String] = Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++ - Seq("Duration", "GC Time") ++ + Seq("Duration", "GC Time", "Result Ser Time") ++ {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++ Seq("Errors") @@ -189,6 +189,7 @@ private[spark] class StagePage(parent: JobProgressUI) { val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration) else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("") val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L) + val serializationTime = metrics.map(m => m.resultSerializationTime).getOrElse(0L) val maybeShuffleRead = metrics.flatMap{m => m.shuffleReadMetrics}.map{s => s.remoteBytesRead} val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") @@ -216,6 +217,9 @@ private[spark] class StagePage(parent: JobProgressUI) { {if (gcTime > 0) parent.formatDuration(gcTime) else ""} + + {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""} + {if (shuffleRead) { {shuffleReadReadable} From af0cd6bd27dda73b326bcb6a66addceadebf5e54 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Wed, 18 Dec 2013 11:40:07 -0800 Subject: [PATCH 12/23] Add collectPartition to JavaRDD interface. Also remove takePartition from PythonRDD and use collectPartition in rdd.py. --- .../apache/spark/api/java/JavaRDDLike.scala | 11 +++++++- .../apache/spark/api/python/PythonRDD.scala | 4 --- .../scala/org/apache/spark/JavaAPISuite.java | 28 +++++++++++++++++++ python/pyspark/context.py | 3 -- python/pyspark/rdd.py | 2 +- 5 files changed, 39 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 9e912d3adb..1d71875ed1 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -26,7 +26,7 @@ import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec import org.apache.spark.{SparkContext, Partition, TaskContext} -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{RDD, PartitionPruningRDD} import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} import org.apache.spark.partial.{PartialResult, BoundedDouble} @@ -244,6 +244,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { new java.util.ArrayList(arr) } + /** + * Return an array that contains all of the elements in a specific partition of this RDD. 
+ */ + def collectPartition(partitionId: Int): JList[T] = { + import scala.collection.JavaConversions._ + val partition = new PartitionPruningRDD[T](rdd, _ == partitionId) + new java.util.ArrayList(partition.collect().toSeq) + } + /** * Reduces the elements of this RDD using the specified commutative and associative binary operator. */ diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index a659cc06c2..ca42c76928 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -235,10 +235,6 @@ private[spark] object PythonRDD { file.close() } - def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = { - implicit val cm : ClassTag[T] = rdd.elementClassTag - rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator - } } private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] { diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 4234f6eac7..2862ed3019 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -897,4 +897,32 @@ public class JavaAPISuite implements Serializable { new Tuple2(0, 4)), rdd3.collect()); } + + @Test + public void collectPartition() { + JavaRDD rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); + + JavaPairRDD rdd2 = rdd1.map(new PairFunction() { + @Override + public Tuple2 call(Integer i) throws Exception { + return new Tuple2(i, i % 2); + } + }); + + Assert.assertEquals(Arrays.asList(1, 2), rdd1.collectPartition(0)); + Assert.assertEquals(Arrays.asList(3, 4), rdd1.collectPartition(1)); + Assert.assertEquals(Arrays.asList(5, 6, 7), rdd1.collectPartition(2)); + + Assert.assertEquals(Arrays.asList(new Tuple2(1, 1), + new Tuple2(2, 0)), + rdd2.collectPartition(0)); + Assert.assertEquals(Arrays.asList(new Tuple2(3, 1), + new Tuple2(4, 0)), + rdd2.collectPartition(1)); + Assert.assertEquals(Arrays.asList(new Tuple2(5, 1), + new Tuple2(6, 0), + new Tuple2(7, 1)), + rdd2.collectPartition(2)); + } + } diff --git a/python/pyspark/context.py b/python/pyspark/context.py index cbd41e58c4..0604f6836c 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -43,7 +43,6 @@ class SparkContext(object): _gateway = None _jvm = None _writeToFile = None - _takePartition = None _next_accum_id = 0 _active_spark_context = None _lock = Lock() @@ -134,8 +133,6 @@ class SparkContext(object): SparkContext._jvm = SparkContext._gateway.jvm SparkContext._writeToFile = \ SparkContext._jvm.PythonRDD.writeToFile - SparkContext._takePartition = \ - SparkContext._jvm.PythonRDD.takePartition if instance: if SparkContext._active_spark_context and SparkContext._active_spark_context != instance: diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 61720dcf1a..d81b7c90c1 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -577,7 +577,7 @@ class RDD(object): mapped = self.mapPartitions(takeUpToNum) items = [] for partition in range(mapped._jrdd.splits().size()): - iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition) + iterator = mapped._jrdd.collectPartition(partition).iterator() items.extend(mapped._collect_iterator_through_file(iterator)) if len(items) >= num: break From 293a0af5a1def95e47d9188f42957083f5adf3b8 Mon Sep 17 
00:00:00 2001 From: Aaron Davidson Date: Wed, 18 Dec 2013 18:51:02 -0800 Subject: [PATCH 13/23] In experimental clusters we've observed that a 10 second timeout was insufficient, despite having a low number of nodes and relatively small workload (16 nodes, <1.5 TB data). This would cause an entire job to fail at the beginning of the reduce phase. There is no particular reason for this value to be small as a timeout should only occur in an exceptional situation. Also centralized the reading of spark.akka.askTimeout to AkkaUtils (surely this can later be cleaned up to use Typesafe). Finally, deleted some lurking implicits. If anyone can think of a reason they should still be there, please let me know. --- .../org/apache/spark/MapOutputTracker.scala | 5 ++--- .../org/apache/spark/deploy/client/Client.scala | 6 +++--- .../org/apache/spark/deploy/master/Master.scala | 17 ++++++----------- .../deploy/master/ui/ApplicationPage.scala | 14 +++++--------- .../spark/deploy/master/ui/IndexPage.scala | 16 ++++++---------- .../spark/deploy/master/ui/MasterWebUI.scala | 10 +++------- .../spark/deploy/worker/ui/IndexPage.scala | 4 ++-- .../spark/deploy/worker/ui/WorkerWebUI.scala | 10 +++------- .../cluster/CoarseGrainedSchedulerBackend.scala | 10 ++++------ .../spark/storage/BlockManagerMaster.scala | 7 ++++--- .../spark/storage/BlockManagerMasterActor.scala | 11 ++++------- .../spark/ui/storage/BlockManagerUI.scala | 3 --- .../scala/org/apache/spark/util/AkkaUtils.scala | 6 ++++++ 13 files changed, 48 insertions(+), 71 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 10fae5af9f..ccffcc356c 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -29,8 +29,7 @@ import akka.pattern.ask import org.apache.spark.scheduler.MapStatus import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashMap} - +import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils} private[spark] sealed trait MapOutputTrackerMessage private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String) @@ -53,7 +52,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster private[spark] class MapOutputTracker extends Logging { - private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + private val timeout = AkkaUtils.askTimeout // Set to the MapOutputTrackerActor living on the driver var trackerActor: Either[ActorRef, ActorSelection] = _ diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala index 4d95efa73a..953755e40d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala @@ -23,14 +23,14 @@ import scala.concurrent.duration._ import scala.concurrent.Await import akka.actor._ -import akka.pattern.AskTimeoutException import akka.pattern.ask -import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent} +import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent} import org.apache.spark.{SparkException, Logging} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ 
import org.apache.spark.deploy.master.Master +import org.apache.spark.util.AkkaUtils /** @@ -178,7 +178,7 @@ private[spark] class Client( def stop() { if (actor != null) { try { - val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + val timeout = AkkaUtils.askTimeout val future = actor.ask(StopClient)(timeout) Await.result(future, timeout) } catch { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index c627dd3806..7b2b1c3327 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -18,19 +18,16 @@ package org.apache.spark.deploy.master import java.text.SimpleDateFormat -import java.util.concurrent.TimeUnit import java.util.Date import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.Await import scala.concurrent.duration._ -import scala.concurrent.duration.{Duration, FiniteDuration} import akka.actor._ import akka.pattern.ask -import akka.remote._ +import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} import akka.serialization.SerializationExtension -import akka.util.Timeout import org.apache.spark.{Logging, SparkException} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} @@ -38,7 +35,7 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.util.{Utils, AkkaUtils} +import org.apache.spark.util.{AkkaUtils, Utils} private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { import context.dispatcher @@ -537,12 +534,10 @@ private[spark] object Master { def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = { val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) - val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName) - val timeoutDuration: FiniteDuration = Duration.create( - System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS) - implicit val timeout = Timeout(timeoutDuration) - val respFuture = actor ? 
RequestWebUIPort // ask pattern - val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse] + val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName) + val timeout = AkkaUtils.askTimeout + val respFuture = actor.ask(RequestWebUIPort)(timeout) + val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse] (actorSystem, boundPort, resp.webUIBoundPort) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 3b983c19eb..dbb0cb90f5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -17,32 +17,28 @@ package org.apache.spark.deploy.master.ui +import scala.concurrent.Await import scala.xml.Node import akka.pattern.ask - -import scala.concurrent.Await -import scala.concurrent.duration._ - import javax.servlet.http.HttpServletRequest - import net.liftweb.json.JsonAST.JValue -import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.JsonProtocol +import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.ExecutorInfo import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils private[spark] class ApplicationPage(parent: MasterWebUI) { val master = parent.masterActorRef - implicit val timeout = parent.timeout + val timeout = parent.timeout /** Executor details for a particular application */ def renderJson(request: HttpServletRequest): JValue = { val appId = request.getParameter("appId") val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] - val state = Await.result(stateFuture, 30 seconds) + val state = Await.result(stateFuture, timeout) val app = state.activeApps.find(_.id == appId).getOrElse({ state.completedApps.find(_.id == appId).getOrElse(null) }) @@ -53,7 +49,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) { def render(request: HttpServletRequest): Seq[Node] = { val appId = request.getParameter("appId") val stateFuture = (master ? 
RequestMasterState)(timeout).mapTo[MasterStateResponse] - val state = Await.result(stateFuture, 30 seconds) + val state = Await.result(stateFuture, timeout) val app = state.activeApps.find(_.id == appId).getOrElse({ state.completedApps.find(_.id == appId).getOrElse(null) }) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index 65e7a14e7a..4ef762892c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -17,37 +17,33 @@ package org.apache.spark.deploy.master.ui -import javax.servlet.http.HttpServletRequest - +import scala.concurrent.Await import scala.xml.Node -import scala.concurrent.Await import akka.pattern.ask -import scala.concurrent.duration._ - +import javax.servlet.http.HttpServletRequest import net.liftweb.json.JsonAST.JValue -import org.apache.spark.deploy.DeployWebUI +import org.apache.spark.deploy.{DeployWebUI, JsonProtocol} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} -import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo} import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils private[spark] class IndexPage(parent: MasterWebUI) { val master = parent.masterActorRef - implicit val timeout = parent.timeout + val timeout = parent.timeout def renderJson(request: HttpServletRequest): JValue = { val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] - val state = Await.result(stateFuture, 30 seconds) + val state = Await.result(stateFuture, timeout) JsonProtocol.writeMasterState(state) } /** Index view listing applications and executors */ def render(request: HttpServletRequest): Seq[Node] = { val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] - val state = Await.result(stateFuture, 30 seconds) + val state = Await.result(stateFuture, timeout) val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory") val workers = state.workers.sortBy(_.id) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index a211ce2b42..9ab594b682 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -17,25 +17,21 @@ package org.apache.spark.deploy.master.ui -import scala.concurrent.duration._ - import javax.servlet.http.HttpServletRequest - import org.eclipse.jetty.server.{Handler, Server} -import org.apache.spark.{Logging} +import org.apache.spark.Logging import org.apache.spark.deploy.master.Master import org.apache.spark.ui.JettyUtils import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{AkkaUtils, Utils} /** * Web UI server for the standalone master. 
*/ private[spark] class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { - implicit val timeout = Duration.create( - System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + val timeout = AkkaUtils.askTimeout val host = Utils.localHostName() val port = requestedPort diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala index 1a768d501f..0d59048313 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala @@ -42,13 +42,13 @@ private[spark] class IndexPage(parent: WorkerWebUI) { def renderJson(request: HttpServletRequest): JValue = { val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse] - val workerState = Await.result(stateFuture, 30 seconds) + val workerState = Await.result(stateFuture, timeout) JsonProtocol.writeWorkerState(workerState) } def render(request: HttpServletRequest): Seq[Node] = { val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse] - val workerState = Await.result(stateFuture, 30 seconds) + val workerState = Await.result(stateFuture, timeout) val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs") val runningExecutorTable = diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 6c18a3c245..40d6bdb3fd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -19,17 +19,14 @@ package org.apache.spark.deploy.worker.ui import java.io.File -import scala.concurrent.duration._ - -import akka.util.Timeout import javax.servlet.http.HttpServletRequest +import org.eclipse.jetty.server.{Handler, Server} import org.apache.spark.Logging import org.apache.spark.deploy.worker.Worker import org.apache.spark.ui.{JettyUtils, UIUtils} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.Utils -import org.eclipse.jetty.server.{Handler, Server} +import org.apache.spark.util.{AkkaUtils, Utils} /** * Web UI server for the standalone worker. 
@@ -37,8 +34,7 @@ import org.eclipse.jetty.server.{Handler, Server} private[spark] class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None) extends Logging { - implicit val timeout = Timeout( - Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")) + val timeout = AkkaUtils.askTimeout val host = Utils.localHostName() val port = requestedPort.getOrElse( System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f5e8766f6d..7e22c843bf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -27,10 +27,10 @@ import akka.actor._ import akka.pattern.ask import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} -import org.apache.spark.{SparkException, Logging, TaskState} +import org.apache.spark.{Logging, SparkException, TaskState} import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{AkkaUtils, Utils} /** * A scheduler backend that waits for coarse grained executors to connect to it through Akka. @@ -47,6 +47,8 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) + private val timeout = AkkaUtils.askTimeout + class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] private val executorAddress = new HashMap[String, Address] @@ -172,10 +174,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME) } - private val timeout = { - Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") - } - def stopExecutors() { try { if (driverActor != null) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index e05b842476..e1d68ef592 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -18,7 +18,6 @@ package org.apache.spark.storage import scala.concurrent.{Await, Future} -import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global import akka.actor._ @@ -26,15 +25,17 @@ import akka.pattern.ask import org.apache.spark.{Logging, SparkException} import org.apache.spark.storage.BlockManagerMessages._ +import org.apache.spark.util.AkkaUtils -private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging { +private[spark] +class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging { val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster" - val timeout = 
Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + val timeout = AkkaUtils.askTimeout /** Remove a dead executor from the driver actor. This is only called on the driver side. */ def removeExecutor(execId: String) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 154a3980e9..21022e1cfb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -21,17 +21,15 @@ import java.util.{HashMap => JHashMap} import scala.collection.mutable import scala.collection.JavaConversions._ +import scala.concurrent.Future +import scala.concurrent.duration._ import akka.actor.{Actor, ActorRef, Cancellable} import akka.pattern.ask -import scala.concurrent.duration._ -import scala.concurrent.Future - import org.apache.spark.{Logging, SparkException} import org.apache.spark.storage.BlockManagerMessages._ -import org.apache.spark.util.Utils - +import org.apache.spark.util.{AkkaUtils, Utils} /** * BlockManagerMasterActor is an actor on the master node to track statuses of @@ -50,8 +48,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { // Mapping from block id to the set of block managers that have the block. private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] - val akkaTimeout = Duration.create( - System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + private val akkaTimeout = AkkaUtils.askTimeout initLogging() diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala index a5446b3fc3..39f422dd6b 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala @@ -28,9 +28,6 @@ import org.apache.spark.ui.JettyUtils._ /** Web UI showing storage status of all RDD's in the given SparkContext. */ private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging { - implicit val timeout = Duration.create( - System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") - val indexPage = new IndexPage(this) val rddPage = new RDDPage(this) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 74133cef6c..1c8b51b8bc 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -17,6 +17,8 @@ package org.apache.spark.util +import scala.concurrent.duration.{Duration, FiniteDuration} + import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem} import com.typesafe.config.ConfigFactory @@ -84,4 +86,8 @@ private[spark] object AkkaUtils { (actorSystem, boundPort) } + /** Returns the default Spark timeout to use for Akka ask operations. */ + def askTimeout: FiniteDuration = { + Duration.create(System.getProperty("spark.akka.askTimeout", "30").toLong, "seconds") + } } From eaf6a269b123e1eca1f1a3cb9e210a9b37ae4a27 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sun, 17 Nov 2013 22:37:46 -0800 Subject: [PATCH 14/23] [SPARK-959] Explicitly depend on org.eclipse.jetty.orbit jar Without this, in some cases, Ivy attempts to download the wrong file and fails, stopping the whole build. See bug for more details. 
(This is probably also the beginning of the slow death of our recently prettified dependencies. Form follow function.) --- project/SparkBuild.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 29f4a4b9ff..ab96cfa18b 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -177,6 +177,8 @@ object SparkBuild extends Build { libraryDependencies ++= Seq( "io.netty" % "netty-all" % "4.0.0.CR1", "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", + /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */ + "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"), "org.scalatest" %% "scalatest" % "1.9.1" % "test", "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", "com.novocode" % "junit-interface" % "0.9" % "test", From a76f53416c0267e8a9816ee4d22fe8e838c4c319 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Thu, 19 Dec 2013 14:38:20 +0200 Subject: [PATCH 15/23] Add toString to Java RDD, and __repr__ to Python RDD --- core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 2 ++ python/pyspark/rdd.py | 3 +++ 2 files changed, 5 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index c47657f512..037cd1c774 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -125,6 +125,8 @@ JavaRDDLike[T, JavaRDD[T]] { */ def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] = wrapRDD(rdd.subtract(other, p)) + + override def toString = rdd.toString } object JavaRDD { diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 61720dcf1a..7cbc66d3c9 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -54,6 +54,9 @@ class RDD(object): self.ctx = ctx self._jrdd_deserializer = jrdd_deserializer + def __repr__(self): + return self._jrdd.toString() + @property def context(self): """ From d3234f9726db3917af4688ba70933938b078b0bd Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Thu, 19 Dec 2013 11:40:34 -0800 Subject: [PATCH 16/23] Make collectPartitions take an array of partitions Change the implementation to use runJob instead of PartitionPruningRDD. Also update the unit tests and the python take implementation to use the new interface. 
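A minimal Scala sketch of what the new collectPartitions interface computes, expressed directly against the SparkContext.runJob call the patch itself switches to. Illustrative only, not part of the patch; the object name, local master URL, and app name are assumptions for a self-contained example.

// Illustrative sketch only -- not part of this patch series.
// Shows the semantics of collecting specific partitions via SparkContext.runJob.
import org.apache.spark.SparkContext

object CollectPartitionsSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "collectPartitions-sketch")
    val rdd = sc.parallelize(1 to 7, 3)        // partitions: (1, 2), (3, 4), (5, 6, 7)
    // Run a job over just partitions 0 and 1; each partition's iterator is drained to an array.
    val parts: Array[Array[Int]] =
      sc.runJob(rdd, (it: Iterator[Int]) => it.toArray, Seq(0, 1), true)
    parts.foreach(p => println(p.mkString(", ")))   // prints "1, 2" then "3, 4"
    sc.stop()
  }
}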
--- .../apache/spark/api/java/JavaRDDLike.scala | 8 ++++---- .../scala/org/apache/spark/JavaAPISuite.java | 19 ++++++++++++------- python/pyspark/rdd.py | 7 ++++++- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 1d71875ed1..458d9dcbc3 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -26,7 +26,7 @@ import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec import org.apache.spark.{SparkContext, Partition, TaskContext} -import org.apache.spark.rdd.{RDD, PartitionPruningRDD} +import org.apache.spark.rdd.RDD import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} import org.apache.spark.partial.{PartialResult, BoundedDouble} @@ -247,10 +247,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return an array that contains all of the elements in a specific partition of this RDD. */ - def collectPartition(partitionId: Int): JList[T] = { + def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = { import scala.collection.JavaConversions._ - val partition = new PartitionPruningRDD[T](rdd, _ == partitionId) - new java.util.ArrayList(partition.collect().toSeq) + val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true) + res.map(x => new java.util.ArrayList(x.toSeq)).toArray } /** diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 2862ed3019..79913dc718 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -899,7 +899,7 @@ public class JavaAPISuite implements Serializable { } @Test - public void collectPartition() { + public void collectPartitions() { JavaRDD rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); JavaPairRDD rdd2 = rdd1.map(new PairFunction() { @@ -909,20 +909,25 @@ public class JavaAPISuite implements Serializable { } }); - Assert.assertEquals(Arrays.asList(1, 2), rdd1.collectPartition(0)); - Assert.assertEquals(Arrays.asList(3, 4), rdd1.collectPartition(1)); - Assert.assertEquals(Arrays.asList(5, 6, 7), rdd1.collectPartition(2)); + List[] parts = rdd1.collectPartitions(new int[] {0}); + Assert.assertEquals(Arrays.asList(1, 2), parts[0]); + + parts = rdd1.collectPartitions(new int[] {1, 2}); + Assert.assertEquals(Arrays.asList(3, 4), parts[0]); + Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]); Assert.assertEquals(Arrays.asList(new Tuple2(1, 1), new Tuple2(2, 0)), - rdd2.collectPartition(0)); + rdd2.collectPartitions(new int[] {0})[0]); + + parts = rdd2.collectPartitions(new int[] {1, 2}); Assert.assertEquals(Arrays.asList(new Tuple2(3, 1), new Tuple2(4, 0)), - rdd2.collectPartition(1)); + parts[0]); Assert.assertEquals(Arrays.asList(new Tuple2(5, 1), new Tuple2(6, 0), new Tuple2(7, 1)), - rdd2.collectPartition(2)); + parts[1]); } } diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index d81b7c90c1..7015119551 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -576,8 +576,13 @@ class RDD(object): # Take only up to num elements from each partition we try mapped = self.mapPartitions(takeUpToNum) items = [] + # TODO(shivaram): Similar to the scala 
implementation, update the take + # method to scan multiple splits based on an estimate of how many elements + # we have per-split. for partition in range(mapped._jrdd.splits().size()): - iterator = mapped._jrdd.collectPartition(partition).iterator() + partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1) + partitionsToTake[0] = partition + iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator() items.extend(mapped._collect_iterator_through_file(iterator)) if len(items) >= num: break From 9cc3a6d3c0a64b80af77ae358c58d4b29b18c534 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Thu, 19 Dec 2013 11:49:17 -0800 Subject: [PATCH 17/23] Add comment explaining collectPartitions's use --- core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 458d9dcbc3..f344804b4c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -248,6 +248,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return an array that contains all of the elements in a specific partition of this RDD. */ def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = { + // This is useful for implementing `take` from other language frontends + // like Python where the data is serialized. import scala.collection.JavaConversions._ val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true) res.map(x => new java.util.ArrayList(x.toSeq)).toArray From 1ab031eaff7fb2473adb3e909a7a969e9cd28b49 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Thu, 19 Dec 2013 17:51:29 -0800 Subject: [PATCH 18/23] Extraordinarily minor code/comment cleanup --- .../spark/deploy/master/SparkZooKeeperSession.scala | 12 ++++++------ .../deploy/master/ZooKeeperLeaderElectionAgent.scala | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala index 81e15c534f..5b957fcd5a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala @@ -18,12 +18,12 @@ package org.apache.spark.deploy.master import scala.collection.JavaConversions._ -import scala.concurrent.ops._ + +import org.apache.zookeeper._ +import org.apache.zookeeper.Watcher.Event.KeeperState +import org.apache.zookeeper.data.Stat import org.apache.spark.Logging -import org.apache.zookeeper._ -import org.apache.zookeeper.data.Stat -import org.apache.zookeeper.Watcher.Event.KeeperState /** * Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry @@ -33,7 +33,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState * informed via zkDown(). * * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many - * times or a semantic exception is thrown (e.g.., "node already exists"). + * times or a semantic exception is thrown (e.g., "node already exists"). 
*/ private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging { val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "") @@ -179,7 +179,7 @@ private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) ext } catch { case e: KeeperException.NoNodeException => throw e case e: KeeperException.NodeExistsException => throw e - case e if n > 0 => + case e: Exception if n > 0 => logError("ZooKeeper exception, " + n + " more retries...", e) Thread.sleep(RETRY_WAIT_MILLIS) retry(fn, n-1) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala index 7809013e83..7d535b08de 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala @@ -21,8 +21,8 @@ import akka.actor.ActorRef import org.apache.zookeeper._ import org.apache.zookeeper.Watcher.Event.EventType -import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.Logging +import org.apache.spark.deploy.master.MasterMessages._ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String) extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging { @@ -105,7 +105,7 @@ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, mas // We found a different master file pointing to this process. // This can happen in the following two cases: // (1) The master process was restarted on the same node. - // (2) The ZK server died between creating the node and returning the name of the node. + // (2) The ZK server died between creating the file and returning the name of the file. // For this case, we will end up creating a second file, and MUST explicitly delete the // first one, since our ZK session is still open. // Note that this deletion will cause a NodeDeleted event to be fired so we check again for From 4d74b899b7daff74054b70f38cddf7b38fe6c211 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Thu, 19 Dec 2013 17:53:41 -0800 Subject: [PATCH 19/23] Remove firstApp from the standalone scheduler Master As a lonely child with no one to care for it... we had to put it down. --- .../scala/org/apache/spark/deploy/master/Master.scala | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7b2b1c3327..eebd0794b8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -61,8 +61,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act val waitingApps = new ArrayBuffer[ApplicationInfo] val completedApps = new ArrayBuffer[ApplicationInfo] - var firstApp: Option[ApplicationInfo] = None - Utils.checkHost(host, "Expected hostname") val masterMetricsSystem = MetricsSystem.createMetricsSystem("master") @@ -441,14 +439,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act idToApp(app.id) = app actorToApp(app.driver) = app addressToApp(appAddress) = app - if (firstApp == None) { - firstApp = Some(app) - } - // TODO: What is firstApp?? Can we remove it? 
- val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray - if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.desc.memoryPerSlave)) { - logWarning("Could not find any workers with enough memory for " + firstApp.get.id) - } waitingApps += app } From 6613ab663d42f6b54fe823b06307b8a1005bbb6a Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Thu, 19 Dec 2013 17:56:13 -0800 Subject: [PATCH 20/23] Fix compiler warning in SparkZooKeeperSession --- .../org/apache/spark/deploy/master/SparkZooKeeperSession.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala index 5b957fcd5a..6cc7fd2ff4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala @@ -103,6 +103,7 @@ private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) ext connectToZooKeeper() case KeeperState.Disconnected => logWarning("ZooKeeper disconnected, will retry...") + case s => // Do nothing } } } From c979eecdf6a11462595aba9d5b8fc942682cf85d Mon Sep 17 00:00:00 2001 From: "wangda.tan" Date: Sun, 22 Dec 2013 21:43:15 +0800 Subject: [PATCH 21/23] added changes according to comments from rxin --- .../apache/spark/ui/exec/ExecutorsUI.scala | 24 +++++++------------ .../spark/ui/jobs/ExecutorSummary.scala | 5 ++-- .../apache/spark/ui/jobs/ExecutorTable.scala | 4 ++-- .../org/apache/spark/ui/jobs/IndexPage.scala | 4 ---- .../spark/ui/jobs/JobProgressListener.scala | 23 ++++++------------ .../org/apache/spark/ui/jobs/StagePage.scala | 4 ++-- .../org/apache/spark/ui/jobs/StageTable.scala | 2 +- 7 files changed, 23 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index f62ae37466..a31a7e1d58 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -56,7 +56,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_) val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used", - "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Duration", "Shuffle Read", + "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read", "Shuffle Write") def execRow(kv: Seq[String]) = { @@ -169,21 +169,13 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { // update shuffle read/write if (null != taskEnd.taskMetrics) { - val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics - shuffleRead match { - case Some(s) => - val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead - executorToShuffleRead.put(eid, newShuffleRead) - case _ => {} - } - val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics - shuffleWrite match { - case Some(s) => { - val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten - executorToShuffleWrite.put(eid, newShuffleWrite) - } - case _ => {} - } + taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead => + executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) + + shuffleRead.remoteBytesRead)) + + 
taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite => + executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) + + shuffleWrite.shuffleBytesWritten)) } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala index 75c0dd2c7f..3c53e88380 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -17,8 +17,9 @@ package org.apache.spark.ui.jobs -private[spark] class ExecutorSummary() { - var duration : Long = 0 +/** class for reporting aggregated metrics for each executors in stageUI */ +private[spark] class ExecutorSummary { + var taskTime : Long = 0 var failedTasks : Int = 0 var succeededTasks : Int = 0 var shuffleRead : Long = 0 diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 763d5a344b..0e9dd4a8c7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -40,7 +40,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) - + @@ -61,7 +61,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) case (k,v) => { - + diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 854afb665a..ca5a28625b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -56,10 +56,6 @@ private[spark] class IndexPage(parent: JobProgressUI) { {parent.formatDuration(now - listener.sc.startTime)}
              <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
-             <li>
-               <strong>Executor Summary: </strong>
-               {listener.stageIdToExecutorSummaries.size}
-             </li>
  • Active Stages: {activeStages.size} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 64ce715993..07a42f0503 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -144,23 +144,14 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList } // update duration - y.duration += taskEnd.taskInfo.duration + y.taskTime += taskEnd.taskInfo.duration - // update shuffle read/write - if (null != taskEnd.taskMetrics) { - val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics - shuffleRead match { - case Some(s) => - y.shuffleRead += s.remoteBytesRead - case _ => {} - } - val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics - shuffleWrite match { - case Some(s) => { - y.shuffleWrite += s.shuffleBytesWritten - } - case _ => {} - } + taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead => + y.shuffleRead += shuffleRead.remoteBytesRead + } + + taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite => + y.shuffleWrite += shuffleWrite.shuffleBytesWritten } } case _ => {} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index c077613b1d..d8a6c9e2dc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -66,7 +66,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
            <li>
-             <strong>Total duration across all tasks: </strong>
+             <strong>Total task time across all tasks: </strong>
              {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
            </li>
            {if (hasShuffleRead)
@@ -163,9 +163,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
     val executorTable = new ExecutorTable(parent, stageId)

     val content =
       summary ++
-      <h4>Summary Metrics for Executors</h4> ++ executorTable.toNodeSeq() ++
       <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
       <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
+      <h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++
       <h4>Tasks</h4> ++ taskTable

     headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 9ad6de3c6d..463d85dfd5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
     {if (isFairScheduler) {
  • } else {}} - + From 2f689ba97b437092bf52063cface12aa9ee09bf3 Mon Sep 17 00:00:00 2001 From: "wangda.tan" Date: Mon, 23 Dec 2013 15:03:45 +0800 Subject: [PATCH 22/23] SPARK-968, added executor address showing in aggregated metrics by executors table --- .../org/apache/spark/ui/jobs/ExecutorTable.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 0e9dd4a8c7..0dd876480a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -21,6 +21,7 @@ import scala.xml.Node import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.util.Utils +import scala.collection.mutable /** Page showing executor summary */ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) { @@ -40,6 +41,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
        <th>Executor ID</th>
-       <th>Duration</th>
+       <th>Task Time</th>
        <th>Total Tasks</th>
        <th>Failed Tasks</th>
        <th>Succeeded Tasks</th>
        <td>{k}</td>
-       <td>{parent.formatDuration(v.duration)}</td>
+       <td>{parent.formatDuration(v.taskTime)}</td>
        <td>{v.failedTasks + v.succeededTasks}</td>
        <td>{v.failedTasks}</td>
        <td>{v.succeededTasks}</td>
        <th>Pool Name</th>
        <th>Description</th>
        <th>Submitted</th>
-       <th>Duration</th>
+       <th>Task Time</th>
        <th>Tasks: Succeeded/Total</th>
        <th>Shuffle Read</th>
        <th>Shuffle Write</th>
    + @@ -54,6 +56,16 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) } private def createExecutorTable() : Seq[Node] = { + // make a executor-id -> address map + val executorIdToAddress = mutable.HashMap[String, String]() + val storageStatusList = parent.sc.getExecutorStorageStatus + for (statusId <- 0 until storageStatusList.size) { + val blockManagerId = parent.sc.getExecutorStorageStatus(statusId).blockManagerId + val address = blockManagerId.hostPort + val executorId = blockManagerId.executorId + executorIdToAddress.put(executorId, address) + } + val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId) executorIdToSummary match { case Some(x) => { @@ -61,6 +73,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) case (k,v) => { + From fc80b2e693d4c52d0f1ada67216723902c09c666 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 23 Dec 2013 21:20:20 -0800 Subject: [PATCH 23/23] Show full stack trace and time taken in unit tests. --- project/SparkBuild.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index ab96cfa18b..7bcbd90bd3 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -114,6 +114,9 @@ object SparkBuild extends Build { fork := true, javaOptions += "-Xmx3g", + // Show full stack trace and duration in test cases. + testOptions in Test += Tests.Argument("-oDF"), + // Only allow one test at a time, even across projects, since they run in the same JVM concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), @@ -259,7 +262,7 @@ object SparkBuild extends Build { libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v ) ) - + def examplesSettings = sharedSettings ++ Seq( name := "spark-examples", libraryDependencies ++= Seq(
        <th>Executor ID</th>
+       <th>Address</th>
        <th>Task Time</th>
        <th>Total Tasks</th>
        <th>Failed Tasks</th>
    {k}{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")} {parent.formatDuration(v.taskTime)} {v.failedTasks + v.succeededTasks} {v.failedTasks}
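A usage note on the test-reporter change above: "-oDF" configures ScalaTest's standard output reporter to print per-test durations (D) and full stack traces (F). A minimal sketch of the equivalent setting in a standalone build.sbt, with an ad hoc invocation from the sbt shell; the standalone layout is an assumption and not part of this patch series.

// Sketch only: the same ScalaTest reporter flags in a plain build.sbt.
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-oDF")

// One-off equivalent from the sbt shell (suite name taken from this patch series):
//   test-only org.apache.spark.ui.jobs.JobProgressListenerSuite -- -oDF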