From b08306c5cf015f6f2ca3b808cda127d438b80fc8 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 7 Oct 2013 15:48:17 -0700
Subject: [PATCH] Minor cleanup

---
 .../org/apache/spark/storage/DiskStore.scala  |  2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala  | 24 +-----
 .../spark/storage/StoragePerfTester.scala     | 85 +++++++++----------
 3 files changed, 45 insertions(+), 66 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c10bf6069c..e6d6190bc3 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -48,7 +48,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
   /** Intercepts write calls and tracks total time spent writing. Not thread safe. */
   private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
     def timeWriting = _timeWriting
-
     private var _timeWriting = 0L

     private def callWithTiming(f: => Unit) = {
@@ -88,6 +87,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     override def close() {
       if (initialized) {
         if (syncWrites) {
+          // Force outstanding writes to disk and track how long it takes
           val start = System.nanoTime()
           objOut.flush()
           fos.getFD.sync()
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 701bc64a8b..b7c81d091c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
       Seq("Task ID", "Status", "Locality Level", "Executor", "Launch Time", "Duration") ++
       Seq("GC Time") ++
       {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
-      {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
+      {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
       Seq("Errors")

     val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
@@ -152,22 +152,6 @@ private[spark] class StagePage(parent: JobProgressUI) {
         else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")

       val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
-
-      val remoteBytesRead: Option[Long] = metrics.flatMap{m => m.shuffleReadMetrics}.map(r => r.remoteBytesRead)
-      val shuffleBytesWritten: Option[Long] = metrics.flatMap{m => m.shuffleWriteMetrics}.map(r => r.shuffleBytesWritten)
-
-      val writeThroughput: Option[Long] = metrics.flatMap{m => m.shuffleWriteMetrics}.flatMap{ s=>
-        val bytesWritten = s.shuffleBytesWritten
-        val timeTaken = s.shuffleWriteTime
-        val timeSeconds = timeTaken / (1000 * 1000 * 1000.0)
-        if (bytesWritten < 10000 || timeSeconds < .01) { // To little data to form an useful average
-          None
-        } else {
-          Some((bytesWritten / timeSeconds).toLong)
-        }
-      }
-      val writeThroughputStr = writeThroughput.map(t => " (%s/s)".format(Utils.bytesToString(t)))
-
       <tr>
         <td>{info.taskId}</td>
         <td>{info.status}</td>
@@ -185,10 +169,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
             Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
         }}
         {if (shuffleWrite) {
+          <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
+            parent.formatDuration(s.shuffleWriteTime / (1000 * 1000))}.getOrElse("")}</td>
           <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
-            Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}
-            {writeThroughputStr.getOrElse("")}
-          </td>
+            Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
         }}
         <td>{exception.map(e =>
           <span>
diff --git a/core/src/main/scala/spark/storage/StoragePerfTester.scala b/core/src/main/scala/spark/storage/StoragePerfTester.scala
index 6ecd936bbc..5f30383fd0 100644
--- a/core/src/main/scala/spark/storage/StoragePerfTester.scala
+++ b/core/src/main/scala/spark/storage/StoragePerfTester.scala
@@ -1,28 +1,25 @@
-package spark.storage
+package org.apache.spark.storage

-import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.{CountDownLatch, Executors}

-import spark.{KryoSerializer, SparkContext, Utils}
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils

-/** Utility for micro-benchmarking storage performance. */
+/** Utility for micro-benchmarking shuffle write performance.
+ *
+ * Writes simulated shuffle output from several threads and records the observed throughput. */
 object StoragePerfTester {
-  /** Writing shuffle data from several concurrent tasks and measure throughput. */
   def main(args: Array[String]) = {
-    def intArg(key: String, default: Int) = Option(System.getenv(key)).map(_.toInt).getOrElse(default)
-    def stringArg(key: String, default: String) = Option(System.getenv(key)).getOrElse(default)
-
-    /** Total number of simulated shuffles to run. */
-    val numShuffles = intArg("NUM_SHUFFLES", 1)
-
-    /** Total amount of data to generate, will be distributed evenly amongst maps and reduce splits. */
-    val dataSizeMb = Utils.memoryStringToMb(stringArg("OUTPUT_DATA", "1g"))
+    /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
+    val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))

     /** Number of map tasks. All tasks execute concurrently. */
-    val numMaps = intArg("NUM_MAPS", 8)
+    val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)

     /** Number of reduce splits for each map task. */
-    val numOutputSplits = intArg("NUM_REDUCERS", 500)
+    val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)

     val recordLength = 1000 // ~1KB records
     val totalRecords = dataSizeMb * 1000
@@ -34,11 +31,13 @@
     System.setProperty("spark.shuffle.compress", "false")
     System.setProperty("spark.shuffle.sync", "true")

+    // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
     val sc = new SparkContext("local[4]", "Write Tester")
     val blockManager = sc.env.blockManager

-    def writeOutputBytes(mapId: Int, shuffleId: Int, total: AtomicLong) = {
-      val shuffle = blockManager.shuffleBlockManager.forShuffle(shuffleId, numOutputSplits, new KryoSerializer())
+    def writeOutputBytes(mapId: Int, total: AtomicLong) = {
+      val shuffle = blockManager.shuffleBlockManager.forShuffle(1, numOutputSplits,
+        new KryoSerializer())
       val buckets = shuffle.acquireWriters(mapId)
       for (i <- 1 to recordsPerMap) {
         buckets.writers(i % numOutputSplits).write(writeData)
@@ -52,36 +51,32 @@
       shuffle.releaseWriters(buckets)
     }

-    for (shuffle <- 1 to numShuffles) {
-      val start = System.currentTimeMillis()
-      val latch = new CountDownLatch(numMaps)
-      val totalBytes = new AtomicLong()
-      for (task <- 1 to numMaps) {
-        executor.submit(new Runnable() {
-          override def run() = {
-            try {
-              writeOutputBytes(task, shuffle, totalBytes)
-              latch.countDown()
-            } catch {
-              case e: Exception =>
-                println("Exception in child thread: " + e + " " + e.getMessage)
-                System.exit(1)
-            }
+    val start = System.currentTimeMillis()
+    val latch = new CountDownLatch(numMaps)
+    val totalBytes = new AtomicLong()
+    for (task <- 1 to numMaps) {
+      executor.submit(new Runnable() {
+        override def run() = {
+          try {
+            writeOutputBytes(task, totalBytes)
+            latch.countDown()
+          } catch {
+            case e: Exception =>
+              println("Exception in child thread: " + e + " " + e.getMessage)
+              System.exit(1)
          }
-        })
-      }
-      latch.await()
-      val end = System.currentTimeMillis()
-      val time = (end - start) / 1000.0
-      val bytesPerSecond = totalBytes.get() / time
-      val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
-
-      System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
-      System.err.println("bytes_per_file\t\t%s".format(Utils.memoryBytesToString(bytesPerFile)))
-      System.err.println("agg_throughput\t\t%s/s".format(Utils.memoryBytesToString(bytesPerSecond.toLong)))
-      System.err.println("Shuffle %s is finished in %ss. To run next shuffle, press Enter:".format(shuffle, time))
-      readLine()
+        }
+      })
     }
+    latch.await()
+    val end = System.currentTimeMillis()
+    val time = (end - start) / 1000.0
+    val bytesPerSecond = totalBytes.get() / time
+    val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
+
+    System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
+    System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
+    System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))

     executor.shutdown()
     sc.stop()
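
A note on units in the patch above: DiskStore's TimeTrackingOutputStream accumulates write time with System.nanoTime, which is why the removed StagePage throughput code divided shuffleWriteTime by 1000 * 1000 * 1000.0 to get seconds, and why the new "Write Time" column divides by 1000 * 1000 before calling formatDuration, which expects milliseconds. A minimal self-contained sketch of the same wrap-and-time pattern; the name TimedOutputStream is illustrative, not the class in the patch:

    import java.io.OutputStream

    // Sketch: route every write through a timing helper and expose the
    // accumulated nanoseconds. Like the class in the patch, this is not
    // thread safe.
    class TimedOutputStream(out: OutputStream) extends OutputStream {
      private var _timeWriting = 0L
      def timeWriting: Long = _timeWriting

      private def callWithTiming(f: => Unit): Unit = {
        val start = System.nanoTime()
        f
        _timeWriting += System.nanoTime() - start
      }

      override def write(b: Int): Unit = callWithTiming(out.write(b))
      override def write(b: Array[Byte], off: Int, len: Int): Unit =
        callWithTiming(out.write(b, off, len))
      override def flush(): Unit = out.flush()
      override def close(): Unit = out.close()
    }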
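The StoragePerfTester changes drop the outer NUM_SHUFFLES loop but keep the measurement skeleton: submit one Runnable per map task to a fixed pool, block on a CountDownLatch until all tasks finish, and divide an AtomicLong byte counter by wall-clock time. A compilable sketch of just that skeleton; the sleep and byte counts are stand-ins for the real shuffle writes:

    import java.util.concurrent.{CountDownLatch, Executors}
    import java.util.concurrent.atomic.AtomicLong

    object HarnessSketch {
      def main(args: Array[String]): Unit = {
        val numTasks = 8
        val executor = Executors.newFixedThreadPool(numTasks)
        val latch = new CountDownLatch(numTasks)
        val totalBytes = new AtomicLong()

        val start = System.currentTimeMillis()
        for (_ <- 1 to numTasks) {
          executor.submit(new Runnable {
            override def run(): Unit = {
              Thread.sleep(50)                  // stand-in for writing shuffle data
              totalBytes.addAndGet(1000 * 1000) // pretend 1 MB was written
              latch.countDown()
            }
          })
        }
        latch.await() // wall-clock time spans all concurrent tasks
        val seconds = (System.currentTimeMillis() - start) / 1000.0
        System.err.println("agg_throughput\t\t%.0f bytes/s".format(totalBytes.get() / seconds))
        executor.shutdown()
      }
    }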
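As before the patch, the tester is configured entirely through environment variables, now read via sys.env: OUTPUT_DATA (default 1g), NUM_MAPS (default 8), and NUM_REDUCERS (default 500). A hypothetical invocation, assuming the era's ./spark-class script is how you launch a main class on your build:

    OUTPUT_DATA=2g NUM_MAPS=16 NUM_REDUCERS=1000 \
      ./spark-class org.apache.spark.storage.StoragePerfTester

The results (files_total, bytes_per_file, agg_throughput) are written to stderr.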