Minor cleanup
This commit is contained in:
parent
524d01ea31
commit
b08306c5cf
|
@ -48,7 +48,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
/** Intercepts write calls and tracks total time spent writing. Not thread safe. */
|
||||
private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
|
||||
def timeWriting = _timeWriting
|
||||
|
||||
private var _timeWriting = 0L
|
||||
|
||||
private def callWithTiming(f: => Unit) = {
|
||||
|
@ -88,6 +87,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
|||
override def close() {
|
||||
if (initialized) {
|
||||
if (syncWrites) {
|
||||
// Force outstanding writes to disk and track how long it takes
|
||||
val start = System.nanoTime()
|
||||
objOut.flush()
|
||||
fos.getFD.sync()
|
||||
|
|
|
@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
|
|||
Seq("Task ID", "Status", "Locality Level", "Executor", "Launch Time", "Duration") ++
|
||||
Seq("GC Time") ++
|
||||
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
|
||||
{if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
|
||||
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
|
||||
Seq("Errors")
|
||||
|
||||
val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
|
||||
|
@ -152,22 +152,6 @@ private[spark] class StagePage(parent: JobProgressUI) {
|
|||
else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
|
||||
val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
|
||||
|
||||
|
||||
val remoteBytesRead: Option[Long] = metrics.flatMap{m => m.shuffleReadMetrics}.map(r => r.remoteBytesRead)
|
||||
val shuffleBytesWritten: Option[Long] = metrics.flatMap{m => m.shuffleWriteMetrics}.map(r => r.shuffleBytesWritten)
|
||||
|
||||
val writeThroughput: Option[Long] = metrics.flatMap{m => m.shuffleWriteMetrics}.flatMap{ s=>
|
||||
val bytesWritten = s.shuffleBytesWritten
|
||||
val timeTaken = s.shuffleWriteTime
|
||||
val timeSeconds = timeTaken / (1000 * 1000 * 1000.0)
|
||||
if (bytesWritten < 10000 || timeSeconds < .01) { // To little data to form an useful average
|
||||
None
|
||||
} else {
|
||||
Some((bytesWritten / timeSeconds).toLong)
|
||||
}
|
||||
}
|
||||
val writeThroughputStr = writeThroughput.map(t => " (%s/s)".format(Utils.bytesToString(t)))
|
||||
|
||||
<tr>
|
||||
<td>{info.taskId}</td>
|
||||
<td>{info.status}</td>
|
||||
|
@ -185,10 +169,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
|
|||
Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
|
||||
}}
|
||||
{if (shuffleWrite) {
|
||||
<td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
|
||||
parent.formatDuration(s.shuffleWriteTime / (1000 * 1000))}.getOrElse("")}</td>
|
||||
<td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
|
||||
Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}
|
||||
{writeThroughputStr.getOrElse("")}
|
||||
</td>
|
||||
Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
|
||||
}}
|
||||
<td>{exception.map(e =>
|
||||
<span>
|
||||
|
|
|
@ -1,28 +1,25 @@
|
|||
package spark.storage
|
||||
package org.apache.spark.storage
|
||||
|
||||
import java.util.concurrent.{CountDownLatch, Executors}
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import java.util.concurrent.{CountDownLatch, Executors}
|
||||
|
||||
import spark.{KryoSerializer, SparkContext, Utils}
|
||||
import org.apache.spark.serializer.KryoSerializer
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
/** Utility for micro-benchmarking storage performance. */
|
||||
/** Utility for micro-benchmarking shuffle write performance.
|
||||
*
|
||||
* Writes simulated shuffle output from several threads and records the observed throughput*/
|
||||
object StoragePerfTester {
|
||||
/** Writing shuffle data from several concurrent tasks and measure throughput. */
|
||||
def main(args: Array[String]) = {
|
||||
def intArg(key: String, default: Int) = Option(System.getenv(key)).map(_.toInt).getOrElse(default)
|
||||
def stringArg(key: String, default: String) = Option(System.getenv(key)).getOrElse(default)
|
||||
|
||||
/** Total number of simulated shuffles to run. */
|
||||
val numShuffles = intArg("NUM_SHUFFLES", 1)
|
||||
|
||||
/** Total amount of data to generate, will be distributed evenly amongst maps and reduce splits. */
|
||||
val dataSizeMb = Utils.memoryStringToMb(stringArg("OUTPUT_DATA", "1g"))
|
||||
/** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
|
||||
val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
|
||||
|
||||
/** Number of map tasks. All tasks execute concurrently. */
|
||||
val numMaps = intArg("NUM_MAPS", 8)
|
||||
val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
|
||||
|
||||
/** Number of reduce splits for each map task. */
|
||||
val numOutputSplits = intArg("NUM_REDUCERS", 500)
|
||||
val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
|
||||
|
||||
val recordLength = 1000 // ~1KB records
|
||||
val totalRecords = dataSizeMb * 1000
|
||||
|
@ -34,11 +31,13 @@ object StoragePerfTester {
|
|||
System.setProperty("spark.shuffle.compress", "false")
|
||||
System.setProperty("spark.shuffle.sync", "true")
|
||||
|
||||
// This is only used to instantiate a BlockManager. All thread scheduling is done manually.
|
||||
val sc = new SparkContext("local[4]", "Write Tester")
|
||||
val blockManager = sc.env.blockManager
|
||||
|
||||
def writeOutputBytes(mapId: Int, shuffleId: Int, total: AtomicLong) = {
|
||||
val shuffle = blockManager.shuffleBlockManager.forShuffle(shuffleId, numOutputSplits, new KryoSerializer())
|
||||
def writeOutputBytes(mapId: Int, total: AtomicLong) = {
|
||||
val shuffle = blockManager.shuffleBlockManager.forShuffle(1, numOutputSplits,
|
||||
new KryoSerializer())
|
||||
val buckets = shuffle.acquireWriters(mapId)
|
||||
for (i <- 1 to recordsPerMap) {
|
||||
buckets.writers(i % numOutputSplits).write(writeData)
|
||||
|
@ -52,36 +51,32 @@ object StoragePerfTester {
|
|||
shuffle.releaseWriters(buckets)
|
||||
}
|
||||
|
||||
for (shuffle <- 1 to numShuffles) {
|
||||
val start = System.currentTimeMillis()
|
||||
val latch = new CountDownLatch(numMaps)
|
||||
val totalBytes = new AtomicLong()
|
||||
for (task <- 1 to numMaps) {
|
||||
executor.submit(new Runnable() {
|
||||
override def run() = {
|
||||
try {
|
||||
writeOutputBytes(task, shuffle, totalBytes)
|
||||
latch.countDown()
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
println("Exception in child thread: " + e + " " + e.getMessage)
|
||||
System.exit(1)
|
||||
}
|
||||
val start = System.currentTimeMillis()
|
||||
val latch = new CountDownLatch(numMaps)
|
||||
val totalBytes = new AtomicLong()
|
||||
for (task <- 1 to numMaps) {
|
||||
executor.submit(new Runnable() {
|
||||
override def run() = {
|
||||
try {
|
||||
writeOutputBytes(task, totalBytes)
|
||||
latch.countDown()
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
println("Exception in child thread: " + e + " " + e.getMessage)
|
||||
System.exit(1)
|
||||
}
|
||||
})
|
||||
}
|
||||
latch.await()
|
||||
val end = System.currentTimeMillis()
|
||||
val time = (end - start) / 1000.0
|
||||
val bytesPerSecond = totalBytes.get() / time
|
||||
val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
|
||||
|
||||
System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
|
||||
System.err.println("bytes_per_file\t\t%s".format(Utils.memoryBytesToString(bytesPerFile)))
|
||||
System.err.println("agg_throughput\t\t%s/s".format(Utils.memoryBytesToString(bytesPerSecond.toLong)))
|
||||
System.err.println("Shuffle %s is finished in %ss. To run next shuffle, press Enter:".format(shuffle, time))
|
||||
readLine()
|
||||
}
|
||||
})
|
||||
}
|
||||
latch.await()
|
||||
val end = System.currentTimeMillis()
|
||||
val time = (end - start) / 1000.0
|
||||
val bytesPerSecond = totalBytes.get() / time
|
||||
val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
|
||||
|
||||
System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
|
||||
System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
|
||||
System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
|
||||
|
||||
executor.shutdown()
|
||||
sc.stop()
|
||||
|
|
Loading…
Reference in a new issue