Merge pull request #50 from amplab/mergemerge

Merge Spark master into graphx
This commit is contained in:
Joey 2013-11-05 01:32:55 -08:00
commit ca44b5134a
36 changed files with 642 additions and 345 deletions


@@ -160,10 +160,10 @@ with YARN, also set `SPARK_YARN=true`:
 For convenience, these variables may also be set through the
 `conf/spark-env.sh` file described below.
-When developing a Spark application, specify the Hadoop version by
-adding the "hadoop-client" artifact to your project's
-dependencies. For example, if you're using Hadoop 1.0.1 and build your
-application using SBT, add this entry to `libraryDependencies`:
+When developing a Spark application, specify the Hadoop version by adding the
+"hadoop-client" artifact to your project's dependencies. For example, if you're
+using Hadoop 1.2.1 and build your application using SBT, add this entry to
+`libraryDependencies`:
     "org.apache.hadoop" % "hadoop-client" % "1.2.1"


@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.Map
 import scala.collection.generic.Growable
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
@@ -51,7 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 import org.apache.mesos.MesosNativeLibrary
-import org.apache.spark.deploy.LocalSparkCluster
+import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -245,7 +245,7 @@ class SparkContext(
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = {
     val env = SparkEnv.get
-    val conf = env.hadoop.newConfiguration()
+    val conf = SparkHadoopUtil.get.newConfiguration()
     // Explicitly check for S3 environment variables
     if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
         System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
@@ -255,8 +255,10 @@ class SparkContext(
       conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
     }
     // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) {
-      conf.set(key.substring("spark.hadoop.".length), System.getProperty(key))
+    Utils.getSystemProperties.foreach { case (key, value) =>
+      if (key.startsWith("spark.hadoop.")) {
+        conf.set(key.substring("spark.hadoop.".length), value)
+      }
     }
     val bufferSize = System.getProperty("spark.buffer.size", "65536")
     conf.set("io.file.buffer.size", bufferSize)
@@ -379,7 +381,7 @@ class SparkContext(
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
-    SparkEnv.get.hadoop.addCredentials(conf)
+    SparkHadoopUtil.get.addCredentials(conf)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
@@ -589,6 +591,7 @@ class SparkContext(
     val uri = new URI(path)
     val key = uri.getScheme match {
       case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
+      case "local" => "file:" + uri.getPath
       case _ => path
     }
     addedFiles(key) = System.currentTimeMillis
@@ -698,7 +701,7 @@ class SparkContext(
       key = uri.getScheme match {
         // A JAR file which exists only on the driver node
         case null | "file" =>
-          if (env.hadoop.isYarnMode()) {
+          if (SparkHadoopUtil.get.isYarnMode()) {
            // In order for this to work on yarn the user must specify the --addjars option to
            // the client to upload the file into the distributed cache to make it show up in the
            // current working directory.
@@ -936,9 +939,8 @@ class SparkContext(
   * prevent accidental overriding of checkpoint files in the existing directory.
   */
  def setCheckpointDir(dir: String, useExisting: Boolean = false) {
-    val env = SparkEnv.get
    val path = new Path(dir)
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
    if (!useExisting) {
      if (fs.exists(path)) {
        throw new Exception("Checkpoint directory '" + path + "' already exists.")


@@ -25,13 +25,13 @@ import akka.remote.RemoteActorRefProvider
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster}
 import org.apache.spark.network.ConnectionManager
 import org.apache.spark.serializer.{Serializer, SerializerManager}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.api.python.PythonWorkerFactory
+import com.google.common.collect.MapMaker
 /**
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -58,18 +58,9 @@ class SparkEnv (
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
-  val hadoop = {
-    val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
-    if(yarnMode) {
-      try {
-        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
-      } catch {
-        case th: Throwable => throw new SparkException("Unable to load YARN support", th)
-      }
-    } else {
-      new SparkHadoopUtil
-    }
-  }
+  // A general, soft-reference map for metadata needed during HadoopRDD split computation
+  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
+  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
   def stop() {
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
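
For reference, a toy standalone sketch (not from this commit) of the soft-reference cache pattern behind hadoopJobMetadata, assuming a Guava version where MapMaker.softValues() is available, as it was in Spark's dependency set at the time:

    import com.google.common.collect.MapMaker

    object SoftCacheSketch {
      // Same construction as hadoopJobMetadata: values are held via soft references,
      // so the JVM may drop them under memory pressure and they get recomputed later.
      private val cache = new MapMaker().softValues().makeMap[String, AnyRef]()

      def getOrElseUpdate(key: String)(compute: => AnyRef): AnyRef = {
        val cached = cache.get(key)
        if (cached != null) {
          cached
        } else {
          val value = compute
          cache.put(key, value)   // the entry may later be collected by the GC
          value
        }
      }

      def main(args: Array[String]) {
        val v1 = getOrElseUpdate("some-key")(new java.util.Date())
        val v2 = getOrElseUpdate("some-key")(new java.util.Date())
        println(v1 eq v2)   // true, as long as the entry has not been collected
      }
    }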


@@ -20,17 +20,13 @@ package org.apache.spark.deploy
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
-import com.google.common.collect.MapMaker
+import org.apache.spark.SparkException
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
 private[spark]
 class SparkHadoopUtil {
-  // A general, soft-reference map for metadata needed during HadoopRDD split computation
-  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
-  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
@@ -45,5 +41,23 @@ class SparkHadoopUtil {
   def addCredentials(conf: JobConf) {}
   def isYarnMode(): Boolean = { false }
 }
+object SparkHadoopUtil {
+  private val hadoop = {
+    val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+    if (yarnMode) {
+      try {
+        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+      } catch {
+        case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+      }
+    } else {
+      new SparkHadoopUtil
+    }
+  }
+  def get: SparkHadoopUtil = {
+    hadoop
+  }
+}
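
To illustrate the pattern this file introduces, here is a hedged, self-contained analogue: a singleton chosen once at class-load time, reflectively instantiating an optional implementation and falling back to a default. The class and property names below are invented for the sketch and are not Spark APIs:

    // Self-contained analogue of the SparkHadoopUtil.get pattern.
    class BaseUtil {
      def describe(): String = "base implementation"
    }

    object BaseUtil {
      private val instance: BaseUtil = {
        val optionalClass = "com.example.OptionalYarnUtil"   // hypothetical class name
        val useOptional = java.lang.Boolean.valueOf(System.getProperty("USE_OPTIONAL_IMPL", "false"))
        if (useOptional) {
          try {
            // Load the optional subclass only when asked for, so it need not be on the classpath.
            Class.forName(optionalClass).newInstance.asInstanceOf[BaseUtil]
          } catch {
            case th: Throwable => throw new RuntimeException("Unable to load optional support", th)
          }
        } else {
          new BaseUtil
        }
      }

      def get: BaseUtil = instance
    }

    object ReflectiveSingletonSketch {
      def main(args: Array[String]) {
        println(BaseUtil.get.describe())   // "base implementation" unless the optional class is present
      }
    }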


@@ -18,6 +18,7 @@
 package org.apache.spark.rdd
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.{NullWritable, BytesWritable}
@@ -83,7 +84,7 @@ private[spark] object CheckpointRDD extends Logging {
   def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
     val env = SparkEnv.get
     val outputDir = new Path(path)
-    val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
+    val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())
     val finalOutputName = splitIdToFile(ctx.partitionId)
     val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -122,7 +123,7 @@ private[spark] object CheckpointRDD extends Logging {
   def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
     val env = SparkEnv.get
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
     val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
@@ -145,7 +146,7 @@ private[spark] object CheckpointRDD extends Logging {
     val sc = new SparkContext(cluster, "CheckpointRDD Test")
     val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
     val path = new Path(hdfsPath, "temp")
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
     sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")


@@ -29,6 +29,7 @@ import org.apache.hadoop.util.ReflectionUtils
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.NextIterator
 import org.apache.hadoop.conf.{Configuration, Configurable}
@@ -198,10 +199,10 @@ private[spark] object HadoopRDD {
    * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
    * the local process.
    */
-  def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
-  def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+  def getCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.get(key)
+  def containsCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.containsKey(key)
   def putCachedMetadata(key: String, value: Any) =
-    SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
+    SparkEnv.get.hadoopJobMetadata.put(key, value)
 }


@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.deploy.SparkHadoopUtil
 import scala.collection.immutable.Set
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 import org.apache.hadoop.security.UserGroupInformation
@@ -87,9 +88,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
   // This method does not expect failures, since validate has already passed ...
   private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
-    val env = SparkEnv.get
     val conf = new JobConf(configuration)
-    env.hadoop.addCredentials(conf)
+    SparkHadoopUtil.get.addCredentials(conf)
     FileInputFormat.setInputPaths(conf, path)
     val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -108,9 +108,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
   // This method does not expect failures, since validate has already passed ...
   private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
-    val env = SparkEnv.get
     val jobConf = new JobConf(configuration)
-    env.hadoop.addCredentials(jobConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
     FileInputFormat.setInputPaths(jobConf, path)
     val instance: org.apache.hadoop.mapred.InputFormat[_, _] =


@@ -146,26 +146,26 @@ private[spark] class ShuffleMapTask(
     metrics = Some(context.taskMetrics)
     val blockManager = SparkEnv.get.blockManager
-    var shuffle: ShuffleBlocks = null
-    var buckets: ShuffleWriterGroup = null
+    val shuffleBlockManager = blockManager.shuffleBlockManager
+    var shuffle: ShuffleWriterGroup = null
+    var success = false
     try {
       // Obtain all the block writers for shuffle blocks.
       val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
-      shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
-      buckets = shuffle.acquireWriters(partitionId)
+      shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
       // Write the map output to its associated buckets.
       for (elem <- rdd.iterator(split, context)) {
         val pair = elem.asInstanceOf[Product2[Any, Any]]
         val bucketId = dep.partitioner.getPartition(pair._1)
-        buckets.writers(bucketId).write(pair)
+        shuffle.writers(bucketId).write(pair)
       }
       // Commit the writes. Get the size of each bucket block (total block size).
       var totalBytes = 0L
       var totalTime = 0L
-      val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
+      val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
         writer.commit()
         val size = writer.fileSegment().length
         totalBytes += size
@@ -179,19 +179,20 @@ private[spark] class ShuffleMapTask(
       shuffleMetrics.shuffleWriteTime = totalTime
       metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
+      success = true
       new MapStatus(blockManager.blockManagerId, compressedSizes)
     } catch { case e: Exception =>
       // If there is an exception from running the task, revert the partial writes
       // and throw the exception upstream to Spark.
-      if (buckets != null) {
-        buckets.writers.foreach(_.revertPartialWrites())
+      if (shuffle != null) {
+        shuffle.writers.foreach(_.revertPartialWrites())
       }
       throw e
     } finally {
       // Release the writers back to the shuffle block manager.
-      if (shuffle != null && buckets != null) {
-        buckets.writers.foreach(_.close())
-        shuffle.releaseWriters(buckets)
+      if (shuffle != null && shuffle.writers != null) {
+        shuffle.writers.foreach(_.close())
+        shuffle.releaseWriters(success)
       }
       // Execute the callbacks on task completion.
       context.executeOnCompleteCallbacks()


@@ -175,7 +175,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
         reason.className, reason.description, locs.mkString("\n")))
       if (numFailures(index) > MAX_TASK_FAILURES) {
         val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
-          taskSet.id, index, 4, reason.description)
+          taskSet.id, index, MAX_TASK_FAILURES, reason.description)
         decreaseRunningTasks(runningTasks)
         sched.dagScheduler.taskSetFailed(taskSet, errorMessage)
         // need to delete failed Taskset from schedule queue


@@ -17,7 +17,7 @@
 package org.apache.spark.storage
-import java.io.{InputStream, OutputStream}
+import java.io.{File, InputStream, OutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}
 import scala.collection.mutable.{HashMap, ArrayBuffer}
@@ -47,7 +47,7 @@ private[spark] class BlockManager(
   extends Logging {
   val shuffleBlockManager = new ShuffleBlockManager(this)
-  val diskBlockManager = new DiskBlockManager(
+  val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
     System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
@@ -462,15 +462,11 @@ private[spark] class BlockManager(
    * This is currently used for writing shuffle files out. Callers should handle error
    * cases.
    */
-  def getDiskWriter(blockId: BlockId, filename: String, serializer: Serializer, bufferSize: Int)
+  def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true)
     val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
     writer.registerCloseEventHandler(() => {
-      if (shuffleBlockManager.consolidateShuffleFiles) {
-        diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
-      }
       val myInfo = new ShuffleBlockInfo()
       blockInfo.put(blockId, myInfo)
       myInfo.markReady(writer.fileSegment().length)


@@ -111,8 +111,8 @@ class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
-  private var initialPosition = 0L
-  private var lastValidPosition = 0L
+  private val initialPosition = file.length()
+  private var lastValidPosition = initialPosition
   private var initialized = false
   private var _timeWriting = 0L
@@ -120,7 +120,6 @@
     fos = new FileOutputStream(file, true)
     ts = new TimeTrackingOutputStream(fos)
     channel = fos.getChannel()
-    initialPosition = channel.position
     lastValidPosition = initialPosition
     bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)


@@ -20,12 +20,11 @@ package org.apache.spark.storage
 import java.io.File
 import java.text.SimpleDateFormat
 import java.util.{Date, Random}
-import java.util.concurrent.ConcurrentHashMap
 import org.apache.spark.Logging
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import org.apache.spark.util.Utils
 /**
  * Creates and maintains the logical mapping between logical blocks and physical on-disk
@@ -35,7 +34,8 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedH
  *
  * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
  */
-private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver with Logging {
+private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String)
+  extends PathResolver with Logging {
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
@@ -47,54 +47,23 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
   private var shuffleSender : ShuffleSender = null
-  // Stores only Blocks which have been specifically mapped to segments of files
-  // (rather than the default, which maps a Block to a whole file).
-  // This keeps our bookkeeping down, since the file system itself tracks the standalone Blocks.
-  private val blockToFileSegmentMap = new TimeStampedHashMap[BlockId, FileSegment]
-  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DISK_BLOCK_MANAGER, this.cleanup)
   addShutdownHook()
-  /**
-   * Creates a logical mapping from the given BlockId to a segment of a file.
-   * This will cause any accesses of the logical BlockId to be directed to the specified
-   * physical location.
-   */
-  def mapBlockToFileSegment(blockId: BlockId, fileSegment: FileSegment) {
-    blockToFileSegmentMap.put(blockId, fileSegment)
-  }
   /**
    * Returns the phyiscal file segment in which the given BlockId is located.
    * If the BlockId has been mapped to a specific FileSegment, that will be returned.
    * Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly.
    */
   def getBlockLocation(blockId: BlockId): FileSegment = {
-    if (blockToFileSegmentMap.internalMap.containsKey(blockId)) {
-      blockToFileSegmentMap.get(blockId).get
+    if (blockId.isShuffle && shuffleManager.consolidateShuffleFiles) {
+      shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
     } else {
       val file = getFile(blockId.name)
       new FileSegment(file, 0, file.length())
     }
   }
-  /**
-   * Simply returns a File to place the given Block into. This does not physically create the file.
-   * If filename is given, that file will be used. Otherwise, we will use the BlockId to get
-   * a unique filename.
-   */
-  def createBlockFile(blockId: BlockId, filename: String = "", allowAppending: Boolean): File = {
-    val actualFilename = if (filename == "") blockId.name else filename
-    val file = getFile(actualFilename)
-    if (!allowAppending && file.exists()) {
-      throw new IllegalStateException(
-        "Attempted to create file that already exists: " + actualFilename)
-    }
-    file
-  }
-  private def getFile(filename: String): File = {
+  def getFile(filename: String): File = {
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)
     val dirId = hash % localDirs.length
@@ -119,6 +88,8 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit
     new File(subDir, filename)
   }
+  def getFile(blockId: BlockId): File = getFile(blockId.name)
   private def createLocalDirs(): Array[File] = {
     logDebug("Creating local directories at root dirs '" + rootDirs + "'")
     val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
@@ -151,10 +122,6 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit
     }
   }
-  private def cleanup(cleanupTime: Long) {
-    blockToFileSegmentMap.clearOldValues(cleanupTime)
-  }
   private def addShutdownHook() {
     localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {


@@ -44,7 +44,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
     val bytes = _bytes.duplicate()
     logDebug("Attempting to put block " + blockId)
     val startTime = System.currentTimeMillis
-    val file = diskManager.createBlockFile(blockId, allowAppending = false)
+    val file = diskManager.getFile(blockId)
     val channel = new FileOutputStream(file).getChannel()
     while (bytes.remaining > 0) {
       channel.write(bytes)
@@ -64,7 +64,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
     logDebug("Attempting to write values for block " + blockId)
     val startTime = System.currentTimeMillis
-    val file = diskManager.createBlockFile(blockId, allowAppending = false)
+    val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
     blockManager.dataSerializeStream(blockId, outputStream, values.iterator)
     val length = file.length


@@ -17,33 +17,45 @@
 package org.apache.spark.storage
+import java.io.File
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.JavaConversions._
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
+import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
-private[spark]
-class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter])
-private[spark]
-trait ShuffleBlocks {
-  def acquireWriters(mapId: Int): ShuffleWriterGroup
-  def releaseWriters(group: ShuffleWriterGroup)
+/** A group of writers for a ShuffleMapTask, one writer per reducer. */
+private[spark] trait ShuffleWriterGroup {
+  val writers: Array[BlockObjectWriter]
+  /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
+  def releaseWriters(success: Boolean)
 }
 /**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer
- * per reducer.
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
+ * per reducer (this set of files is called a ShuffleFileGroup).
  *
  * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
  * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
- * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files,
- * it releases them for another task.
+ * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
+ * files, it releases them for another task.
  * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
  *   - shuffleId: The unique id given to the entire shuffle stage.
  *   - bucketId: The id of the output partition (i.e., reducer id)
  *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
  *       time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
+ * that specifies where in a given file the actual block data is located.
+ *
+ * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
+ * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
+ * each block stored in each file. In order to find the location of a shuffle block, we search the
+ * files within a ShuffleFileGroups associated with the block's reducer.
  */
 private[spark]
 class ShuffleBlockManager(blockManager: BlockManager) {
@@ -52,45 +64,152 @@ class ShuffleBlockManager(blockManager: BlockManager) {
   val consolidateShuffleFiles =
     System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean
-  var nextFileId = new AtomicInteger(0)
-  val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()
-  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer) = {
-    new ShuffleBlocks {
-      // Get a group of writers for a map task.
-      override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
-        val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
-        val fileId = getUnusedFileId()
-        val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
-          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          if (consolidateShuffleFiles) {
-            val filename = physicalFileName(shuffleId, bucketId, fileId)
-            blockManager.getDiskWriter(blockId, filename, serializer, bufferSize)
-          } else {
-            blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize)
-          }
-        }
-        new ShuffleWriterGroup(mapId, fileId, writers)
-      }
-      override def releaseWriters(group: ShuffleWriterGroup) {
-        recycleFileId(group.fileId)
-      }
-    }
-  }
-  private def getUnusedFileId(): Int = {
-    val fileId = unusedFileIds.poll()
-    if (fileId == null) nextFileId.getAndIncrement() else fileId
-  }
-  private def recycleFileId(fileId: Int) {
-    if (consolidateShuffleFiles) {
-      unusedFileIds.add(fileId)
-    }
-  }
+  private val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+  /**
+   * Contains all the state related to a particular shuffle. This includes a pool of unused
+   * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
+   */
+  private class ShuffleState() {
+    val nextFileId = new AtomicInteger(0)
+    val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+    val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+  }
+  type ShuffleId = Int
+  private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
+  private
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
+  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
+    new ShuffleWriterGroup {
+      shuffleStates.putIfAbsent(shuffleId, new ShuffleState())
+      private val shuffleState = shuffleStates(shuffleId)
+      private var fileGroup: ShuffleFileGroup = null
+      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
+        fileGroup = getUnusedFileGroup()
+        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
+          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
+        }
+      } else {
+        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
+          val blockFile = blockManager.diskBlockManager.getFile(blockId)
+          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
+        }
+      }
+      override def releaseWriters(success: Boolean) {
+        if (consolidateShuffleFiles) {
+          if (success) {
+            val offsets = writers.map(_.fileSegment().offset)
+            fileGroup.recordMapOutput(mapId, offsets)
+          }
+          recycleFileGroup(fileGroup)
+        }
+      }
+      private def getUnusedFileGroup(): ShuffleFileGroup = {
+        val fileGroup = shuffleState.unusedFileGroups.poll()
+        if (fileGroup != null) fileGroup else newFileGroup()
+      }
+      private def newFileGroup(): ShuffleFileGroup = {
+        val fileId = shuffleState.nextFileId.getAndIncrement()
+        val files = Array.tabulate[File](numBuckets) { bucketId =>
+          val filename = physicalFileName(shuffleId, bucketId, fileId)
+          blockManager.diskBlockManager.getFile(filename)
+        }
+        val fileGroup = new ShuffleFileGroup(fileId, shuffleId, files)
+        shuffleState.allFileGroups.add(fileGroup)
+        fileGroup
+      }
+      private def recycleFileGroup(group: ShuffleFileGroup) {
+        shuffleState.unusedFileGroups.add(group)
+      }
+    }
+  }
+  /**
+   * Returns the physical file segment in which the given BlockId is located.
+   * This function should only be called if shuffle file consolidation is enabled, as it is
+   * an error condition if we don't find the expected block.
+   */
+  def getBlockLocation(id: ShuffleBlockId): FileSegment = {
+    // Search all file groups associated with this shuffle.
+    val shuffleState = shuffleStates(id.shuffleId)
+    for (fileGroup <- shuffleState.allFileGroups) {
+      val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)
+      if (segment.isDefined) { return segment.get }
+    }
+    throw new IllegalStateException("Failed to find shuffle block: " + id)
+  }
   private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
     "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
   }
+  private def cleanup(cleanupTime: Long) {
+    shuffleStates.clearOldValues(cleanupTime)
+  }
 }
+private[spark]
+object ShuffleBlockManager {
+  /**
+   * A group of shuffle files, one per reducer.
+   * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
+   */
+  private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
+    /**
+     * Stores the absolute index of each mapId in the files of this group. For instance,
+     * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
+     */
+    private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
+    /**
+     * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
+     * This ordering allows us to compute block lengths by examining the following block offset.
+     * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
+     * reducer.
+     */
+    private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+      new PrimitiveVector[Long]()
+    }
+    def numBlocks = mapIdToIndex.size
+    def apply(bucketId: Int) = files(bucketId)
+    def recordMapOutput(mapId: Int, offsets: Array[Long]) {
+      mapIdToIndex(mapId) = numBlocks
+      for (i <- 0 until offsets.length) {
+        blockOffsetsByReducer(i) += offsets(i)
+      }
+    }
+    /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
+    def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
+      val file = files(reducerId)
+      val blockOffsets = blockOffsetsByReducer(reducerId)
+      val index = mapIdToIndex.getOrElse(mapId, -1)
+      if (index >= 0) {
+        val offset = blockOffsets(index)
+        val length =
+          if (index + 1 < numBlocks) {
+            blockOffsets(index + 1) - offset
+          } else {
+            file.length() - offset
+          }
+        assert(length >= 0)
+        Some(new FileSegment(file, offset, length))
+      } else {
+        None
+      }
+    }
+  }
+}
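
To make the offset bookkeeping above concrete, here is a small standalone sketch (not from the commit) of how a block's (offset, length) can be recovered from consecutive block offsets plus the reducer file's length, which is essentially what ShuffleFileGroup.getFileSegmentFor does:

    // Toy illustration of deriving a block's length from consecutive block offsets
    // within one consolidated reducer file.
    object OffsetLengthSketch {
      // offsets(i) is where the i-th map output block starts in the reducer file.
      def segmentFor(index: Int, offsets: IndexedSeq[Long], fileLength: Long): (Long, Long) = {
        val offset = offsets(index)
        val length =
          if (index + 1 < offsets.length) offsets(index + 1) - offset
          else fileLength - offset
        (offset, length)
      }

      def main(args: Array[String]) {
        val offsets = IndexedSeq(0L, 120L, 300L)     // three map outputs appended to one file
        val fileLength = 450L
        println(segmentFor(1, offsets, fileLength))  // (120,180)
        println(segmentFor(2, offsets, fileLength))  // (300,150)
      }
    }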


@@ -38,19 +38,19 @@ object StoragePerfTester {
     val blockManager = sc.env.blockManager
     def writeOutputBytes(mapId: Int, total: AtomicLong) = {
-      val shuffle = blockManager.shuffleBlockManager.forShuffle(1, numOutputSplits,
+      val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
         new KryoSerializer())
-      val buckets = shuffle.acquireWriters(mapId)
+      val writers = shuffle.writers
       for (i <- 1 to recordsPerMap) {
-        buckets.writers(i % numOutputSplits).write(writeData)
+        writers(i % numOutputSplits).write(writeData)
       }
-      buckets.writers.map {w =>
+      writers.map {w =>
         w.commit()
         total.addAndGet(w.fileSegment().length)
         w.close()
       }
-      shuffle.releaseWriters(buckets)
+      shuffle.releaseWriters(true)
     }
     val start = System.currentTimeMillis()


@@ -59,7 +59,7 @@ object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext
   "ShuffleMapTask", "BlockManager", "DiskBlockManager", "BroadcastVars") {
   val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK,
-    SHUFFLE_MAP_TASK, BLOCK_MANAGER, DISK_BLOCK_MANAGER, BROADCAST_VARS = Value
+    SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
   type MetadataCleanerType = Value


@@ -18,13 +18,12 @@
 package org.apache.spark.util
 import java.io._
-import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
+import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
 import java.util.{Locale, Random, UUID}
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import java.util.regex.Pattern
+import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 import scala.collection.Map
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.io.Source
@@ -36,7 +35,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 import org.apache.spark.deploy.SparkHadoopUtil
 import java.nio.ByteBuffer
-import org.apache.spark.{SparkEnv, SparkException, Logging}
+import org.apache.spark.{SparkException, Logging}
 /**
@@ -148,7 +147,7 @@ private[spark] object Utils extends Logging {
     return buf
   }
-  private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
+  private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
   // Register the path to be deleted via shutdown hook
   def registerShutdownDeleteDir(file: File) {
@@ -280,9 +279,8 @@ private[spark] object Utils extends Logging {
       }
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
-        val env = SparkEnv.get
         val uri = new URI(url)
-        val conf = env.hadoop.newConfiguration()
+        val conf = SparkHadoopUtil.get.newConfiguration()
         val fs = FileSystem.get(uri, conf)
         val in = fs.open(new Path(uri))
         val out = new FileOutputStream(tempFile)
@@ -819,4 +817,10 @@ private[spark] object Utils extends Logging {
     // Nothing else to guard against ?
     hashAbs
   }
+  /** Returns a copy of the system properties that is thread-safe to iterator over. */
+  def getSystemProperties(): Map[String, String] = {
+    return System.getProperties().clone()
+      .asInstanceOf[java.util.Properties].toMap[String, String]
+  }
 }


@@ -15,7 +15,7 @@
  * limitations under the License.
  */
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
 /**
@@ -110,7 +110,7 @@
    */
   def get(index: Int): Boolean = {
     val bitmask = 1L << (index & 0x3f) // mod 64 and shift
-    (words(index >>> 6) & bitmask) != 0 // div by 64 and mask
+    (words(index >> 6) & bitmask) != 0 // div by 64 and mask
   }
@@ -181,5 +181,5 @@ class BitSet(numBits: Int) {
   /** Return the number of longs it would take to hold numBits. */
-  private def bit2words(numBits: Int) = ((numBits - 1) >>> 6) + 1
+  private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1
 }
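
As an aside, a tiny standalone sketch (not from the commit) of the word/bit arithmetic used by get and bit2words in this long-array-backed bit set:

    object BitIndexSketch {
      def main(args: Array[String]) {
        val index = 130
        val wordIndex = index >> 6            // divide by 64: bit 130 lives in word 2
        val bitmask = 1L << (index & 0x3f)    // index & 0x3f is index mod 64, here 2
        println(wordIndex)                                   // 2
        println(java.lang.Long.toBinaryString(bitmask))      // 100 (bit 2 set)

        // Number of 64-bit words needed to hold numBits bits.
        def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1
        println(bit2words(64))   // 1
        println(bit2words(65))   // 2
      }
    }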


@@ -15,7 +15,7 @@
  * limitations under the License.
  */
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
 /**
@@ -42,7 +42,6 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
    */
   def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity))
   @transient private var _oldValues: Array[V] = null
   // Treat the null key differently so we can use nulls in "data" to represent empty items.
@@ -71,7 +70,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
       haveNullValue = true
       nullValue = v
     } else {
-      val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
+      val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
       _values(pos) = v
       keySet.rehashIfNeeded(k, grow, move)
       _oldValues = null
@@ -88,7 +87,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
         nullValue = v
       }
     } else {
-      val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
+      val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
       _values(pos) = mergeF(_values(pos), v)
       keySet.rehashIfNeeded(k, grow, move)
       _oldValues = null
@@ -111,8 +110,8 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
       }
       nullValue
     } else {
-      val pos = keySet.fastAdd(k)
-      if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
+      val pos = keySet.addWithoutResize(k)
+      if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
        val newValue = defaultValue
        _values(pos & OpenHashSet.POSITION_MASK) = newValue
        keySet.rehashIfNeeded(k, grow, move)


@@ -15,7 +15,7 @@
  * limitations under the License.
  */
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
 /**
@@ -43,6 +43,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
   require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
   require(initialCapacity >= 1, "Invalid initial capacity")
+  require(loadFactor < 1.0, "Load factor must be less than 1.0")
+  require(loadFactor > 0.0, "Load factor must be greater than 0.0")
   import OpenHashSet._
@@ -78,11 +80,15 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
   protected var _mask = _capacity - 1
   protected var _size = 0
-  protected var _data = classManifest[T].newArray(_capacity)
   protected var _bitset = new BitSet(_capacity)
   def getBitSet = _bitset
+  // Init of the array in constructor (instead of in declaration) to work around a Scala compiler
+  // specialization bug that would generate two arrays (one for Object and one for specialized T).
+  protected var _data: Array[T] = _
+  _data = new Array[T](_capacity)
   /** Number of elements in the set. */
   def size: Int = _size
@@ -97,7 +103,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
    * and rehash all elements.
    */
   def add(k: T) {
-    fastAdd(k)
+    addWithoutResize(k)
     rehashIfNeeded(k, grow, move)
   }
@@ -111,14 +117,14 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
    * @return The position where the key is placed, plus the highest order bit is set if the key
    *         exists previously.
    */
-  def fastAdd(k: T): Int = putInto(_bitset, _data, k)
+  def addWithoutResize(k: T): Int = putInto(_bitset, _data, k)
   /**
    * Rehash the set if it is overloaded.
    * @param k A parameter unused in the function, but to force the Scala compiler to specialize
    *          this method.
-   * @param allocateFunc Closure invoked when we are allocating a new, larger array.
-   * @param moveFunc Closure invoked when we move the key from one position (in the old data array)
+   * @param allocateFunc Callback invoked when we are allocating a new, larger array.
+   * @param moveFunc Callback invoked when we move the key from one position (in the old data array)
    *                 to a new position (in the new data array).
    */
   def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
@@ -127,7 +133,9 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
     }
   }
-  /** Return the position of the element in the underlying array. */
+  /**
+   * Return the position of the element in the underlying array, or INVALID_POS if it is not found.
+   */
   def getPos(k: T): Int = {
     var pos = hashcode(hasher.hash(k)) & _mask
     var i = 1
@@ -163,7 +171,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
   /**
    * Put an entry into the set. Return the position where the key is placed. In addition, the
-   * highest bid in the returned position is set if the key exists prior to this put.
+   * highest bit in the returned position is set if the key exists prior to this put.
    *
    * This function assumes the data array has at least one empty slot.
    */
@@ -177,7 +185,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
       data(pos) = k
       bitset.set(pos)
       _size += 1
-      return pos | EXISTENCE_MASK
+      return pos | NONEXISTENCE_MASK
     } else if (data(pos) == k) {
       // Found an existing key.
       return pos
@@ -199,8 +207,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
    *
    * @param k A parameter unused in the function, but to force the Scala compiler to specialize
    *          this method.
-   * @param allocateFunc Closure invoked when we are allocating a new, larger array.
-   * @param moveFunc Closure invoked when we move the key from one position (in the old data array)
+   * @param allocateFunc Callback invoked when we are allocating a new, larger array.
+   * @param moveFunc Callback invoked when we move the key from one position (in the old data array)
    *                 to a new position (in the new data array).
    */
   private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
@@ -208,7 +216,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
     require(newCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
     allocateFunc(newCapacity)
-    val newData = classManifest[T].newArray(newCapacity)
+    val newData = new Array[T](newCapacity)
     val newBitset = new BitSet(newCapacity)
     var pos = 0
     _size = 0
@@ -245,9 +253,7 @@ private[spark]
 object OpenHashSet {
   val INVALID_POS = -1
-  val EXISTENCE_MASK = 0x80000000
+  val NONEXISTENCE_MASK = 0x80000000
   val POSITION_MASK = 0xEFFFFFF
   /**
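
For intuition about the addWithoutResize/putInto return value, here is a small self-contained sketch (it does not use the Spark classes, and the toy "slot index" is just a stand-in): the low bits carry a position, and the high bit, NONEXISTENCE_MASK, flags that the key was not present before the call:

    object PackedPosSketch {
      val NONEXISTENCE_MASK = 0x80000000
      val POSITION_MASK = 0x7FFFFFFF   // note: the real code uses a smaller mask (0xEFFFFFF)

      // A toy "insert" over a mutable set, returning a packed (isNew, position) value.
      def insert(set: collection.mutable.LinkedHashSet[String], k: String): Int = {
        val existedBefore = set.contains(k)
        set += k
        val pos = set.toSeq.indexOf(k)   // stand-in for the hash-table slot index
        if (existedBefore) pos else pos | NONEXISTENCE_MASK
      }

      def main(args: Array[String]) {
        val set = collection.mutable.LinkedHashSet[String]()
        val first = insert(set, "a")
        val second = insert(set, "a")
        println((first & NONEXISTENCE_MASK) != 0)   // true: "a" was newly added
        println((second & NONEXISTENCE_MASK) != 0)  // false: "a" already existed
        println(second & POSITION_MASK)             // 0: slot/position of "a"
      }
    }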


@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.spark.util.hash package org.apache.spark.util.collection
/** /**
@ -55,9 +55,15 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
_values(pos) _values(pos)
} }
/** Get the value for a given key, or returns elseValue if it doesn't exist. */
def getOrElse(k: K, elseValue: V): V = {
val pos = keySet.getPos(k)
if (pos >= 0) _values(pos) else elseValue
}
/** Set the value for a key */ /** Set the value for a key */
def update(k: K, v: V) { def update(k: K, v: V) {
val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
_values(pos) = v _values(pos) = v
keySet.rehashIfNeeded(k, grow, move) keySet.rehashIfNeeded(k, grow, move)
_oldValues = null _oldValues = null
@ -66,9 +72,9 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
/** Set the value for a key */ /** Set the value for a key */
def setMerge(k: K, v: V, mergeF: (V,V) => V) { def setMerge(k: K, v: V, mergeF: (V,V) => V) {
val pos = keySet.fastAdd(k) val pos = keySet.addWithoutResize(k)
val ind = pos & OpenHashSet.POSITION_MASK val ind = pos & OpenHashSet.POSITION_MASK
if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { // if first add if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { // if first add
_values(ind) = v _values(ind) = v
} else { } else {
_values(ind) = mergeF(_values(ind), v) _values(ind) = mergeF(_values(ind), v)
@ -85,8 +91,8 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
* @return the newly updated value. * @return the newly updated value.
*/ */
def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = { def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
val pos = keySet.fastAdd(k) val pos = keySet.addWithoutResize(k)
if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
val newValue = defaultValue val newValue = defaultValue
_values(pos & OpenHashSet.POSITION_MASK) = newValue _values(pos & OpenHashSet.POSITION_MASK) = newValue
keySet.rehashIfNeeded(k, grow, move) keySet.rehashIfNeeded(k, grow, move)
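As a hedged illustration of the accessors touched above (getOrElse, setMerge, changeValue and the NONEXISTENCE_MASK check), the following sketch shows how the map behaves on first and repeated inserts. The wrapper object is hypothetical, and the file is assumed to sit inside the Spark source tree since these collection classes are package-private.

package org.apache.spark.util.collection

// Hypothetical usage sketch, not part of this commit.
object PrimitiveKeyMapSketch {
  def main(args: Array[String]) {
    val counts = new PrimitiveKeyOpenHashMap[Long, Int]()
    counts(1L) = 1                         // plain update
    counts.setMerge(1L, 1, _ + _)          // key already present, so merge: 1 + 1 = 2
    counts.setMerge(2L, 5, _ + _)          // first add, so the value 5 is stored as-is
    val hit = counts.getOrElse(2L, 0)      // 5
    val miss = counts.getOrElse(3L, 0)     // key absent, falls back to 0
    val bumped = counts.changeValue(1L, 1, _ + 1)  // existing key, mergeValue applied: 3
    println("hit=" + hit + " miss=" + miss + " bumped=" + bumped + " size=" + counts.size)
  }
}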

View file

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util.collection
/** Provides a simple, non-threadsafe, array-backed vector that can store primitives. */
private[spark]
class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](initialSize: Int = 64) {
private var numElements = 0
private var array: Array[V] = _
// NB: This must be separate from the declaration, otherwise the specialized parent class
// will get its own array with the same initial size. TODO: Figure out why...
array = new Array[V](initialSize)
def apply(index: Int): V = {
require(index < numElements)
array(index)
}
def +=(value: V) {
if (numElements == array.length) { resize(array.length * 2) }
array(numElements) = value
numElements += 1
}
def length = numElements
def getUnderlyingArray = array
/** Resizes the array, dropping elements if the total length decreases. */
def resize(newLength: Int) {
val newArray = new Array[V](newLength)
array.copyToArray(newArray)
array = newArray
}
}
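A short, hypothetical usage sketch of the class above; the wrapper object and the chosen sizes are illustrative, and the file is assumed to live in the Spark tree because the class is private[spark].

package org.apache.spark.util.collection

// Hypothetical sketch, not part of this commit.
object PrimitiveVectorSketch {
  def main(args: Array[String]) {
    val vec = new PrimitiveVector[Long](initialSize = 4)
    var i = 0L
    while (i < 10) {
      vec += i          // the backing array doubles once it holds 4 and again at 8 elements
      i += 1
    }
    println(vec.length)                      // 10
    println(vec(3))                          // 3
    // The backing array can be longer than length; trailing slots are unused.
    println(vec.getUnderlyingArray.length)   // 16
  }
}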

View file

@ -0,0 +1,84 @@
package org.apache.spark.storage
import java.io.{FileWriter, File}
import scala.collection.mutable
import com.google.common.io.Files
import org.scalatest.{BeforeAndAfterEach, FunSuite}
class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
val rootDir0 = Files.createTempDir()
rootDir0.deleteOnExit()
val rootDir1 = Files.createTempDir()
rootDir1.deleteOnExit()
val rootDirs = rootDir0.getName + "," + rootDir1.getName
println("Created root dirs: " + rootDirs)
val shuffleBlockManager = new ShuffleBlockManager(null) {
var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id)
}
var diskBlockManager: DiskBlockManager = _
override def beforeEach() {
diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
shuffleBlockManager.idToSegmentMap.clear()
}
test("basic block creation") {
val blockId = new TestBlockId("test")
assertSegmentEquals(blockId, blockId.name, 0, 0)
val newFile = diskBlockManager.getFile(blockId)
writeToFile(newFile, 10)
assertSegmentEquals(blockId, blockId.name, 0, 10)
newFile.delete()
}
test("block appending") {
val blockId = new TestBlockId("test")
val newFile = diskBlockManager.getFile(blockId)
writeToFile(newFile, 15)
assertSegmentEquals(blockId, blockId.name, 0, 15)
val newFile2 = diskBlockManager.getFile(blockId)
assert(newFile === newFile2)
writeToFile(newFile2, 12)
assertSegmentEquals(blockId, blockId.name, 0, 27)
newFile.delete()
}
test("block remapping") {
val filename = "test"
val blockId0 = new ShuffleBlockId(1, 2, 3)
val newFile = diskBlockManager.getFile(filename)
writeToFile(newFile, 15)
shuffleBlockManager.idToSegmentMap(blockId0) = new FileSegment(newFile, 0, 15)
assertSegmentEquals(blockId0, filename, 0, 15)
val blockId1 = new ShuffleBlockId(1, 2, 4)
val newFile2 = diskBlockManager.getFile(filename)
writeToFile(newFile2, 12)
shuffleBlockManager.idToSegmentMap(blockId1) = new FileSegment(newFile, 15, 12)
assertSegmentEquals(blockId1, filename, 15, 12)
assert(newFile === newFile2)
newFile.delete()
}
def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
val segment = diskBlockManager.getBlockLocation(blockId)
assert(segment.file.getName === filename)
assert(segment.offset === offset)
assert(segment.length === length)
}
def writeToFile(file: File, numBytes: Int) {
val writer = new FileWriter(file, true)
for (i <- 0 until numBytes) writer.write(i)
writer.close()
}
}
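For readers unfamiliar with the abstraction being exercised, a brief, hedged illustration: a FileSegment is essentially a (file, offset, length) triple, which is how several logical shuffle blocks can be mapped onto one physical file, exactly what the "block remapping" test above relies on. The path below is a placeholder and the snippet assumes it compiles inside the Spark tree.

package org.apache.spark.storage

import java.io.File

// Illustration only, not part of this commit.
object FileSegmentSketch {
  def main(args: Array[String]) {
    val shared = new File("/tmp/shuffle_1_2")      // one physical file...
    val first  = new FileSegment(shared, 0, 15)    // ...holding bytes [0, 15)
    val second = new FileSegment(shared, 15, 12)   // ...and bytes [15, 27)
    println(first.file == second.file)             // true: same file, two logical blocks
  }
}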

View file

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.spark.util.hash package org.apache.spark.util.collection
import org.scalatest.FunSuite import org.scalatest.FunSuite

View file

@ -1,4 +1,4 @@
package org.apache.spark.util.hash package org.apache.spark.util.collection
import scala.collection.mutable.HashSet import scala.collection.mutable.HashSet
import org.scalatest.FunSuite import org.scalatest.FunSuite
@ -82,7 +82,7 @@ class OpenHashMapSuite extends FunSuite {
test("null keys") { test("null keys") {
val map = new OpenHashMap[String, String]() val map = new OpenHashMap[String, String]()
for (i <- 1 to 100) { for (i <- 1 to 100) {
map("" + i) = "" + i map(i.toString) = i.toString
} }
assert(map.size === 100) assert(map.size === 100)
assert(map(null) === null) assert(map(null) === null)
@ -94,7 +94,7 @@ class OpenHashMapSuite extends FunSuite {
test("null values") { test("null values") {
val map = new OpenHashMap[String, String]() val map = new OpenHashMap[String, String]()
for (i <- 1 to 100) { for (i <- 1 to 100) {
map("" + i) = null map(i.toString) = null
} }
assert(map.size === 100) assert(map.size === 100)
assert(map("1") === null) assert(map("1") === null)
@ -108,12 +108,12 @@ class OpenHashMapSuite extends FunSuite {
test("changeValue") { test("changeValue") {
val map = new OpenHashMap[String, String]() val map = new OpenHashMap[String, String]()
for (i <- 1 to 100) { for (i <- 1 to 100) {
map("" + i) = "" + i map(i.toString) = i.toString
} }
assert(map.size === 100) assert(map.size === 100)
for (i <- 1 to 100) { for (i <- 1 to 100) {
val res = map.changeValue("" + i, { assert(false); "" }, v => { val res = map.changeValue(i.toString, { assert(false); "" }, v => {
assert(v === "" + i) assert(v === i.toString)
v + "!" v + "!"
}) })
assert(res === i + "!") assert(res === i + "!")
@ -121,7 +121,7 @@ class OpenHashMapSuite extends FunSuite {
// Iterate from 101 to 400 to make sure the map grows a couple of times, because we had a // Iterate from 101 to 400 to make sure the map grows a couple of times, because we had a
// bug where changeValue would return the wrong result when the map grew on that insert // bug where changeValue would return the wrong result when the map grew on that insert
for (i <- 101 to 400) { for (i <- 101 to 400) {
val res = map.changeValue("" + i, { i + "!" }, v => { assert(false); v }) val res = map.changeValue(i.toString, { i + "!" }, v => { assert(false); v })
assert(res === i + "!") assert(res === i + "!")
} }
assert(map.size === 400) assert(map.size === 400)
@ -138,11 +138,11 @@ class OpenHashMapSuite extends FunSuite {
test("inserting in capacity-1 map") { test("inserting in capacity-1 map") {
val map = new OpenHashMap[String, String](1) val map = new OpenHashMap[String, String](1)
for (i <- 1 to 100) { for (i <- 1 to 100) {
map("" + i) = "" + i map(i.toString) = i.toString
} }
assert(map.size === 100) assert(map.size === 100)
for (i <- 1 to 100) { for (i <- 1 to 100) {
assert(map("" + i) === "" + i) assert(map(i.toString) === i.toString)
} }
} }
} }

View file

@ -0,0 +1,145 @@
package org.apache.spark.util.collection
import org.scalatest.FunSuite
class OpenHashSetSuite extends FunSuite {
test("primitive int") {
val set = new OpenHashSet[Int]
assert(set.size === 0)
assert(!set.contains(10))
assert(!set.contains(50))
assert(!set.contains(999))
assert(!set.contains(10000))
set.add(10)
assert(set.contains(10))
assert(!set.contains(50))
assert(!set.contains(999))
assert(!set.contains(10000))
set.add(50)
assert(set.size === 2)
assert(set.contains(10))
assert(set.contains(50))
assert(!set.contains(999))
assert(!set.contains(10000))
set.add(999)
assert(set.size === 3)
assert(set.contains(10))
assert(set.contains(50))
assert(set.contains(999))
assert(!set.contains(10000))
set.add(50)
assert(set.size === 3)
assert(set.contains(10))
assert(set.contains(50))
assert(set.contains(999))
assert(!set.contains(10000))
}
test("primitive long") {
val set = new OpenHashSet[Long]
assert(set.size === 0)
assert(!set.contains(10L))
assert(!set.contains(50L))
assert(!set.contains(999L))
assert(!set.contains(10000L))
set.add(10L)
assert(set.size === 1)
assert(set.contains(10L))
assert(!set.contains(50L))
assert(!set.contains(999L))
assert(!set.contains(10000L))
set.add(50L)
assert(set.size === 2)
assert(set.contains(10L))
assert(set.contains(50L))
assert(!set.contains(999L))
assert(!set.contains(10000L))
set.add(999L)
assert(set.size === 3)
assert(set.contains(10L))
assert(set.contains(50L))
assert(set.contains(999L))
assert(!set.contains(10000L))
set.add(50L)
assert(set.size === 3)
assert(set.contains(10L))
assert(set.contains(50L))
assert(set.contains(999L))
assert(!set.contains(10000L))
}
test("non-primitive") {
val set = new OpenHashSet[String]
assert(set.size === 0)
assert(!set.contains(10.toString))
assert(!set.contains(50.toString))
assert(!set.contains(999.toString))
assert(!set.contains(10000.toString))
set.add(10.toString)
assert(set.size === 1)
assert(set.contains(10.toString))
assert(!set.contains(50.toString))
assert(!set.contains(999.toString))
assert(!set.contains(10000.toString))
set.add(50.toString)
assert(set.size === 2)
assert(set.contains(10.toString))
assert(set.contains(50.toString))
assert(!set.contains(999.toString))
assert(!set.contains(10000.toString))
set.add(999.toString)
assert(set.size === 3)
assert(set.contains(10.toString))
assert(set.contains(50.toString))
assert(set.contains(999.toString))
assert(!set.contains(10000.toString))
set.add(50.toString)
assert(set.size === 3)
assert(set.contains(10.toString))
assert(set.contains(50.toString))
assert(set.contains(999.toString))
assert(!set.contains(10000.toString))
}
test("non-primitive set growth") {
val set = new OpenHashSet[String]
for (i <- 1 to 1000) {
set.add(i.toString)
}
assert(set.size === 1000)
assert(set.capacity > 1000)
for (i <- 1 to 100) {
set.add(i.toString)
}
assert(set.size === 1000)
assert(set.capacity > 1000)
}
test("primitive set growth") {
val set = new OpenHashSet[Long]
for (i <- 1 to 1000) {
set.add(i.toLong)
}
assert(set.size === 1000)
assert(set.capacity > 1000)
for (i <- 1 to 100) {
set.add(i.toLong)
}
assert(set.size === 1000)
assert(set.capacity > 1000)
}
}

View file

@ -1,4 +1,4 @@
package org.apache.spark.util.hash package org.apache.spark.util.collection
import scala.collection.mutable.HashSet import scala.collection.mutable.HashSet
import org.scalatest.FunSuite import org.scalatest.FunSuite
@ -58,12 +58,12 @@ class PrimitiveKeyOpenHashSetSuite extends FunSuite {
test("changeValue") { test("changeValue") {
val map = new PrimitiveKeyOpenHashMap[Long, String]() val map = new PrimitiveKeyOpenHashMap[Long, String]()
for (i <- 1 to 100) { for (i <- 1 to 100) {
map(i.toLong) = "" + i map(i.toLong) = i.toString
} }
assert(map.size === 100) assert(map.size === 100)
for (i <- 1 to 100) { for (i <- 1 to 100) {
val res = map.changeValue(i.toLong, { assert(false); "" }, v => { val res = map.changeValue(i.toLong, { assert(false); "" }, v => {
assert(v === "" + i) assert(v === i.toString)
v + "!" v + "!"
}) })
assert(res === i + "!") assert(res === i + "!")
@ -80,11 +80,11 @@ class PrimitiveKeyOpenHashSetSuite extends FunSuite {
test("inserting in capacity-1 map") { test("inserting in capacity-1 map") {
val map = new PrimitiveKeyOpenHashMap[Long, String](1) val map = new PrimitiveKeyOpenHashMap[Long, String](1)
for (i <- 1 to 100) { for (i <- 1 to 100) {
map(i.toLong) = "" + i map(i.toLong) = i.toString
} }
assert(map.size === 100) assert(map.size === 100)
for (i <- 1 to 100) { for (i <- 1 to 100) {
assert(map(i.toLong) === "" + i) assert(map(i.toLong) === i.toString)
} }
} }
} }

View file

@ -1,74 +0,0 @@
package org.apache.spark.util.hash
import org.scalatest.FunSuite
class OpenHashSetSuite extends FunSuite {
test("primitive int") {
val set = new OpenHashSet[Int]
assert(set.size === 0)
set.add(10)
assert(set.size === 1)
set.add(50)
assert(set.size === 2)
set.add(999)
assert(set.size === 3)
set.add(50)
assert(set.size === 3)
}
test("primitive long") {
val set = new OpenHashSet[Long]
assert(set.size === 0)
set.add(10L)
assert(set.size === 1)
set.add(50L)
assert(set.size === 2)
set.add(999L)
assert(set.size === 3)
set.add(50L)
assert(set.size === 3)
}
test("non-primitive") {
val set = new OpenHashSet[String]
assert(set.size === 0)
set.add(10.toString)
assert(set.size === 1)
set.add(50.toString)
assert(set.size === 2)
set.add(999.toString)
assert(set.size === 3)
set.add(50.toString)
assert(set.size === 3)
}
test("non-primitive set growth") {
val set = new OpenHashSet[String]
for (i <- 1 to 1000) {
set.add(i.toString)
}
assert(set.size === 1000)
assert(set.capacity > 1000)
for (i <- 1 to 100) {
set.add(i.toString)
}
assert(set.size === 1000)
assert(set.capacity > 1000)
}
test("primitive set growth") {
val set = new OpenHashSet[Long]
for (i <- 1 to 1000) {
set.add(i.toLong)
}
assert(set.size === 1000)
assert(set.capacity > 1000)
for (i <- 1 to 100) {
set.add(i.toLong)
}
assert(set.size === 1000)
assert(set.capacity > 1000)
}
}

View file

@ -57,6 +57,18 @@ which takes a list of JAR files (Java/Scala) or .egg and .zip libraries (Python)
worker nodes. You can also dynamically add new files to be sent to executors with `SparkContext.addJar` worker nodes. You can also dynamically add new files to be sent to executors with `SparkContext.addJar`
and `addFile`. and `addFile`.
## URIs for addJar / addFile
- **file:** - Absolute paths and `file:/` URIs are served by the driver's HTTP file server, and every executor
pulls the file from that server.
- **hdfs:**, **http:**, **https:**, **ftp:** - files and JARs are pulled down from the URI as expected.
- **local:** - a URI starting with `local:/` is expected to exist as a local file on each worker node. This
means that no network I/O is incurred, which works well for large files/JARs that have been pushed to each
worker or shared via NFS, GlusterFS, etc.
Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes.
Over time this can use up a significant amount of space and will need to be cleaned up.
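A short illustrative sketch of the URI schemes described above, written as it might be entered in the Spark shell; the master URL and all paths are placeholders, not part of this change.

import org.apache.spark.SparkContext

val sc = new SparkContext("spark://master:7077", "AddJarExample")

// Absolute path: served by the driver's HTTP file server and pulled by every executor.
sc.addJar("/opt/jobs/lib/extra-deps.jar")

// hdfs:, http:, https:, ftp: URIs are fetched directly from the given location.
sc.addFile("hdfs://namenode:9000/data/lookup.txt")

// local: URI: assumed to already exist on every worker node, so nothing is copied.
sc.addJar("local:/opt/shared/big-lib.jar")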
# Monitoring # Monitoring
Each driver program has a web UI, typically on port 4040, that displays information about running Each driver program has a web UI, typically on port 4040, that displays information about running

View file

@ -98,7 +98,7 @@ permissions on your private key file, you can run `launch` with the
`bin/hadoop` script in that directory. Note that the data in this `bin/hadoop` script in that directory. Note that the data in this
HDFS goes away when you stop and restart a machine. HDFS goes away when you stop and restart a machine.
- There is also a *persistent HDFS* instance in - There is also a *persistent HDFS* instance in
`/root/presistent-hdfs` that will keep data across cluster restarts. `/root/persistent-hdfs` that will keep data across cluster restarts.
Typically each node has relatively little space for persistent data Typically each node has relatively little space for persistent data
(about 3 GB), but you can use the `--ebs-vol-size` option to (about 3 GB), but you can use the `--ebs-vol-size` option to
`spark-ec2` to attach a persistent EBS volume to each node for `spark-ec2` to attach a persistent EBS volume to each node for

View file

@ -73,7 +73,7 @@ def parse_args():
parser.add_option("-v", "--spark-version", default="0.8.0", parser.add_option("-v", "--spark-version", default="0.8.0",
help="Version of Spark to use: 'X.Y.Z' or a specific git hash") help="Version of Spark to use: 'X.Y.Z' or a specific git hash")
parser.add_option("--spark-git-repo", parser.add_option("--spark-git-repo",
default="https://github.com/mesos/spark", default="https://github.com/apache/incubator-spark",
help="Github repo from which to checkout supplied commit hash") help="Github repo from which to checkout supplied commit hash")
parser.add_option("--hadoop-major-version", default="1", parser.add_option("--hadoop-major-version", default="1",
help="Major version of Hadoop (default: 1)") help="Major version of Hadoop (default: 1)")

View file

@ -21,6 +21,7 @@ import java.util.Random
import scala.math.exp import scala.math.exp
import org.apache.spark.util.Vector import org.apache.spark.util.Vector
import org.apache.spark._ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.InputFormatInfo import org.apache.spark.scheduler.InputFormatInfo
/** /**
@ -51,7 +52,7 @@ object SparkHdfsLR {
System.exit(1) System.exit(1)
} }
val inputPath = args(1) val inputPath = args(1)
val conf = SparkEnv.get.hadoop.newConfiguration() val conf = SparkHadoopUtil.get.newConfiguration()
val sc = new SparkContext(args(0), "SparkHdfsLR", val sc = new SparkContext(args(0), "SparkHdfsLR",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(),
InputFormatInfo.computePreferredLocations( InputFormatInfo.computePreferredLocations(

View file

@ -2,10 +2,9 @@ package org.apache.spark.graph
import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo
import org.apache.spark.graph.impl.MessageToPartition import org.apache.spark.graph.impl.{EdgePartition, MessageToPartition}
import org.apache.spark.serializer.KryoRegistrator import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.graph.impl._ import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.hash.BitSet
class GraphKryoRegistrator extends KryoRegistrator { class GraphKryoRegistrator extends KryoRegistrator {

View file

@ -17,26 +17,11 @@
package org.apache.spark.graph package org.apache.spark.graph
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark._ import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._ import org.apache.spark.SparkContext._
import org.apache.spark.Partitioner._ import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.hash.BitSet import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
import org.apache.spark.util.hash.OpenHashSet
import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap
/** /**
@ -184,9 +169,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
(keysIter: Iterator[VertexIdToIndexMap], (keysIter: Iterator[VertexIdToIndexMap],
valuesIter: Iterator[(Int => V, BitSet)]) => valuesIter: Iterator[(Int => V, BitSet)]) =>
val index = keysIter.next() val index = keysIter.next()
assert(keysIter.hasNext() == false) assert(keysIter.hasNext == false)
val (oldValues, bs) = valuesIter.next() val (oldValues, bs) = valuesIter.next()
assert(valuesIter.hasNext() == false) assert(valuesIter.hasNext == false)
// Allocate the array to store the results into // Allocate the array to store the results into
val newBS = new BitSet(index.capacity) val newBS = new BitSet(index.capacity)
// Iterate over the active bits in the old bitset and // Iterate over the active bits in the old bitset and
@ -246,9 +231,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
(keysIter: Iterator[VertexIdToIndexMap], (keysIter: Iterator[VertexIdToIndexMap],
valuesIter: Iterator[(Int => V, BitSet)]) => valuesIter: Iterator[(Int => V, BitSet)]) =>
val index = keysIter.next() val index = keysIter.next()
assert(keysIter.hasNext() == false) assert(keysIter.hasNext == false)
val (oldValues, bs: BitSet) = valuesIter.next() val (oldValues, bs: BitSet) = valuesIter.next()
assert(valuesIter.hasNext() == false) assert(valuesIter.hasNext == false)
// Construct a view of the map transformation // Construct a view of the map transformation
val newValues: (Int => U) = (ind: Int) => { val newValues: (Int => U) = (ind: Int) => {
if (bs.get(ind)) { f(index.getValueSafe(ind), oldValues(ind)) } if (bs.get(ind)) { f(index.getValueSafe(ind), oldValues(ind)) }
@ -384,7 +369,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
// Get the location of the key in the index // Get the location of the key in the index
val pos = index.getPos(k) val pos = index.getPos(k)
// Only if the key is already in the index // Only if the key is already in the index
if ((pos & OpenHashSet.EXISTENCE_MASK) == 0) { if ((pos & OpenHashSet.NONEXISTENCE_MASK) == 0) {
// Get the actual index // Get the actual index
val ind = pos & OpenHashSet.POSITION_MASK val ind = pos & OpenHashSet.POSITION_MASK
// If this value has already been seen then merge // If this value has already been seen then merge
@ -642,7 +627,7 @@ object VertexSetRDD {
* *
* @note duplicate vertices are discarded arbitrarily * @note duplicate vertices are discarded arbitrarily
* *
* @tparam the vertex attribute type * @tparam V the vertex attribute type
* @param rdd the rdd containing vertices * @param rdd the rdd containing vertices
* @param index the index which must be a superset of the vertices * @param index the index which must be a superset of the vertices
* in RDD * in RDD
@ -656,7 +641,7 @@ object VertexSetRDD {
* Construct a vertex set from an RDD using an existing index and a * Construct a vertex set from an RDD using an existing index and a
* user defined `combiner` to merge duplicate vertices. * user defined `combiner` to merge duplicate vertices.
* *
* @tparam the vertex attribute type * @tparam V the vertex attribute type
* @param rdd the rdd containing vertices * @param rdd the rdd containing vertices
* @param index the index which must be a superset of the vertices * @param index the index which must be a superset of the vertices
* in RDD * in RDD
@ -673,7 +658,7 @@ object VertexSetRDD {
* Construct a vertex set from an RDD using an existing index and a * Construct a vertex set from an RDD using an existing index and a
* user defined `combiner` to merge duplicate vertices. * user defined `combiner` to merge duplicate vertices.
* *
* @tparam the vertex attribute type * @tparam V the vertex attribute type
* @param rdd the rdd containing vertices * @param rdd the rdd containing vertices
* @param index the index which must be a superset of the vertices * @param index the index which must be a superset of the vertices
* in RDD * in RDD
@ -710,13 +695,13 @@ object VertexSetRDD {
val values: RDD[ (Int => C, BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => { val values: RDD[ (Int => C, BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
// There is only one map // There is only one map
val index = indexIter.next() val index = indexIter.next()
assert(!indexIter.hasNext()) assert(!indexIter.hasNext)
val values = new Array[C](index.capacity) val values = new Array[C](index.capacity)
val bs = new BitSet(index.capacity) val bs = new BitSet(index.capacity)
for ((k,c) <- tblIter) { for ((k,c) <- tblIter) {
// Get the location of the key in the index // Get the location of the key in the index
val pos = index.getPos(k) val pos = index.getPos(k)
if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
throw new SparkException("Error: Trying to bind an external index " + throw new SparkException("Error: Trying to bind an external index " +
"to an RDD which contains keys that are not in the index.") "to an RDD which contains keys that are not in the index.")
} else { } else {

View file

@ -4,26 +4,17 @@ import scala.collection.JavaConversions._
import scala.collection.mutable import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuilder
import org.apache.spark.SparkContext._ import org.apache.spark.SparkContext._
import org.apache.spark.Partitioner
import org.apache.spark.HashPartitioner import org.apache.spark.HashPartitioner
import org.apache.spark.util.ClosureCleaner import org.apache.spark.util.ClosureCleaner
import org.apache.spark.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.graph._ import org.apache.spark.graph._
import org.apache.spark.graph.impl.GraphImpl._ import org.apache.spark.graph.impl.GraphImpl._
import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._ import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.hash.BitSet import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
import org.apache.spark.util.hash.OpenHashSet
import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap
/** /**

View file

@ -1,8 +1,6 @@
package org.apache.spark package org.apache.spark
import org.apache.spark.util.hash.BitSet import org.apache.spark.util.collection.OpenHashSet
import org.apache.spark.util.hash.OpenHashSet
import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap
package object graph { package object graph {