Merge pull request #480 from pwendell/0.9-fixes

Handful of 0.9 fixes

This patch addresses a few fixes for Spark 0.9.0 based on the last release candidate.

@mridulm gets credit for reporting most of the issues here. Many of the fixes here are based on his work in #477 and follow up discussion with him.
This commit is contained in:
Patrick Wendell 2014-01-21 00:09:42 -08:00
commit 77b986f661
7 changed files with 65 additions and 24 deletions

View file

@ -192,7 +192,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
} }
/** Get all akka conf variables set on this SparkConf */ /** Get all akka conf variables set on this SparkConf */
def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")} def getAkkaConf: Seq[(String, String)] =
/* This is currently undocumented. If we want to make this public we should consider
* nesting options under the spark namespace to avoid conflicts with user akka options.
* Otherwise users configuring their own akka code via system properties could mess up
* spark's akka options.
*
* E.g. spark.akka.option.x.y.x = "value"
*/
getAll.filter {case (k, v) => k.startsWith("akka.")}
/** Does the configuration contain a given parameter? */ /** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key) def contains(key: String): Boolean = settings.contains(key)

View file

@ -43,8 +43,8 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
val numPartitions = val numPartitions =
// listStatus can throw exception if path does not exist. // listStatus can throw exception if path does not exist.
if (fs.exists(cpath)) { if (fs.exists(cpath)) {
val dirContents = fs.listStatus(cpath) val dirContents = fs.listStatus(cpath).map(_.getPath)
val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
val numPart = partitionFiles.size val numPart = partitionFiles.size
if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) || if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) { ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {

View file

@ -233,7 +233,7 @@ private[spark] class TaskSetManager(
/** Check whether a task is currently running an attempt on a given host */ /** Check whether a task is currently running an attempt on a given host */
private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = { private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
!taskAttempts(taskIndex).exists(_.host == host) taskAttempts(taskIndex).exists(_.host == host)
} }
/** /**

View file

@ -138,6 +138,7 @@ private[spark] class DiskBlockObjectWriter(
fos = null fos = null
ts = null ts = null
objOut = null objOut = null
initialized = false
} }
} }
@ -145,7 +146,8 @@ private[spark] class DiskBlockObjectWriter(
override def commit(): Long = { override def commit(): Long = {
if (initialized) { if (initialized) {
// NOTE: Flush the serializer first and then the compressed/buffered output stream // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
// serializer stream and the lower level stream.
objOut.flush() objOut.flush()
bs.flush() bs.flush()
val prevPos = lastValidPosition val prevPos = lastValidPosition
@ -175,7 +177,6 @@ private[spark] class DiskBlockObjectWriter(
} }
override def fileSegment(): FileSegment = { override def fileSegment(): FileSegment = {
val bytesWritten = lastValidPosition - initialPosition
new FileSegment(file, initialPosition, bytesWritten) new FileSegment(file, initialPosition, bytesWritten)
} }

View file

@ -23,10 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer import org.apache.spark.serializer.Serializer
import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
/** A group of writers for a ShuffleMapTask, one writer per reducer. */ /** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup { private[spark] trait ShuffleWriterGroup {
@ -58,7 +59,7 @@ private[spark] trait ShuffleWriterGroup {
* files within a ShuffleFileGroups associated with the block's reducer. * files within a ShuffleFileGroups associated with the block's reducer.
*/ */
private[spark] private[spark]
class ShuffleBlockManager(blockManager: BlockManager) { class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
def conf = blockManager.conf def conf = blockManager.conf
// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@ -106,6 +107,15 @@ class ShuffleBlockManager(blockManager: BlockManager) {
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockFile = blockManager.diskBlockManager.getFile(blockId) val blockFile = blockManager.diskBlockManager.getFile(blockId)
// Because of previous failures, the shuffle file may already exist on this machine.
// If so, remove it.
if (blockFile.exists) {
if (blockFile.delete()) {
logInfo(s"Removed existing shuffle file $blockFile")
} else {
logWarning(s"Failed to remove existing shuffle file $blockFile")
}
}
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize) blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
} }
} }

View file

@ -20,14 +20,15 @@ package org.apache.spark.util.collection
import java.io._ import java.io._
import java.util.Comparator import java.util.Comparator
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import scala.collection.mutable import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer} import org.apache.spark.io.LZFCompressionCodec
import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter} import org.apache.spark.serializer.{KryoDeserializationStream, Serializer}
import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter}
/** /**
* An append-only map that spills sorted content to disk when there is insufficient space for it * An append-only map that spills sorted content to disk when there is insufficient space for it
@ -153,9 +154,33 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
.format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else "")) .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock() val (blockId, file) = diskBlockManager.createTempBlock()
val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _) /* IMPORTANT NOTE: To avoid having to keep large object graphs in memory, this approach
* closes and re-opens serialization and compression streams within each file. This makes some
* assumptions about the way that serialization and compression streams work, specifically:
*
* 1) The serializer input streams do not pre-fetch data from the underlying stream.
*
* 2) Several compression streams can be opened, written to, and flushed on the write path
* while only one compression input stream is created on the read path
*
* In practice (1) is only true for Java, so we add a special fix below to make it work for
* Kryo. (2) is only true for LZF and not Snappy, so we coerce this to use LZF.
*
* To avoid making these assumptions we should create an intermediate stream that batches
* objects and sends an EOF to the higher layer streams to make sure they never prefetch data.
* This is a bit tricky because, within each segment, you'd need to track the total number
* of bytes written and then re-wind and write it at the beginning of the segment. This will
* most likely require using the file channel API.
*/
val shouldCompress = blockManager.shouldCompress(blockId)
val compressionCodec = new LZFCompressionCodec(sparkConf)
def wrapForCompression(outputStream: OutputStream) = {
if (shouldCompress) compressionCodec.compressedOutputStream(outputStream) else outputStream
}
def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
compressStream, syncWrites) wrapForCompression, syncWrites)
var writer = getNewWriter var writer = getNewWriter
var objectsWritten = 0 var objectsWritten = 0
@ -168,6 +193,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
if (objectsWritten == serializerBatchSize) { if (objectsWritten == serializerBatchSize) {
writer.commit() writer.commit()
writer.close()
_diskBytesSpilled += writer.bytesWritten
writer = getNewWriter writer = getNewWriter
objectsWritten = 0 objectsWritten = 0
} }
@ -176,8 +203,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
if (objectsWritten > 0) writer.commit() if (objectsWritten > 0) writer.commit()
} finally { } finally {
// Partial failures cannot be tolerated; do not revert partial writes // Partial failures cannot be tolerated; do not revert partial writes
_diskBytesSpilled += writer.bytesWritten
writer.close() writer.close()
_diskBytesSpilled += writer.bytesWritten
} }
currentMap = new SizeTrackingAppendOnlyMap[K, C] currentMap = new SizeTrackingAppendOnlyMap[K, C]
spilledMaps.append(new DiskMapIterator(file, blockId)) spilledMaps.append(new DiskMapIterator(file, blockId))

View file

@ -158,7 +158,9 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.shuffle.spill.compress</td> <td>spark.shuffle.spill.compress</td>
<td>true</td> <td>true</td>
<td> <td>
Whether to compress data spilled during shuffles. Whether to compress data spilled during shuffles. If enabled, spill compression
always uses the `org.apache.spark.io.LZFCompressionCodec` codec,
regardless of the value of `spark.io.compression.codec`.
</td> </td>
</tr> </tr>
<tr> <tr>
@ -379,13 +381,6 @@ Apart from these, the following properties are also available, and may be useful
Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit. Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
</td> </td>
</tr> </tr>
<tr>
<td>akka.x.y....</td>
<td>value</td>
<td>
An arbitrary akka configuration can be set directly on spark conf and it is applied for all the ActorSystems created spark wide for that SparkContext and its assigned executors as well.
</td>
</tr>
<tr> <tr>
<td>spark.shuffle.consolidateFiles</td> <td>spark.shuffle.consolidateFiles</td>