diff --git a/README.md b/README.md index ba31ed586d..7790139c8f 100644 --- a/README.md +++ b/README.md @@ -52,9 +52,9 @@ to interactively load, transform, and compute on massive graphs. ## Examples -Suppose I want to build a graph from some text files, restrict the graph +Suppose I want to build a graph from some text files, restrict the graph to important relationships and users, run page-rank on the sub-graph, and -then finally return attributes associated with the top users. I can do +then finally return attributes associated with the top users. I can do all of this in just a few lines with GraphX: ```scala @@ -69,16 +69,16 @@ val users = sc.textFile("hdfs://user_attributes.tsv") val followerGraph = Graph.textFile(sc, "hdfs://followers.tsv") // Attach the user attributes -val graph = followerGraph.outerJoinVertices(users){ +val graph = followerGraph.outerJoinVertices(users){ case (uid, deg, Some(attrList)) => attrList // Some users may not have attributes so we set them as empty - case (uid, deg, None) => Array.empty[String] + case (uid, deg, None) => Array.empty[String] } // Restrict the graph to users which have exactly two attributes val subgraph = graph.subgraph((vid, attr) => attr.size == 2) -// Compute the PageRank +// Compute the PageRank val pagerankGraph = Analytics.pagerank(subgraph) // Get the attributes of the top pagerank users @@ -86,7 +86,7 @@ val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices){ case (uid, attrList, Some(pr)) => (pr, attrList) case (uid, attrList, None) => (pr, attrList) } - + println(userInfoWithPageRank.top(5)) ``` @@ -160,10 +160,10 @@ with YARN, also set `SPARK_YARN=true`: For convenience, these variables may also be set through the `conf/spark-env.sh` file described below. -When developing a Spark application, specify the Hadoop version by -adding the "hadoop-client" artifact to your project's -dependencies. For example, if you're using Hadoop 1.0.1 and build your -application using SBT, add this entry to `libraryDependencies`: +When developing a Spark application, specify the Hadoop version by adding the +"hadoop-client" artifact to your project's dependencies. For example, if you're +using Hadoop 1.2.1 and build your application using SBT, add this entry to +`libraryDependencies`: "org.apache.hadoop" % "hadoop-client" % "1.2.1" diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 158197ae4d..880b49e8ef 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.Map import scala.collection.generic.Growable -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -51,7 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.mesos.MesosNativeLibrary -import org.apache.spark.deploy.LocalSparkCluster +import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ @@ -245,7 +245,7 @@ class SparkContext( /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. 
*/ val hadoopConfiguration = { val env = SparkEnv.get - val conf = env.hadoop.newConfiguration() + val conf = SparkHadoopUtil.get.newConfiguration() // Explicitly check for S3 environment variables if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) { @@ -255,8 +255,10 @@ class SparkContext( conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) } // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) { - conf.set(key.substring("spark.hadoop.".length), System.getProperty(key)) + Utils.getSystemProperties.foreach { case (key, value) => + if (key.startsWith("spark.hadoop.")) { + conf.set(key.substring("spark.hadoop.".length), value) + } } val bufferSize = System.getProperty("spark.buffer.size", "65536") conf.set("io.file.buffer.size", bufferSize) @@ -379,7 +381,7 @@ class SparkContext( minSplits: Int = defaultMinSplits ): RDD[(K, V)] = { // Add necessary security credentials to the JobConf before broadcasting it. - SparkEnv.get.hadoop.addCredentials(conf) + SparkHadoopUtil.get.addCredentials(conf) new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) } @@ -589,7 +591,8 @@ class SparkContext( val uri = new URI(path) val key = uri.getScheme match { case null | "file" => env.httpFileServer.addFile(new File(uri.getPath)) - case _ => path + case "local" => "file:" + uri.getPath + case _ => path } addedFiles(key) = System.currentTimeMillis @@ -698,7 +701,7 @@ class SparkContext( key = uri.getScheme match { // A JAR file which exists only on the driver node case null | "file" => - if (env.hadoop.isYarnMode()) { + if (SparkHadoopUtil.get.isYarnMode()) { // In order for this to work on yarn the user must specify the --addjars option to // the client to upload the file into the distributed cache to make it show up in the // current working directory. @@ -936,9 +939,8 @@ class SparkContext( * prevent accidental overriding of checkpoint files in the existing directory. 
*/ def setCheckpointDir(dir: String, useExisting: Boolean = false) { - val env = SparkEnv.get val path = new Path(dir) - val fs = path.getFileSystem(env.hadoop.newConfiguration()) + val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration()) if (!useExisting) { if (fs.exists(path)) { throw new Exception("Checkpoint directory '" + path + "' already exists.") diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index aaab717bcf..ff2df8fb6a 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -25,13 +25,13 @@ import akka.remote.RemoteActorRefProvider import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster} import org.apache.spark.network.ConnectionManager import org.apache.spark.serializer.{Serializer, SerializerManager} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.api.python.PythonWorkerFactory +import com.google.common.collect.MapMaker /** * Holds all the runtime environment objects for a running Spark instance (either master or worker), @@ -58,18 +58,9 @@ class SparkEnv ( private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() - val hadoop = { - val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) - if(yarnMode) { - try { - Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil] - } catch { - case th: Throwable => throw new SparkException("Unable to load YARN support", th) - } - } else { - new SparkHadoopUtil - } - } + // A general, soft-reference map for metadata needed during HadoopRDD split computation + // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). + private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() def stop() { pythonWorkers.foreach { case(key, worker) => worker.stop() } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 83cd3df5fa..6bc846aa92 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -20,17 +20,13 @@ package org.apache.spark.deploy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf -import com.google.common.collect.MapMaker - +import org.apache.spark.SparkException /** * Contains util methods to interact with Hadoop from Spark. */ private[spark] class SparkHadoopUtil { - // A general, soft-reference map for metadata needed during HadoopRDD split computation - // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). - private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() /** * Return an appropriate (subclass) of Configuration. 
Creating config can initializes some Hadoop @@ -45,5 +41,23 @@ class SparkHadoopUtil { def addCredentials(conf: JobConf) {} def isYarnMode(): Boolean = { false } - +} + +object SparkHadoopUtil { + private val hadoop = { + val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) + if (yarnMode) { + try { + Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil] + } catch { + case th: Throwable => throw new SparkException("Unable to load YARN support", th) + } + } else { + new SparkHadoopUtil + } + } + + def get: SparkHadoopUtil = { + hadoop + } } diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index ccaaecb85b..d3033ea4a6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -18,6 +18,7 @@ package org.apache.spark.rdd import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.{NullWritable, BytesWritable} @@ -83,7 +84,7 @@ private[spark] object CheckpointRDD extends Logging { def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) { val env = SparkEnv.get val outputDir = new Path(path) - val fs = outputDir.getFileSystem(env.hadoop.newConfiguration()) + val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration()) val finalOutputName = splitIdToFile(ctx.partitionId) val finalOutputPath = new Path(outputDir, finalOutputName) @@ -122,7 +123,7 @@ private[spark] object CheckpointRDD extends Logging { def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = { val env = SparkEnv.get - val fs = path.getFileSystem(env.hadoop.newConfiguration()) + val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration()) val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt val fileInputStream = fs.open(path, bufferSize) val serializer = env.serializer.newInstance() @@ -145,7 +146,7 @@ private[spark] object CheckpointRDD extends Logging { val sc = new SparkContext(cluster, "CheckpointRDD Test") val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000) val path = new Path(hdfsPath, "temp") - val fs = path.getFileSystem(env.hadoop.newConfiguration()) + val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration()) sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _) val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index fad042c7ae..32901a508f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.util.ReflectionUtils import org.apache.spark._ import org.apache.spark.broadcast.Broadcast +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.NextIterator import org.apache.hadoop.conf.{Configuration, Configurable} @@ -198,10 +199,10 @@ private[spark] object HadoopRDD { * The three methods below are helpers for accessing the local map, a property of the SparkEnv of * the local process. 
*/ - def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key) + def getCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.get(key) - def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key) + def containsCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.containsKey(key) def putCachedMetadata(key: String, value: Any) = - SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value) + SparkEnv.get.hadoopJobMetadata.put(key, value) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala index 370ccd183c..1791ee660d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.deploy.SparkHadoopUtil import scala.collection.immutable.Set import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.hadoop.security.UserGroupInformation @@ -87,9 +88,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = { - val env = SparkEnv.get val conf = new JobConf(configuration) - env.hadoop.addCredentials(conf) + SparkHadoopUtil.get.addCredentials(conf) FileInputFormat.setInputPaths(conf, path) val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] = @@ -108,9 +108,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = { - val env = SparkEnv.get val jobConf = new JobConf(configuration) - env.hadoop.addCredentials(jobConf) + SparkHadoopUtil.get.addCredentials(jobConf) FileInputFormat.setInputPaths(jobConf, path) val instance: org.apache.hadoop.mapred.InputFormat[_, _] = diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 24d97da6eb..1dc71a0428 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -146,26 +146,26 @@ private[spark] class ShuffleMapTask( metrics = Some(context.taskMetrics) val blockManager = SparkEnv.get.blockManager - var shuffle: ShuffleBlocks = null - var buckets: ShuffleWriterGroup = null + val shuffleBlockManager = blockManager.shuffleBlockManager + var shuffle: ShuffleWriterGroup = null + var success = false try { // Obtain all the block writers for shuffle blocks. val ser = SparkEnv.get.serializerManager.get(dep.serializerClass) - shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser) - buckets = shuffle.acquireWriters(partitionId) + shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser) // Write the map output to its associated buckets. for (elem <- rdd.iterator(split, context)) { val pair = elem.asInstanceOf[Product2[Any, Any]] val bucketId = dep.partitioner.getPartition(pair._1) - buckets.writers(bucketId).write(pair) + shuffle.writers(bucketId).write(pair) } // Commit the writes. Get the size of each bucket block (total block size). 
var totalBytes = 0L var totalTime = 0L - val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter => + val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter => writer.commit() val size = writer.fileSegment().length totalBytes += size @@ -179,19 +179,20 @@ private[spark] class ShuffleMapTask( shuffleMetrics.shuffleWriteTime = totalTime metrics.get.shuffleWriteMetrics = Some(shuffleMetrics) + success = true new MapStatus(blockManager.blockManagerId, compressedSizes) } catch { case e: Exception => // If there is an exception from running the task, revert the partial writes // and throw the exception upstream to Spark. - if (buckets != null) { - buckets.writers.foreach(_.revertPartialWrites()) + if (shuffle != null) { + shuffle.writers.foreach(_.revertPartialWrites()) } throw e } finally { // Release the writers back to the shuffle block manager. - if (shuffle != null && buckets != null) { - buckets.writers.foreach(_.close()) - shuffle.releaseWriters(buckets) + if (shuffle != null && shuffle.writers != null) { + shuffle.writers.foreach(_.close()) + shuffle.releaseWriters(success) } // Execute the callbacks on task completion. context.executeOnCompleteCallbacks() diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala index 55f8313e87..53bf78267e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala @@ -175,7 +175,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas reason.className, reason.description, locs.mkString("\n"))) if (numFailures(index) > MAX_TASK_FAILURES) { val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format( - taskSet.id, index, 4, reason.description) + taskSet.id, index, MAX_TASK_FAILURES, reason.description) decreaseRunningTasks(runningTasks) sched.dagScheduler.taskSetFailed(taskSet, errorMessage) // need to delete failed Taskset from schedule queue diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 76d537f8e8..fbedfbc446 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.io.{InputStream, OutputStream} +import java.io.{File, InputStream, OutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} import scala.collection.mutable.{HashMap, ArrayBuffer} @@ -47,7 +47,7 @@ private[spark] class BlockManager( extends Logging { val shuffleBlockManager = new ShuffleBlockManager(this) - val diskBlockManager = new DiskBlockManager( + val diskBlockManager = new DiskBlockManager(shuffleBlockManager, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] @@ -462,15 +462,11 @@ private[spark] class BlockManager( * This is currently used for writing shuffle files out. Callers should handle error * cases. 
*/ - def getDiskWriter(blockId: BlockId, filename: String, serializer: Serializer, bufferSize: Int) + def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int) : BlockObjectWriter = { val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _) - val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true) val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream) writer.registerCloseEventHandler(() => { - if (shuffleBlockManager.consolidateShuffleFiles) { - diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment()) - } val myInfo = new ShuffleBlockInfo() blockInfo.put(blockId, myInfo) myInfo.markReady(writer.fileSegment().length) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 32d2dd0694..e49c191c70 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -78,11 +78,11 @@ abstract class BlockObjectWriter(val blockId: BlockId) { /** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */ class DiskBlockObjectWriter( - blockId: BlockId, - file: File, - serializer: Serializer, - bufferSize: Int, - compressStream: OutputStream => OutputStream) + blockId: BlockId, + file: File, + serializer: Serializer, + bufferSize: Int, + compressStream: OutputStream => OutputStream) extends BlockObjectWriter(blockId) with Logging { @@ -111,8 +111,8 @@ class DiskBlockObjectWriter( private var fos: FileOutputStream = null private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null - private var initialPosition = 0L - private var lastValidPosition = 0L + private val initialPosition = file.length() + private var lastValidPosition = initialPosition private var initialized = false private var _timeWriting = 0L @@ -120,7 +120,6 @@ class DiskBlockObjectWriter( fos = new FileOutputStream(file, true) ts = new TimeTrackingOutputStream(fos) channel = fos.getChannel() - initialPosition = channel.position lastValidPosition = initialPosition bs = compressStream(new FastBufferedOutputStream(ts, bufferSize)) objOut = serializer.newInstance().serializeStream(bs) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index bcb58ad946..fcd2e97982 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -20,12 +20,11 @@ package org.apache.spark.storage import java.io.File import java.text.SimpleDateFormat import java.util.{Date, Random} -import java.util.concurrent.ConcurrentHashMap import org.apache.spark.Logging import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.network.netty.{PathResolver, ShuffleSender} -import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils} +import org.apache.spark.util.Utils /** * Creates and maintains the logical mapping between logical blocks and physical on-disk @@ -35,7 +34,8 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedH * * @param rootDirs The directories to use for storing block files. Data will be hashed among these. 
*/ -private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver with Logging { +private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String) + extends PathResolver with Logging { private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt @@ -47,54 +47,23 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) private var shuffleSender : ShuffleSender = null - // Stores only Blocks which have been specifically mapped to segments of files - // (rather than the default, which maps a Block to a whole file). - // This keeps our bookkeeping down, since the file system itself tracks the standalone Blocks. - private val blockToFileSegmentMap = new TimeStampedHashMap[BlockId, FileSegment] - - val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DISK_BLOCK_MANAGER, this.cleanup) - addShutdownHook() - /** - * Creates a logical mapping from the given BlockId to a segment of a file. - * This will cause any accesses of the logical BlockId to be directed to the specified - * physical location. - */ - def mapBlockToFileSegment(blockId: BlockId, fileSegment: FileSegment) { - blockToFileSegmentMap.put(blockId, fileSegment) - } - /** * Returns the phyiscal file segment in which the given BlockId is located. * If the BlockId has been mapped to a specific FileSegment, that will be returned. * Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly. */ def getBlockLocation(blockId: BlockId): FileSegment = { - if (blockToFileSegmentMap.internalMap.containsKey(blockId)) { - blockToFileSegmentMap.get(blockId).get + if (blockId.isShuffle && shuffleManager.consolidateShuffleFiles) { + shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]) } else { val file = getFile(blockId.name) new FileSegment(file, 0, file.length()) } } - /** - * Simply returns a File to place the given Block into. This does not physically create the file. - * If filename is given, that file will be used. Otherwise, we will use the BlockId to get - * a unique filename. 
- */ - def createBlockFile(blockId: BlockId, filename: String = "", allowAppending: Boolean): File = { - val actualFilename = if (filename == "") blockId.name else filename - val file = getFile(actualFilename) - if (!allowAppending && file.exists()) { - throw new IllegalStateException( - "Attempted to create file that already exists: " + actualFilename) - } - file - } - - private def getFile(filename: String): File = { + def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) val dirId = hash % localDirs.length @@ -119,6 +88,8 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit new File(subDir, filename) } + def getFile(blockId: BlockId): File = getFile(blockId.name) + private def createLocalDirs(): Array[File] = { logDebug("Creating local directories at root dirs '" + rootDirs + "'") val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") @@ -151,10 +122,6 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit } } - private def cleanup(cleanupTime: Long) { - blockToFileSegmentMap.clearOldValues(cleanupTime) - } - private def addShutdownHook() { localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir)) Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index a3c496f9e0..5a1e7b4444 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -44,7 +44,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage val bytes = _bytes.duplicate() logDebug("Attempting to put block " + blockId) val startTime = System.currentTimeMillis - val file = diskManager.createBlockFile(blockId, allowAppending = false) + val file = diskManager.getFile(blockId) val channel = new FileOutputStream(file).getChannel() while (bytes.remaining > 0) { channel.write(bytes) @@ -64,7 +64,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage logDebug("Attempting to write values for block " + blockId) val startTime = System.currentTimeMillis - val file = diskManager.createBlockFile(blockId, allowAppending = false) + val file = diskManager.getFile(blockId) val outputStream = new FileOutputStream(file) blockManager.dataSerializeStream(blockId, outputStream, values.iterator) val length = file.length diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 066e45a12b..2f1b049ce4 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -17,33 +17,45 @@ package org.apache.spark.storage +import java.io.File import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger +import scala.collection.JavaConversions._ + import org.apache.spark.serializer.Serializer +import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap} +import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} +import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup -private[spark] -class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter]) +/** 
A group of writers for a ShuffleMapTask, one writer per reducer. */ +private[spark] trait ShuffleWriterGroup { + val writers: Array[BlockObjectWriter] -private[spark] -trait ShuffleBlocks { - def acquireWriters(mapId: Int): ShuffleWriterGroup - def releaseWriters(group: ShuffleWriterGroup) + /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */ + def releaseWriters(success: Boolean) } /** - * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer - * per reducer. + * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file + * per reducer (this set of files is called a ShuffleFileGroup). * * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer - * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files, - * it releases them for another task. + * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle + * files, it releases them for another task. * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple: * - shuffleId: The unique id given to the entire shuffle stage. * - bucketId: The id of the output partition (i.e., reducer id) * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a * time owns a particular fileId, and this id is returned to a pool when the task finishes. + * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length) + * that specifies where in a given file the actual block data is located. + * + * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping + * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for + * each block stored in each file. In order to find the location of a shuffle block, we search the + * files within a ShuffleFileGroups associated with the block's reducer. */ private[spark] class ShuffleBlockManager(blockManager: BlockManager) { @@ -52,45 +64,152 @@ class ShuffleBlockManager(blockManager: BlockManager) { val consolidateShuffleFiles = System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean - var nextFileId = new AtomicInteger(0) - val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]() + private val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 - def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer) = { - new ShuffleBlocks { - // Get a group of writers for a map task. - override def acquireWriters(mapId: Int): ShuffleWriterGroup = { - val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 - val fileId = getUnusedFileId() - val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => + /** + * Contains all the state related to a particular shuffle. This includes a pool of unused + * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle. 
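+ * A ShuffleState is created lazily per shuffle id (see forMapTask) and is cleaned up via the
+ * MetadataCleaner below. Completed map tasks return their file group to the unused pool so the
+ * next task can reuse it, while allFileGroups is what getBlockLocation searches when resolving
+ * a consolidated block.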
+ */ + private class ShuffleState() { + val nextFileId = new AtomicInteger(0) + val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() + val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() + } + + type ShuffleId = Int + private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState] + + private + val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup) + + def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { + new ShuffleWriterGroup { + shuffleStates.putIfAbsent(shuffleId, new ShuffleState()) + private val shuffleState = shuffleStates(shuffleId) + private var fileGroup: ShuffleFileGroup = null + + val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) { + fileGroup = getUnusedFileGroup() + Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - if (consolidateShuffleFiles) { - val filename = physicalFileName(shuffleId, bucketId, fileId) - blockManager.getDiskWriter(blockId, filename, serializer, bufferSize) - } else { - blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize) - } + blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize) + } + } else { + Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => + val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) + val blockFile = blockManager.diskBlockManager.getFile(blockId) + blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize) } - new ShuffleWriterGroup(mapId, fileId, writers) } - override def releaseWriters(group: ShuffleWriterGroup) { - recycleFileId(group.fileId) + override def releaseWriters(success: Boolean) { + if (consolidateShuffleFiles) { + if (success) { + val offsets = writers.map(_.fileSegment().offset) + fileGroup.recordMapOutput(mapId, offsets) + } + recycleFileGroup(fileGroup) + } + } + + private def getUnusedFileGroup(): ShuffleFileGroup = { + val fileGroup = shuffleState.unusedFileGroups.poll() + if (fileGroup != null) fileGroup else newFileGroup() + } + + private def newFileGroup(): ShuffleFileGroup = { + val fileId = shuffleState.nextFileId.getAndIncrement() + val files = Array.tabulate[File](numBuckets) { bucketId => + val filename = physicalFileName(shuffleId, bucketId, fileId) + blockManager.diskBlockManager.getFile(filename) + } + val fileGroup = new ShuffleFileGroup(fileId, shuffleId, files) + shuffleState.allFileGroups.add(fileGroup) + fileGroup + } + + private def recycleFileGroup(group: ShuffleFileGroup) { + shuffleState.unusedFileGroups.add(group) } } } - private def getUnusedFileId(): Int = { - val fileId = unusedFileIds.poll() - if (fileId == null) nextFileId.getAndIncrement() else fileId - } - - private def recycleFileId(fileId: Int) { - if (consolidateShuffleFiles) { - unusedFileIds.add(fileId) + /** + * Returns the physical file segment in which the given BlockId is located. + * This function should only be called if shuffle file consolidation is enabled, as it is + * an error condition if we don't find the expected block. + */ + def getBlockLocation(id: ShuffleBlockId): FileSegment = { + // Search all file groups associated with this shuffle. 
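+ // Each group records, per reducer file, the offsets of the blocks written to it; a block's
+ // length is the gap to the next recorded offset (or to the end of the file for the last block).
+ // For example, if two map tasks wrote to a group's reducer-1 file starting at offsets 0 and 50,
+ // the first task's block for reducer 1 is the segment [0, 50) of that file.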
+ val shuffleState = shuffleStates(id.shuffleId) + for (fileGroup <- shuffleState.allFileGroups) { + val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId) + if (segment.isDefined) { return segment.get } } + throw new IllegalStateException("Failed to find shuffle block: " + id) } private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = { "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId) } + + private def cleanup(cleanupTime: Long) { + shuffleStates.clearOldValues(cleanupTime) + } +} + +private[spark] +object ShuffleBlockManager { + /** + * A group of shuffle files, one per reducer. + * A particular mapper will be assigned a single ShuffleFileGroup to write its output to. + */ + private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) { + /** + * Stores the absolute index of each mapId in the files of this group. For instance, + * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0. + */ + private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]() + + /** + * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file. + * This ordering allows us to compute block lengths by examining the following block offset. + * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every + * reducer. + */ + private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) { + new PrimitiveVector[Long]() + } + + def numBlocks = mapIdToIndex.size + + def apply(bucketId: Int) = files(bucketId) + + def recordMapOutput(mapId: Int, offsets: Array[Long]) { + mapIdToIndex(mapId) = numBlocks + for (i <- 0 until offsets.length) { + blockOffsetsByReducer(i) += offsets(i) + } + } + + /** Returns the FileSegment associated with the given map task, or None if no entry exists. 
*/ + def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = { + val file = files(reducerId) + val blockOffsets = blockOffsetsByReducer(reducerId) + val index = mapIdToIndex.getOrElse(mapId, -1) + if (index >= 0) { + val offset = blockOffsets(index) + val length = + if (index + 1 < numBlocks) { + blockOffsets(index + 1) - offset + } else { + file.length() - offset + } + assert(length >= 0) + Some(new FileSegment(file, offset, length)) + } else { + None + } + } + } } diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala index 7dcadc3805..1e4db4f66b 100644 --- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala +++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala @@ -38,19 +38,19 @@ object StoragePerfTester { val blockManager = sc.env.blockManager def writeOutputBytes(mapId: Int, total: AtomicLong) = { - val shuffle = blockManager.shuffleBlockManager.forShuffle(1, numOutputSplits, + val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, new KryoSerializer()) - val buckets = shuffle.acquireWriters(mapId) + val writers = shuffle.writers for (i <- 1 to recordsPerMap) { - buckets.writers(i % numOutputSplits).write(writeData) + writers(i % numOutputSplits).write(writeData) } - buckets.writers.map {w => + writers.map {w => w.commit() total.addAndGet(w.fileSegment().length) w.close() } - shuffle.releaseWriters(buckets) + shuffle.releaseWriters(true) } val start = System.currentTimeMillis() diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index 3f963727d9..67a7f87a5c 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -59,7 +59,7 @@ object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext "ShuffleMapTask", "BlockManager", "DiskBlockManager", "BroadcastVars") { val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, - SHUFFLE_MAP_TASK, BLOCK_MANAGER, DISK_BLOCK_MANAGER, BROADCAST_VARS = Value + SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value type MetadataCleanerType = Value diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a3b3968c5e..fe932d8ede 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -18,13 +18,12 @@ package org.apache.spark.util import java.io._ -import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket} +import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address} import java.util.{Locale, Random, UUID} -import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor} -import java.util.regex.Pattern +import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor} import scala.collection.Map -import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import scala.io.Source @@ -36,7 +35,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil} import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} import org.apache.spark.deploy.SparkHadoopUtil import java.nio.ByteBuffer 
-import org.apache.spark.{SparkEnv, SparkException, Logging} +import org.apache.spark.{SparkException, Logging} /** @@ -148,7 +147,7 @@ private[spark] object Utils extends Logging { return buf } - private val shutdownDeletePaths = new collection.mutable.HashSet[String]() + private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]() // Register the path to be deleted via shutdown hook def registerShutdownDeleteDir(file: File) { @@ -280,9 +279,8 @@ private[spark] object Utils extends Logging { } case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others - val env = SparkEnv.get val uri = new URI(url) - val conf = env.hadoop.newConfiguration() + val conf = SparkHadoopUtil.get.newConfiguration() val fs = FileSystem.get(uri, conf) val in = fs.open(new Path(uri)) val out = new FileOutputStream(tempFile) @@ -819,4 +817,10 @@ private[spark] object Utils extends Logging { // Nothing else to guard against ? hashAbs } + + /** Returns a copy of the system properties that is thread-safe to iterator over. */ + def getSystemProperties(): Map[String, String] = { + return System.getProperties().clone() + .asInstanceOf[java.util.Properties].toMap[String, String] + } } diff --git a/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala similarity index 93% rename from core/src/main/scala/org/apache/spark/util/hash/BitSet.scala rename to core/src/main/scala/org/apache/spark/util/collection/BitSet.scala index 69b10566f3..5e264b48dd 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.util.hash +package org.apache.spark.util.collection /** @@ -57,10 +57,10 @@ class BitSet(numBits: Int) { assert(newBS.numWords >= numWords) assert(newBS.numWords >= other.numWords) var ind = 0 - while( ind < smaller ) { + while( ind < smaller ) { newBS.words(ind) = words(ind) & other.words(ind) ind += 1 - } + } newBS } @@ -75,18 +75,18 @@ class BitSet(numBits: Int) { assert(newBS.numWords >= other.numWords) val smaller = math.min(numWords, other.numWords) var ind = 0 - while( ind < smaller ) { + while( ind < smaller ) { newBS.words(ind) = words(ind) | other.words(ind) ind += 1 } - while( ind < numWords ) { + while( ind < numWords ) { newBS.words(ind) = words(ind) ind += 1 - } - while( ind < other.numWords ) { + } + while( ind < other.numWords ) { newBS.words(ind) = other.words(ind) ind += 1 - } + } newBS } @@ -110,7 +110,7 @@ class BitSet(numBits: Int) { */ def get(index: Int): Boolean = { val bitmask = 1L << (index & 0x3f) // mod 64 and shift - (words(index >>> 6) & bitmask) != 0 // div by 64 and mask + (words(index >> 6) & bitmask) != 0 // div by 64 and mask } @@ -181,5 +181,5 @@ class BitSet(numBits: Int) { /** Return the number of longs it would take to hold numBits. 
*/ - private def bit2words(numBits: Int) = ((numBits - 1) >>> 6) + 1 + private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1 } diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala similarity index 93% rename from core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala rename to core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala index e53551ced6..1e9faaa5a0 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.util.hash +package org.apache.spark.util.collection /** @@ -34,7 +34,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: /** * Allocate an OpenHashMap with a fixed initial capacity */ - def this(initialCapacity: Int = 64) = + def this(initialCapacity: Int = 64) = this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity)) /** @@ -42,7 +42,6 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: */ def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity)) - @transient private var _oldValues: Array[V] = null // Treat the null key differently so we can use nulls in "data" to represent empty items. @@ -71,7 +70,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: haveNullValue = true nullValue = v } else { - val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK + val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK _values(pos) = v keySet.rehashIfNeeded(k, grow, move) _oldValues = null @@ -88,7 +87,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: nullValue = v } } else { - val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK + val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK _values(pos) = mergeF(_values(pos), v) keySet.rehashIfNeeded(k, grow, move) _oldValues = null @@ -111,8 +110,8 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: } nullValue } else { - val pos = keySet.fastAdd(k) - if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { + val pos = keySet.addWithoutResize(k) + if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { val newValue = defaultValue _values(pos & OpenHashSet.POSITION_MASK) = newValue keySet.rehashIfNeeded(k, grow, move) diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala similarity index 87% rename from core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala rename to core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala index d083ab26ac..f8d54a8f73 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.util.hash +package org.apache.spark.util.collection /** @@ -43,6 +43,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements") require(initialCapacity >= 1, "Invalid initial capacity") + require(loadFactor < 1.0, "Load factor must be less than 1.0") + require(loadFactor > 0.0, "Load factor must be greater than 0.0") import OpenHashSet._ @@ -78,11 +80,15 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( protected var _mask = _capacity - 1 protected var _size = 0 - protected var _data = classManifest[T].newArray(_capacity) protected var _bitset = new BitSet(_capacity) def getBitSet = _bitset + // Init of the array in constructor (instead of in declaration) to work around a Scala compiler + // specialization bug that would generate two arrays (one for Object and one for specialized T). + protected var _data: Array[T] = _ + _data = new Array[T](_capacity) + /** Number of elements in the set. */ def size: Int = _size @@ -97,7 +103,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( * and rehash all elements. */ def add(k: T) { - fastAdd(k) + addWithoutResize(k) rehashIfNeeded(k, grow, move) } @@ -111,14 +117,14 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( * @return The position where the key is placed, plus the highest order bit is set if the key * exists previously. */ - def fastAdd(k: T): Int = putInto(_bitset, _data, k) + def addWithoutResize(k: T): Int = putInto(_bitset, _data, k) /** * Rehash the set if it is overloaded. * @param k A parameter unused in the function, but to force the Scala compiler to specialize * this method. - * @param allocateFunc Closure invoked when we are allocating a new, larger array. - * @param moveFunc Closure invoked when we move the key from one position (in the old data array) + * @param allocateFunc Callback invoked when we are allocating a new, larger array. + * @param moveFunc Callback invoked when we move the key from one position (in the old data array) * to a new position (in the new data array). */ def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) { @@ -127,7 +133,9 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( } } - /** Return the position of the element in the underlying array. */ + /** + * Return the position of the element in the underlying array, or INVALID_POS if it is not found. + */ def getPos(k: T): Int = { var pos = hashcode(hasher.hash(k)) & _mask var i = 1 @@ -163,7 +171,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( /** * Put an entry into the set. Return the position where the key is placed. In addition, the - * highest bid in the returned position is set if the key exists prior to this put. + * highest bit in the returned position is set if the key exists prior to this put. * * This function assumes the data array has at least one empty slot. */ @@ -177,7 +185,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( data(pos) = k bitset.set(pos) _size += 1 - return pos | EXISTENCE_MASK + return pos | NONEXISTENCE_MASK } else if (data(pos) == k) { // Found an existing key. return pos @@ -199,8 +207,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( * * @param k A parameter unused in the function, but to force the Scala compiler to specialize * this method. - * @param allocateFunc Closure invoked when we are allocating a new, larger array. 
- * @param moveFunc Closure invoked when we move the key from one position (in the old data array) + * @param allocateFunc Callback invoked when we are allocating a new, larger array. + * @param moveFunc Callback invoked when we move the key from one position (in the old data array) * to a new position (in the new data array). */ private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) { @@ -208,7 +216,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( require(newCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements") allocateFunc(newCapacity) - val newData = classManifest[T].newArray(newCapacity) + val newData = new Array[T](newCapacity) val newBitset = new BitSet(newCapacity) var pos = 0 _size = 0 @@ -245,9 +253,7 @@ private[spark] object OpenHashSet { val INVALID_POS = -1 - - val EXISTENCE_MASK = 0x80000000 - + val NONEXISTENCE_MASK = 0x80000000 val POSITION_MASK = 0xEFFFFFF /** diff --git a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala similarity index 87% rename from core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala rename to core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala index 08fc74e5da..987077dd8a 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.util.hash +package org.apache.spark.util.collection /** @@ -35,7 +35,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, /** * Allocate an OpenHashMap with a fixed initial capacity */ - def this(initialCapacity: Int = 64) = + def this(initialCapacity: Int = 64) = this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity)) /** @@ -55,9 +55,15 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, _values(pos) } + /** Get the value for a given key, or returns elseValue if it doesn't exist. */ + def getOrElse(k: K, elseValue: V): V = { + val pos = keySet.getPos(k) + if (pos >= 0) _values(pos) else elseValue + } + /** Set the value for a key */ def update(k: K, v: V) { - val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK + val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK _values(pos) = v keySet.rehashIfNeeded(k, grow, move) _oldValues = null @@ -66,9 +72,9 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, /** Set the value for a key */ def setMerge(k: K, v: V, mergeF: (V,V) => V) { - val pos = keySet.fastAdd(k) + val pos = keySet.addWithoutResize(k) val ind = pos & OpenHashSet.POSITION_MASK - if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { // if first add + if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { // if first add _values(ind) = v } else { _values(ind) = mergeF(_values(ind), v) @@ -85,8 +91,8 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, * @return the newly updated value. 
*/ def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = { - val pos = keySet.fastAdd(k) - if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { + val pos = keySet.addWithoutResize(k) + if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { val newValue = defaultValue _values(pos & OpenHashSet.POSITION_MASK) = newValue keySet.rehashIfNeeded(k, grow, move) diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala new file mode 100644 index 0000000000..369519c559 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +/** Provides a simple, non-threadsafe, array-backed vector that can store primitives. */ +private[spark] +class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](initialSize: Int = 64) { + private var numElements = 0 + private var array: Array[V] = _ + + // NB: This must be separate from the declaration, otherwise the specialized parent class + // will get its own array with the same initial size. TODO: Figure out why... + array = new Array[V](initialSize) + + def apply(index: Int): V = { + require(index < numElements) + array(index) + } + + def +=(value: V) { + if (numElements == array.length) { resize(array.length * 2) } + array(numElements) = value + numElements += 1 + } + + def length = numElements + + def getUnderlyingArray = array + + /** Resizes the array, dropping elements if the total length decreases. 
*/ + def resize(newLength: Int) { + val newArray = new Array[V](newLength) + array.copyToArray(newArray) + array = newArray + } +} diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala new file mode 100644 index 0000000000..0b9056344c --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -0,0 +1,84 @@ +package org.apache.spark.storage + +import java.io.{FileWriter, File} + +import scala.collection.mutable + +import com.google.common.io.Files +import org.scalatest.{BeforeAndAfterEach, FunSuite} + +class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach { + + val rootDir0 = Files.createTempDir() + rootDir0.deleteOnExit() + val rootDir1 = Files.createTempDir() + rootDir1.deleteOnExit() + val rootDirs = rootDir0.getName + "," + rootDir1.getName + println("Created root dirs: " + rootDirs) + + val shuffleBlockManager = new ShuffleBlockManager(null) { + var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]() + override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id) + } + + var diskBlockManager: DiskBlockManager = _ + + override def beforeEach() { + diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs) + shuffleBlockManager.idToSegmentMap.clear() + } + + test("basic block creation") { + val blockId = new TestBlockId("test") + assertSegmentEquals(blockId, blockId.name, 0, 0) + + val newFile = diskBlockManager.getFile(blockId) + writeToFile(newFile, 10) + assertSegmentEquals(blockId, blockId.name, 0, 10) + + newFile.delete() + } + + test("block appending") { + val blockId = new TestBlockId("test") + val newFile = diskBlockManager.getFile(blockId) + writeToFile(newFile, 15) + assertSegmentEquals(blockId, blockId.name, 0, 15) + val newFile2 = diskBlockManager.getFile(blockId) + assert(newFile === newFile2) + writeToFile(newFile2, 12) + assertSegmentEquals(blockId, blockId.name, 0, 27) + newFile.delete() + } + + test("block remapping") { + val filename = "test" + val blockId0 = new ShuffleBlockId(1, 2, 3) + val newFile = diskBlockManager.getFile(filename) + writeToFile(newFile, 15) + shuffleBlockManager.idToSegmentMap(blockId0) = new FileSegment(newFile, 0, 15) + assertSegmentEquals(blockId0, filename, 0, 15) + + val blockId1 = new ShuffleBlockId(1, 2, 4) + val newFile2 = diskBlockManager.getFile(filename) + writeToFile(newFile2, 12) + shuffleBlockManager.idToSegmentMap(blockId1) = new FileSegment(newFile, 15, 12) + assertSegmentEquals(blockId1, filename, 15, 12) + + assert(newFile === newFile2) + newFile.delete() + } + + def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) { + val segment = diskBlockManager.getBlockLocation(blockId) + assert(segment.file.getName === filename) + assert(segment.offset === offset) + assert(segment.length === length) + } + + def writeToFile(file: File, numBytes: Int) { + val writer = new FileWriter(file, true) + for (i <- 0 until numBytes) writer.write(i) + writer.close() + } +} diff --git a/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala similarity index 98% rename from core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala rename to core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala index 41ede860d2..0f1ab3d20e 100644 --- a/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.util.hash +package org.apache.spark.util.collection import org.scalatest.FunSuite diff --git a/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala similarity index 89% rename from core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala rename to core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala index 355784da32..ca3f684668 100644 --- a/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala @@ -1,4 +1,4 @@ -package org.apache.spark.util.hash +package org.apache.spark.util.collection import scala.collection.mutable.HashSet import org.scalatest.FunSuite @@ -82,7 +82,7 @@ class OpenHashMapSuite extends FunSuite { test("null keys") { val map = new OpenHashMap[String, String]() for (i <- 1 to 100) { - map("" + i) = "" + i + map(i.toString) = i.toString } assert(map.size === 100) assert(map(null) === null) @@ -94,7 +94,7 @@ class OpenHashMapSuite extends FunSuite { test("null values") { val map = new OpenHashMap[String, String]() for (i <- 1 to 100) { - map("" + i) = null + map(i.toString) = null } assert(map.size === 100) assert(map("1") === null) @@ -108,12 +108,12 @@ class OpenHashMapSuite extends FunSuite { test("changeValue") { val map = new OpenHashMap[String, String]() for (i <- 1 to 100) { - map("" + i) = "" + i + map(i.toString) = i.toString } assert(map.size === 100) for (i <- 1 to 100) { - val res = map.changeValue("" + i, { assert(false); "" }, v => { - assert(v === "" + i) + val res = map.changeValue(i.toString, { assert(false); "" }, v => { + assert(v === i.toString) v + "!" }) assert(res === i + "!") @@ -121,7 +121,7 @@ class OpenHashMapSuite extends FunSuite { // Iterate from 101 to 400 to make sure the map grows a couple of times, because we had a // bug where changeValue would return the wrong result when the map grew on that insert for (i <- 101 to 400) { - val res = map.changeValue("" + i, { i + "!" }, v => { assert(false); v }) + val res = map.changeValue(i.toString, { i + "!" 
}, v => { assert(false); v }) assert(res === i + "!") } assert(map.size === 400) @@ -138,11 +138,11 @@ class OpenHashMapSuite extends FunSuite { test("inserting in capacity-1 map") { val map = new OpenHashMap[String, String](1) for (i <- 1 to 100) { - map("" + i) = "" + i + map(i.toString) = i.toString } assert(map.size === 100) for (i <- 1 to 100) { - assert(map("" + i) === "" + i) + assert(map(i.toString) === i.toString) } } } diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala new file mode 100644 index 0000000000..4e11e8a628 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala @@ -0,0 +1,145 @@ +package org.apache.spark.util.collection + +import org.scalatest.FunSuite + + +class OpenHashSetSuite extends FunSuite { + + test("primitive int") { + val set = new OpenHashSet[Int] + assert(set.size === 0) + assert(!set.contains(10)) + assert(!set.contains(50)) + assert(!set.contains(999)) + assert(!set.contains(10000)) + + set.add(10) + assert(set.contains(10)) + assert(!set.contains(50)) + assert(!set.contains(999)) + assert(!set.contains(10000)) + + set.add(50) + assert(set.size === 2) + assert(set.contains(10)) + assert(set.contains(50)) + assert(!set.contains(999)) + assert(!set.contains(10000)) + + set.add(999) + assert(set.size === 3) + assert(set.contains(10)) + assert(set.contains(50)) + assert(set.contains(999)) + assert(!set.contains(10000)) + + set.add(50) + assert(set.size === 3) + assert(set.contains(10)) + assert(set.contains(50)) + assert(set.contains(999)) + assert(!set.contains(10000)) + } + + test("primitive long") { + val set = new OpenHashSet[Long] + assert(set.size === 0) + assert(!set.contains(10L)) + assert(!set.contains(50L)) + assert(!set.contains(999L)) + assert(!set.contains(10000L)) + + set.add(10L) + assert(set.size === 1) + assert(set.contains(10L)) + assert(!set.contains(50L)) + assert(!set.contains(999L)) + assert(!set.contains(10000L)) + + set.add(50L) + assert(set.size === 2) + assert(set.contains(10L)) + assert(set.contains(50L)) + assert(!set.contains(999L)) + assert(!set.contains(10000L)) + + set.add(999L) + assert(set.size === 3) + assert(set.contains(10L)) + assert(set.contains(50L)) + assert(set.contains(999L)) + assert(!set.contains(10000L)) + + set.add(50L) + assert(set.size === 3) + assert(set.contains(10L)) + assert(set.contains(50L)) + assert(set.contains(999L)) + assert(!set.contains(10000L)) + } + + test("non-primitive") { + val set = new OpenHashSet[String] + assert(set.size === 0) + assert(!set.contains(10.toString)) + assert(!set.contains(50.toString)) + assert(!set.contains(999.toString)) + assert(!set.contains(10000.toString)) + + set.add(10.toString) + assert(set.size === 1) + assert(set.contains(10.toString)) + assert(!set.contains(50.toString)) + assert(!set.contains(999.toString)) + assert(!set.contains(10000.toString)) + + set.add(50.toString) + assert(set.size === 2) + assert(set.contains(10.toString)) + assert(set.contains(50.toString)) + assert(!set.contains(999.toString)) + assert(!set.contains(10000.toString)) + + set.add(999.toString) + assert(set.size === 3) + assert(set.contains(10.toString)) + assert(set.contains(50.toString)) + assert(set.contains(999.toString)) + assert(!set.contains(10000.toString)) + + set.add(50.toString) + assert(set.size === 3) + assert(set.contains(10.toString)) + assert(set.contains(50.toString)) + assert(set.contains(999.toString)) + 
assert(!set.contains(10000.toString)) + } + + test("non-primitive set growth") { + val set = new OpenHashSet[String] + for (i <- 1 to 1000) { + set.add(i.toString) + } + assert(set.size === 1000) + assert(set.capacity > 1000) + for (i <- 1 to 100) { + set.add(i.toString) + } + assert(set.size === 1000) + assert(set.capacity > 1000) + } + + test("primitive set growth") { + val set = new OpenHashSet[Long] + for (i <- 1 to 1000) { + set.add(i.toLong) + } + assert(set.size === 1000) + assert(set.capacity > 1000) + for (i <- 1 to 100) { + set.add(i.toLong) + } + assert(set.size === 1000) + assert(set.capacity > 1000) + } +} diff --git a/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala similarity index 92% rename from core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala rename to core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala index b9a4b54544..dfd6aed2c4 100644 --- a/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala @@ -1,4 +1,4 @@ -package org.apache.spark.util.hash +package org.apache.spark.util.collection import scala.collection.mutable.HashSet import org.scalatest.FunSuite @@ -58,12 +58,12 @@ class PrimitiveKeyOpenHashSetSuite extends FunSuite { test("changeValue") { val map = new PrimitiveKeyOpenHashMap[Long, String]() for (i <- 1 to 100) { - map(i.toLong) = "" + i + map(i.toLong) = i.toString } assert(map.size === 100) for (i <- 1 to 100) { val res = map.changeValue(i.toLong, { assert(false); "" }, v => { - assert(v === "" + i) + assert(v === i.toString) v + "!" 
}) assert(res === i + "!") @@ -80,11 +80,11 @@ class PrimitiveKeyOpenHashSetSuite extends FunSuite { test("inserting in capacity-1 map") { val map = new PrimitiveKeyOpenHashMap[Long, String](1) for (i <- 1 to 100) { - map(i.toLong) = "" + i + map(i.toLong) = i.toString } assert(map.size === 100) for (i <- 1 to 100) { - assert(map(i.toLong) === "" + i) + assert(map(i.toLong) === i.toString) } } } diff --git a/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala deleted file mode 100644 index b5b3a4abe1..0000000000 --- a/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala +++ /dev/null @@ -1,74 +0,0 @@ -package org.apache.spark.util.hash - -import org.scalatest.FunSuite - - -class OpenHashSetSuite extends FunSuite { - - test("primitive int") { - val set = new OpenHashSet[Int] - assert(set.size === 0) - set.add(10) - assert(set.size === 1) - set.add(50) - assert(set.size === 2) - set.add(999) - assert(set.size === 3) - set.add(50) - assert(set.size === 3) - } - - test("primitive long") { - val set = new OpenHashSet[Long] - assert(set.size === 0) - set.add(10L) - assert(set.size === 1) - set.add(50L) - assert(set.size === 2) - set.add(999L) - assert(set.size === 3) - set.add(50L) - assert(set.size === 3) - } - - test("non-primitive") { - val set = new OpenHashSet[String] - assert(set.size === 0) - set.add(10.toString) - assert(set.size === 1) - set.add(50.toString) - assert(set.size === 2) - set.add(999.toString) - assert(set.size === 3) - set.add(50.toString) - assert(set.size === 3) - } - - test("non-primitive set growth") { - val set = new OpenHashSet[String] - for (i <- 1 to 1000) { - set.add(i.toString) - } - assert(set.size === 1000) - assert(set.capacity > 1000) - for (i <- 1 to 100) { - set.add(i.toString) - } - assert(set.size === 1000) - assert(set.capacity > 1000) - } - - test("primitive set growth") { - val set = new OpenHashSet[Long] - for (i <- 1 to 1000) { - set.add(i.toLong) - } - assert(set.size === 1000) - assert(set.capacity > 1000) - for (i <- 1 to 100) { - set.add(i.toLong) - } - assert(set.size === 1000) - assert(set.capacity > 1000) - } -} diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md index f679cad713..5927f736f3 100644 --- a/docs/cluster-overview.md +++ b/docs/cluster-overview.md @@ -13,7 +13,7 @@ object in your main program (called the _driver program_). Specifically, to run on a cluster, the SparkContext can connect to several types of _cluster managers_ (either Spark's own standalone cluster manager or Mesos/YARN), which allocate resources across applications. Once connected, Spark acquires *executors* on nodes in the cluster, which are -worker processes that run computations and store data for your application. +worker processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends *tasks* for the executors to run. @@ -57,6 +57,18 @@ which takes a list of JAR files (Java/Scala) or .egg and .zip libraries (Python) worker nodes. You can also dynamically add new files to be sent to executors with `SparkContext.addJar` and `addFile`. 
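For reference, a minimal driver-side sketch of how these two calls are typically used (the master URL, the SPARK_HOME lookup, and all paths below are hypothetical placeholders); the `docs/cluster-overview.md` addition that follows documents how each URI scheme is resolved:

```scala
import org.apache.spark.SparkContext

object AddJarFileExample {
  def main(args: Array[String]) {
    // Hypothetical master URL and artifact paths -- substitute your own.
    val sc = new SparkContext("spark://master:7077", "AddJarFileExample",
      System.getenv("SPARK_HOME"), Nil)

    // Plain absolute path (or file:/ URI): served by the driver's HTTP file
    // server, so each executor fetches it from the driver.
    sc.addJar("/opt/jobs/my-udfs.jar")

    // hdfs: (likewise http:, https:, ftp:): each executor pulls the file from the URI.
    sc.addFile("hdfs://namenode:8020/data/lookup-table.tsv")

    // local: the JAR is assumed to already exist at this path on every worker,
    // so no network IO is incurred.
    sc.addJar("local:/opt/shared/heavy-deps.jar")

    sc.stop()
  }
}
```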
+## URIs for addJar / addFile + +- **file:** - Absolute paths and `file:/` URIs are served by the driver's HTTP file server, and every executor + pulls the file from the driver HTTP server. +- **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected. +- **local:** - a URI starting with `local:/` is expected to exist as a local file on each worker node. This + means that no network IO will be incurred, and it works well for large files/JARs that are pushed to each worker, + or shared via NFS, GlusterFS, etc. + +Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes. +Over time, this can use up a significant amount of space and will need to be cleaned up. + + # Monitoring Each driver program has a web UI, typically on port 4040, that displays information about running diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md index 1e5575d657..156a727026 100644 --- a/docs/ec2-scripts.md +++ b/docs/ec2-scripts.md @@ -98,7 +98,7 @@ permissions on your private key file, you can run `launch` with the `bin/hadoop` script in that directory. Note that the data in this HDFS goes away when you stop and restart a machine. - There is also a *persistent HDFS* instance in - `/root/presistent-hdfs` that will keep data across cluster restarts. + `/root/persistent-hdfs` that will keep data across cluster restarts. Typically each node has relatively little space of persistent data (about 3 GB), but you can use the `--ebs-vol-size` option to `spark-ec2` to attach a persistent EBS volume to each node for diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 65868b76b9..79848380c0 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -73,7 +73,7 @@ def parse_args(): parser.add_option("-v", "--spark-version", default="0.8.0", help="Version of Spark to use: 'X.Y.Z' or a specific git hash") parser.add_option("--spark-git-repo", - default="https://github.com/mesos/spark", + default="https://github.com/apache/incubator-spark", help="Github repo from which to checkout supplied commit hash") parser.add_option("--hadoop-major-version", default="1", help="Major version of Hadoop (default: 1)") diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 646682878f..86dd9ca1b3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -21,6 +21,7 @@ import java.util.Random import scala.math.exp import org.apache.spark.util.Vector import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.InputFormatInfo /** @@ -51,7 +52,7 @@ object SparkHdfsLR { System.exit(1) } val inputPath = args(1) - val conf = SparkEnv.get.hadoop.newConfiguration() + val conf = SparkHadoopUtil.get.newConfiguration() val sc = new SparkContext(args(0), "SparkHdfsLR", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(), InputFormatInfo.computePreferredLocations( diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala index 62f445127c..f65f96ed0c 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala @@ -2,10 +2,9 @@ package org.apache.spark.graph import com.esotericsoftware.kryo.Kryo -import 
org.apache.spark.graph.impl.MessageToPartition +import org.apache.spark.graph.impl.{EdgePartition, MessageToPartition} import org.apache.spark.serializer.KryoRegistrator -import org.apache.spark.graph.impl._ -import org.apache.spark.util.hash.BitSet +import org.apache.spark.util.collection.BitSet class GraphKryoRegistrator extends KryoRegistrator { diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala index f26e286003..c4761d7452 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala @@ -17,26 +17,11 @@ package org.apache.spark.graph -import java.nio.ByteBuffer - - - -import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer - import org.apache.spark._ -import org.apache.spark.rdd._ import org.apache.spark.SparkContext._ -import org.apache.spark.Partitioner._ - +import org.apache.spark.rdd._ import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.hash.BitSet -import org.apache.spark.util.hash.OpenHashSet -import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap - - - - +import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap} /** @@ -184,9 +169,9 @@ class VertexSetRDD[@specialized V: ClassManifest]( (keysIter: Iterator[VertexIdToIndexMap], valuesIter: Iterator[(Int => V, BitSet)]) => val index = keysIter.next() - assert(keysIter.hasNext() == false) + assert(keysIter.hasNext == false) val (oldValues, bs) = valuesIter.next() - assert(valuesIter.hasNext() == false) + assert(valuesIter.hasNext == false) // Allocate the array to store the results into val newBS = new BitSet(index.capacity) // Iterate over the active bits in the old bitset and @@ -246,9 +231,9 @@ class VertexSetRDD[@specialized V: ClassManifest]( (keysIter: Iterator[VertexIdToIndexMap], valuesIter: Iterator[(Int => V, BitSet)]) => val index = keysIter.next() - assert(keysIter.hasNext() == false) + assert(keysIter.hasNext == false) val (oldValues, bs: BitSet) = valuesIter.next() - assert(valuesIter.hasNext() == false) + assert(valuesIter.hasNext == false) // Cosntruct a view of the map transformation val newValues: (Int => U) = (ind: Int) => { if (bs.get(ind)) { f(index.getValueSafe(ind), oldValues(ind)) } @@ -384,7 +369,7 @@ class VertexSetRDD[@specialized V: ClassManifest]( // Get the location of the key in the index val pos = index.getPos(k) // Only if the key is already in the index - if ((pos & OpenHashSet.EXISTENCE_MASK) == 0) { + if ((pos & OpenHashSet.NONEXISTENCE_MASK) == 0) { // Get the actual index val ind = pos & OpenHashSet.POSITION_MASK // If this value has already been seen then merge @@ -642,7 +627,7 @@ object VertexSetRDD { * * @note duplicate vertices are discarded arbitrarily * - * @tparam the vertex attribute type + * @tparam V the vertex attribute type * @param rdd the rdd containing vertices * @param index the index which must be a superset of the vertices * in RDD @@ -656,7 +641,7 @@ object VertexSetRDD { * Construct a vertex set from an RDD using an existing index and a * user defined `combiner` to merge duplicate vertices. 
* - * @tparam the vertex attribute type + * @tparam V the vertex attribute type * @param rdd the rdd containing vertices * @param index the index which must be a superset of the vertices * in RDD @@ -673,7 +658,7 @@ object VertexSetRDD { * Construct a vertex set from an RDD using an existing index and a * user defined `combiner` to merge duplicate vertices. * - * @tparam the vertex attribute type + * @tparam V the vertex attribute type * @param rdd the rdd containing vertices * @param index the index which must be a superset of the vertices * in RDD @@ -710,13 +695,13 @@ object VertexSetRDD { val values: RDD[ (Int => C, BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => { // There is only one map val index = indexIter.next() - assert(!indexIter.hasNext()) + assert(!indexIter.hasNext) val values = new Array[C](index.capacity) val bs = new BitSet(index.capacity) for ((k,c) <- tblIter) { // Get the location of the key in the index val pos = index.getPos(k) - if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { + if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { throw new SparkException("Error: Trying to bind an external index " + "to an RDD which contains keys that are not in the index.") } else { diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index b80713dbf4..f817435fb8 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -4,26 +4,17 @@ import scala.collection.JavaConversions._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.ArrayBuilder import org.apache.spark.SparkContext._ -import org.apache.spark.Partitioner import org.apache.spark.HashPartitioner import org.apache.spark.util.ClosureCleaner -import org.apache.spark.rdd -import org.apache.spark.rdd.RDD - - import org.apache.spark.graph._ import org.apache.spark.graph.impl.GraphImpl._ import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._ - -import org.apache.spark.util.hash.BitSet -import org.apache.spark.util.hash.OpenHashSet -import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap - +import org.apache.spark.rdd.RDD +import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap} /** diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala index 37a4fb4a5e..ee28d1429e 100644 --- a/graph/src/main/scala/org/apache/spark/graph/package.scala +++ b/graph/src/main/scala/org/apache/spark/graph/package.scala @@ -1,8 +1,6 @@ package org.apache.spark -import org.apache.spark.util.hash.BitSet -import org.apache.spark.util.hash.OpenHashSet -import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap +import org.apache.spark.util.collection.OpenHashSet package object graph {
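Several hunks in this patch lean on the same convention: `OpenHashSet.addWithoutResize` and `getPos` return a single Int whose low bits (`POSITION_MASK`) carry the slot index, while the high bit (`NONEXISTENCE_MASK`, the name the call sites above switch to from `EXISTENCE_MASK`) records that the key was not already present. Below is a toy, self-contained sketch of that encoding, for illustration only; the class, constant values, and probing scheme are simplified stand-ins, not Spark's actual `OpenHashSet`:

```scala
object ToyOpenHashSet {
  val NONEXISTENCE_MASK = 0x80000000  // set when the key was not already present
  val POSITION_MASK     = 0x7fffffff  // low bits carry the slot index
}

// A minimal linear-probing set over Int keys (no resizing; assumes spare capacity).
class ToyOpenHashSet(capacity: Int) {
  import ToyOpenHashSet._
  private val keys = new Array[Int](capacity)
  private val used = new Array[Boolean](capacity)

  /** Inserts `k` if absent and returns its slot, OR-ing in NONEXISTENCE_MASK on a fresh insert. */
  def addWithoutResize(k: Int): Int = {
    var pos = (k.hashCode & POSITION_MASK) % capacity
    while (used(pos) && keys(pos) != k) pos = (pos + 1) % capacity  // linear probing
    if (used(pos)) {
      pos                                 // key already present: plain slot index
    } else {
      used(pos) = true
      keys(pos) = k
      pos | NONEXISTENCE_MASK             // key was absent: flag the fresh insert in the high bit
    }
  }
}

// Caller-side pattern, mirroring the changeValue hunk earlier in this patch:
//   val pos = set.addWithoutResize(k)
//   if ((pos & ToyOpenHashSet.NONEXISTENCE_MASK) != 0) {
//     // newly inserted key: initialize the value at (pos & ToyOpenHashSet.POSITION_MASK)
//   } else {
//     // existing key: merge into the value already stored at pos
//   }
```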