From a6ae2b48320d367be5fede60687331ce0d563d00 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Sun, 27 Oct 2013 13:35:04 -0500 Subject: [PATCH 01/25] Handle ConcurrentModificationExceptions in SparkContext init. System.getProperties.toMap will fail-fast when concurrently modified, and it seems like some other thread started by SparkContext does a System.setProperty during it's initialization. Handle this by just looping on ConcurrentModificationException, which seems the safest, since the non-fail-fast methods (Hastable.entrySet) have undefined behavior under concurrent modification. --- .../src/main/scala/org/apache/spark/SparkContext.scala | 6 +++--- core/src/main/scala/org/apache/spark/util/Utils.scala | 10 ++++++++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 564466cfd5..d694dfe4d9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.Map import scala.collection.generic.Growable -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -248,8 +248,8 @@ class SparkContext( conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) } // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) { - conf.set(key.substring("spark.hadoop.".length), System.getProperty(key)) + Utils.getSystemProperties.foreach { case (key, value) if key.startsWith("spark.hadoop.") => + conf.set(key.substring("spark.hadoop.".length), value) } val bufferSize = System.getProperty("spark.buffer.size", "65536") conf.set("io.file.buffer.size", bufferSize) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a3b3968c5e..d637a0a91d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -37,6 +37,7 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream, import org.apache.spark.deploy.SparkHadoopUtil import java.nio.ByteBuffer import org.apache.spark.{SparkEnv, SparkException, Logging} +import java.util.ConcurrentModificationException /** @@ -819,4 +820,13 @@ private[spark] object Utils extends Logging { // Nothing else to guard against ? hashAbs } + + /** Returns a copy of the system properties that is thread-safe to iterator over. 
*/ + def getSystemProperties(): Map[String, String] = { + try { + return System.getProperties().toMap[String, String] + } catch { + case e: ConcurrentModificationException => getSystemProperties() + } + } } From eeb5f64c67c98f7252aab0fe2c55b86976463c74 Mon Sep 17 00:00:00 2001 From: tgravescs Date: Tue, 29 Oct 2013 17:12:16 -0500 Subject: [PATCH 02/25] Remove SparkHadoopUtil stuff from SparkEnv --- .../scala/org/apache/spark/SparkContext.scala | 11 +++++----- .../scala/org/apache/spark/SparkEnv.scala | 14 ------------- .../apache/spark/deploy/SparkHadoopUtil.scala | 21 +++++++++++++++++++ .../org/apache/spark/rdd/CheckpointRDD.scala | 7 ++++--- .../org/apache/spark/rdd/HadoopRDD.scala | 7 ++++--- .../spark/scheduler/InputFormatInfo.scala | 7 +++---- .../scala/org/apache/spark/util/Utils.scala | 3 +-- 7 files changed, 38 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c9bc01cba5..c1d74dc60c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -51,7 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.mesos.MesosNativeLibrary -import org.apache.spark.deploy.LocalSparkCluster +import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ @@ -245,7 +245,7 @@ class SparkContext( /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration = { val env = SparkEnv.get - val conf = env.hadoop.newConfiguration() + val conf = SparkHadoopUtil.get.newConfiguration() // Explicitly check for S3 environment variables if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) { @@ -379,7 +379,7 @@ class SparkContext( minSplits: Int = defaultMinSplits ): RDD[(K, V)] = { // Add necessary security credentials to the JobConf before broadcasting it. - SparkEnv.get.hadoop.addCredentials(conf) + SparkHadoopUtil.get.addCredentials(conf) new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) } @@ -697,7 +697,7 @@ class SparkContext( val uri = new URI(path) key = uri.getScheme match { case null | "file" => - if (env.hadoop.isYarnMode()) { + if (SparkHadoopUtil.get.isYarnMode()) { // In order for this to work on yarn the user must specify the --addjars option to // the client to upload the file into the distributed cache to make it show up in the // current working directory. @@ -932,9 +932,8 @@ class SparkContext( * prevent accidental overriding of checkpoint files in the existing directory. 
*/ def setCheckpointDir(dir: String, useExisting: Boolean = false) { - val env = SparkEnv.get val path = new Path(dir) - val fs = path.getFileSystem(env.hadoop.newConfiguration()) + val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration()) if (!useExisting) { if (fs.exists(path)) { throw new Exception("Checkpoint directory '" + path + "' already exists.") diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index aaab717bcf..270d8d4b2d 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -25,7 +25,6 @@ import akka.remote.RemoteActorRefProvider import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster} import org.apache.spark.network.ConnectionManager import org.apache.spark.serializer.{Serializer, SerializerManager} @@ -58,19 +57,6 @@ class SparkEnv ( private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() - val hadoop = { - val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) - if(yarnMode) { - try { - Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil] - } catch { - case th: Throwable => throw new SparkException("Unable to load YARN support", th) - } - } else { - new SparkHadoopUtil - } - } - def stop() { pythonWorkers.foreach { case(key, worker) => worker.stop() } httpFileServer.stop() diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 83cd3df5fa..8c0c111d61 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -22,6 +22,8 @@ import org.apache.hadoop.mapred.JobConf import com.google.common.collect.MapMaker +import org.apache.spark.SparkException + /** * Contains util methods to interact with Hadoop from Spark. 
@@ -47,3 +49,22 @@ class SparkHadoopUtil { def isYarnMode(): Boolean = { false } } + +object SparkHadoopUtil { + private val hadoop = { + val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) + if (yarnMode) { + try { + Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil] + } catch { + case th: Throwable => throw new SparkException("Unable to load YARN support", th) + } + } else { + new SparkHadoopUtil + } + } + + def get: SparkHadoopUtil = { + hadoop + } +} diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index ccaaecb85b..d3033ea4a6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -18,6 +18,7 @@ package org.apache.spark.rdd import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.{NullWritable, BytesWritable} @@ -83,7 +84,7 @@ private[spark] object CheckpointRDD extends Logging { def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) { val env = SparkEnv.get val outputDir = new Path(path) - val fs = outputDir.getFileSystem(env.hadoop.newConfiguration()) + val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration()) val finalOutputName = splitIdToFile(ctx.partitionId) val finalOutputPath = new Path(outputDir, finalOutputName) @@ -122,7 +123,7 @@ private[spark] object CheckpointRDD extends Logging { def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = { val env = SparkEnv.get - val fs = path.getFileSystem(env.hadoop.newConfiguration()) + val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration()) val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt val fileInputStream = fs.open(path, bufferSize) val serializer = env.serializer.newInstance() @@ -145,7 +146,7 @@ private[spark] object CheckpointRDD extends Logging { val sc = new SparkContext(cluster, "CheckpointRDD Test") val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000) val path = new Path(hdfsPath, "temp") - val fs = path.getFileSystem(env.hadoop.newConfiguration()) + val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration()) sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _) val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index fad042c7ae..0e8eaf4be6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.util.ReflectionUtils import org.apache.spark._ import org.apache.spark.broadcast.Broadcast +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.NextIterator import org.apache.hadoop.conf.{Configuration, Configurable} @@ -198,10 +199,10 @@ private[spark] object HadoopRDD { * The three methods below are helpers for accessing the local map, a property of the SparkEnv of * the local process. 
*/ - def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key) + def getCachedMetadata(key: String) = SparkHadoopUtil.get.hadoopJobMetadata.get(key) - def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key) + def containsCachedMetadata(key: String) = SparkHadoopUtil.get.hadoopJobMetadata.containsKey(key) def putCachedMetadata(key: String, value: Any) = - SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value) + SparkHadoopUtil.get.hadoopJobMetadata.put(key, value) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala index 370ccd183c..1791ee660d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.deploy.SparkHadoopUtil import scala.collection.immutable.Set import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.hadoop.security.UserGroupInformation @@ -87,9 +88,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = { - val env = SparkEnv.get val conf = new JobConf(configuration) - env.hadoop.addCredentials(conf) + SparkHadoopUtil.get.addCredentials(conf) FileInputFormat.setInputPaths(conf, path) val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] = @@ -108,9 +108,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = { - val env = SparkEnv.get val jobConf = new JobConf(configuration) - env.hadoop.addCredentials(jobConf) + SparkHadoopUtil.get.addCredentials(jobConf) FileInputFormat.setInputPaths(jobConf, path) val instance: org.apache.hadoop.mapred.InputFormat[_, _] = diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a3b3968c5e..fd2811e44c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -280,9 +280,8 @@ private[spark] object Utils extends Logging { } case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others - val env = SparkEnv.get val uri = new URI(url) - val conf = env.hadoop.newConfiguration() + val conf = SparkHadoopUtil.get.newConfiguration() val fs = FileSystem.get(uri, conf) val in = fs.open(new Path(uri)) val out = new FileOutputStream(tempFile) From 3a388c320c5079bec44fe51d2f8218af2e56d98e Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Tue, 29 Oct 2013 19:20:40 -0500 Subject: [PATCH 03/25] Use Properties.clone() instead. 
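System.getProperties returns the live java.util.Properties object (a Hashtable subclass), so converting or iterating it races with any thread calling System.setProperty. Hashtable.clone() is a synchronized method, so the clone taken here is a consistent snapshot that can be turned into an immutable Map without risking a ConcurrentModificationException, which lets us drop the retry loop from the earlier commit. Illustrative consumption of the snapshot only (conf is the Hadoop Configuration being populated in SparkContext; that call site is untouched by this patch):

    // Safe to iterate: built from a cloned snapshot, so concurrent
    // System.setProperty calls cannot invalidate the iteration.
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.hadoop.")) {
      conf.set(key.substring("spark.hadoop.".length), value)
    }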
--- core/src/main/scala/org/apache/spark/util/Utils.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d637a0a91d..b20c0e5308 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -823,10 +823,7 @@ private[spark] object Utils extends Logging { /** Returns a copy of the system properties that is thread-safe to iterator over. */ def getSystemProperties(): Map[String, String] = { - try { - return System.getProperties().toMap[String, String] - } catch { - case e: ConcurrentModificationException => getSystemProperties() - } + return System.getProperties().clone() + .asInstanceOf[java.util.Properties].toMap[String, String] } } From e5e0ebdb1190a256e51dbf1265c6957f0cd56a29 Mon Sep 17 00:00:00 2001 From: tgravescs Date: Tue, 29 Oct 2013 20:12:45 -0500 Subject: [PATCH 04/25] fix sparkhdfs lr test --- .../src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 646682878f..86dd9ca1b3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -21,6 +21,7 @@ import java.util.Random import scala.math.exp import org.apache.spark.util.Vector import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.InputFormatInfo /** @@ -51,7 +52,7 @@ object SparkHdfsLR { System.exit(1) } val inputPath = args(1) - val conf = SparkEnv.get.hadoop.newConfiguration() + val conf = SparkHadoopUtil.get.newConfiguration() val sc = new SparkContext(args(0), "SparkHdfsLR", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(), InputFormatInfo.computePreferredLocations( From f231aaa24c56402ee364ac27142c3c35567d64f2 Mon Sep 17 00:00:00 2001 From: tgravescs Date: Wed, 30 Oct 2013 11:46:12 -0500 Subject: [PATCH 05/25] move the hadoopJobMetadata back into SparkEnv --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 5 +++++ .../scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 7 ------- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 6 +++--- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 270d8d4b2d..ff2df8fb6a 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -31,6 +31,7 @@ import org.apache.spark.serializer.{Serializer, SerializerManager} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.api.python.PythonWorkerFactory +import com.google.common.collect.MapMaker /** * Holds all the runtime environment objects for a running Spark instance (either master or worker), @@ -57,6 +58,10 @@ class SparkEnv ( private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() + // A general, soft-reference map for metadata needed during HadoopRDD split computation + // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). 
+ private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() + def stop() { pythonWorkers.foreach { case(key, worker) => worker.stop() } httpFileServer.stop() diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 8c0c111d61..6bc846aa92 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -20,19 +20,13 @@ package org.apache.spark.deploy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf -import com.google.common.collect.MapMaker - import org.apache.spark.SparkException - /** * Contains util methods to interact with Hadoop from Spark. */ private[spark] class SparkHadoopUtil { - // A general, soft-reference map for metadata needed during HadoopRDD split computation - // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). - private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() /** * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop @@ -47,7 +41,6 @@ class SparkHadoopUtil { def addCredentials(conf: JobConf) {} def isYarnMode(): Boolean = { false } - } object SparkHadoopUtil { diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 0e8eaf4be6..32901a508f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -199,10 +199,10 @@ private[spark] object HadoopRDD { * The three methods below are helpers for accessing the local map, a property of the SparkEnv of * the local process. */ - def getCachedMetadata(key: String) = SparkHadoopUtil.get.hadoopJobMetadata.get(key) + def getCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.get(key) - def containsCachedMetadata(key: String) = SparkHadoopUtil.get.hadoopJobMetadata.containsKey(key) + def containsCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.containsKey(key) def putCachedMetadata(key: String, value: Any) = - SparkHadoopUtil.get.hadoopJobMetadata.put(key, value) + SparkEnv.get.hadoopJobMetadata.put(key, value) } From 09f3b677cb7cce08882ea030e9af5798a63046ba Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Wed, 30 Oct 2013 12:29:39 -0500 Subject: [PATCH 06/25] Avoid match errors when filtering for spark.hadoop settings. 
--- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d694dfe4d9..28ac49a24a 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -248,8 +248,10 @@ class SparkContext( conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) } // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" - Utils.getSystemProperties.foreach { case (key, value) if key.startsWith("spark.hadoop.") => - conf.set(key.substring("spark.hadoop.".length), value) + Utils.getSystemProperties.foreach { case (key, value) => + if (key.startsWith("spark.hadoop.")) { + conf.set(key.substring("spark.hadoop.".length), value) + } } val bufferSize = System.getProperty("spark.buffer.size", "65536") conf.set("io.file.buffer.size", bufferSize) From ff038eb4e0fbe94db85f6e6dec717d39fcf2048c Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Wed, 30 Oct 2013 15:26:51 -0700 Subject: [PATCH 07/25] Fixed incorrect log message in local scheduler --- .../org/apache/spark/scheduler/local/LocalTaskSetManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala index 55f8313e87..53bf78267e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala @@ -175,7 +175,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas reason.className, reason.description, locs.mkString("\n"))) if (numFailures(index) > MAX_TASK_FAILURES) { val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format( - taskSet.id, index, 4, reason.description) + taskSet.id, index, MAX_TASK_FAILURES, reason.description) decreaseRunningTasks(runningTasks) sched.dagScheduler.taskSetFailed(taskSet, errorMessage) // need to delete failed Taskset from schedule queue From e54a37fe15a8fa8daec6c00fde4d191680b004c4 Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Fri, 1 Nov 2013 10:58:11 -0700 Subject: [PATCH 08/25] Document all the URIs for addJar/addFile --- docs/cluster-overview.md | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md index f679cad713..5927f736f3 100644 --- a/docs/cluster-overview.md +++ b/docs/cluster-overview.md @@ -13,7 +13,7 @@ object in your main program (called the _driver program_). Specifically, to run on a cluster, the SparkContext can connect to several types of _cluster managers_ (either Spark's own standalone cluster manager or Mesos/YARN), which allocate resources across applications. Once connected, Spark acquires *executors* on nodes in the cluster, which are -worker processes that run computations and store data for your application. +worker processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends *tasks* for the executors to run. @@ -57,6 +57,18 @@ which takes a list of JAR files (Java/Scala) or .egg and .zip libraries (Python) worker nodes. 
You can also dynamically add new files to be sent to executors with `SparkContext.addJar` and `addFile`. +## URIs for addJar / addFile + +- **file:** - Absolute paths and `file:/` URIs are served by the driver's HTTP file server, and every executor + pulls the file from the driver HTTP server +- **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected +- **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node. This + means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker, + or shared via NFS, GlusterFS, etc. + +Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes. +Over time this can use up a significant amount of space and will need to be cleaned up. + # Monitoring Each driver program has a web UI, typically on port 4040, that displays information about running From f3679fd494893d4feafec5c44d31adf1ee3aa2dc Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Fri, 1 Nov 2013 11:08:03 -0700 Subject: [PATCH 09/25] Add local: URI support to addFile as well --- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8d7c5a77da..880b49e8ef 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -591,7 +591,8 @@ class SparkContext( val uri = new URI(path) val key = uri.getScheme match { case null | "file" => env.httpFileServer.addFile(new File(uri.getPath)) - case _ => path + case "local" => "file:" + uri.getPath + case _ => path } addedFiles(key) = System.currentTimeMillis From 3f89354c45fce179e6cc8e7a4e8294694d24ae18 Mon Sep 17 00:00:00 2001 From: "Fabrizio (Misto) Milo" Date: Fri, 1 Nov 2013 17:47:37 -0700 Subject: [PATCH 10/25] fix persistent-hdfs --- docs/ec2-scripts.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/ec2-scripts.md b/docs/ec2-scripts.md index 1e5575d657..156a727026 100644 --- a/docs/ec2-scripts.md +++ b/docs/ec2-scripts.md @@ -98,7 +98,7 @@ permissions on your private key file, you can run `launch` with the `bin/hadoop` script in that directory. Note that the data in this HDFS goes away when you stop and restart a machine. - There is also a *persistent HDFS* instance in - `/root/presistent-hdfs` that will keep data across cluster restarts. + `/root/persistent-hdfs` that will keep data across cluster restarts. 
Typically each node has relatively little space of persistent data (about 3 GB), but you can use the `--ebs-vol-size` option to `spark-ec2` to attach a persistent EBS volume to each node for From 4b5d61f31fc095ed7e6e0701796b9d7f35378bbf Mon Sep 17 00:00:00 2001 From: "Fabrizio (Misto) Milo" Date: Fri, 1 Nov 2013 18:41:49 -0700 Subject: [PATCH 11/25] update default github --- ec2/spark_ec2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 65868b76b9..79848380c0 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -73,7 +73,7 @@ def parse_args(): parser.add_option("-v", "--spark-version", default="0.8.0", help="Version of Spark to use: 'X.Y.Z' or a specific git hash") parser.add_option("--spark-git-repo", - default="https://github.com/mesos/spark", + default="https://github.com/apache/incubator-spark", help="Github repo from which to checkout supplied commit hash") parser.add_option("--hadoop-major-version", default="1", help="Major version of Hadoop (default: 1)") From 895747bb055ed1b4d0b5db8baf8b416d8c1683ca Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 2 Nov 2013 12:58:44 -0700 Subject: [PATCH 12/25] Fixed a typo in Hadoop version in README. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 28ad1e4604..456b8060ef 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ described below. When developing a Spark application, specify the Hadoop version by adding the "hadoop-client" artifact to your project's dependencies. For example, if you're -using Hadoop 1.0.1 and build your application using SBT, add this entry to +using Hadoop 1.2.1 and build your application using SBT, add this entry to `libraryDependencies`: "org.apache.hadoop" % "hadoop-client" % "1.2.1" From 1e9543b567b81cf3207984402269d230c10e713e Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 2 Nov 2013 23:19:01 -0700 Subject: [PATCH 13/25] Fixed a bug that uses twice amount of memory for the primitive arrays due to a scala compiler bug. Also addressed Matei's code review comment. 
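The extra memory came from initializing an array field at its declaration site inside a @specialized class: the compiler then generates two arrays, one for Object and one for the specialized primitive type, and only one of them is ever used. The workaround applied below is to declare the field uninitialized and assign it in the constructor body. Sketch of the pattern only, not the full class:

    class OpenHashSet[@specialized(Long, Int) T: ClassManifest](initialCapacity: Int) {
      // Writing `protected var _data = new Array[T](initialCapacity)` here would
      // allocate the backing array twice because of the specialization bug.
      protected var _data: Array[T] = _
      _data = new Array[T](initialCapacity)
    }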
--- .../main/scala/org/apache/spark/util/Utils.scala | 12 +++++------- .../spark/util/{hash => collection}/BitSet.scala | 6 +++--- .../util/{hash => collection}/OpenHashMap.scala | 12 ++++++++---- .../util/{hash => collection}/OpenHashSet.scala | 16 +++++++++------- .../PrimitiveKeyOpenHashMap.scala | 14 +++++++++----- .../util/{hash => collection}/BitSetSuite.scala | 2 +- .../{hash => collection}/OpenHashMapSuite.scala | 2 +- .../{hash => collection}/OpenHashSetSuite.scala | 2 +- .../PrimitiveKeyOpenHashSetSuite.scala | 2 +- 9 files changed, 38 insertions(+), 30 deletions(-) rename core/src/main/scala/org/apache/spark/util/{hash => collection}/BitSet.scala (95%) rename core/src/main/scala/org/apache/spark/util/{hash => collection}/OpenHashMap.scala (91%) rename core/src/main/scala/org/apache/spark/util/{hash => collection}/OpenHashSet.scala (95%) rename core/src/main/scala/org/apache/spark/util/{hash => collection}/PrimitiveKeyOpenHashMap.scala (87%) rename core/src/test/scala/org/apache/spark/util/{hash => collection}/BitSetSuite.scala (98%) rename core/src/test/scala/org/apache/spark/util/{hash => collection}/OpenHashMapSuite.scala (98%) rename core/src/test/scala/org/apache/spark/util/{hash => collection}/OpenHashSetSuite.scala (97%) rename core/src/test/scala/org/apache/spark/util/{hash => collection}/PrimitiveKeyOpenHashSetSuite.scala (98%) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0c5c12b7a8..fe932d8ede 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -18,13 +18,12 @@ package org.apache.spark.util import java.io._ -import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket} +import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address} import java.util.{Locale, Random, UUID} -import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor} -import java.util.regex.Pattern +import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor} import scala.collection.Map -import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import scala.io.Source @@ -36,8 +35,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil} import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} import org.apache.spark.deploy.SparkHadoopUtil import java.nio.ByteBuffer -import org.apache.spark.{SparkEnv, SparkException, Logging} -import java.util.ConcurrentModificationException +import org.apache.spark.{SparkException, Logging} /** @@ -149,7 +147,7 @@ private[spark] object Utils extends Logging { return buf } - private val shutdownDeletePaths = new collection.mutable.HashSet[String]() + private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]() // Register the path to be deleted via shutdown hook def registerShutdownDeleteDir(file: File) { diff --git a/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala similarity index 95% rename from core/src/main/scala/org/apache/spark/util/hash/BitSet.scala rename to core/src/main/scala/org/apache/spark/util/collection/BitSet.scala index 0ec002b5d0..6604ec738c 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala @@ 
-15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.util.hash +package org.apache.spark.util.collection /** @@ -24,8 +24,8 @@ package org.apache.spark.util.hash */ class BitSet(numBits: Int) { - private val words = new Array[Long](bit2words(numBits)) - private val numWords = words.length + private[this] val words = new Array[Long](bit2words(numBits)) + private[this] val numWords = words.length /** * Sets the bit at the specified index to true. diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala similarity index 91% rename from core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala rename to core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala index a376d1015a..ed117b2abf 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.util.hash +package org.apache.spark.util.collection /** @@ -34,7 +34,11 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: def this() = this(64) protected var _keySet = new OpenHashSet[K](initialCapacity) - private var _values = new Array[V](_keySet.capacity) + + // Init in constructor (instead of in declaration) to work around a Scala compiler specialization + // bug that would generate two arrays (one for Object and one for specialized T). + private var _values: Array[V] = _ + _values = new Array[V](_keySet.capacity) @transient private var _oldValues: Array[V] = null @@ -64,7 +68,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: haveNullValue = true nullValue = v } else { - val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK + val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK _values(pos) = v _keySet.rehashIfNeeded(k, grow, move) _oldValues = null @@ -87,7 +91,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: } nullValue } else { - val pos = _keySet.fastAdd(k) + val pos = _keySet.addWithoutResize(k) if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { val newValue = defaultValue _values(pos & OpenHashSet.POSITION_MASK) = newValue diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala similarity index 95% rename from core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala rename to core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala index 7aa3f6220c..e98a93dc2a 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.util.hash +package org.apache.spark.util.collection /** @@ -78,9 +78,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( protected var _mask = _capacity - 1 protected var _size = 0 - protected var _data = classManifest[T].newArray(_capacity) protected var _bitset = new BitSet(_capacity) + // Init of the array in constructor (instead of in declaration) to work around a Scala compiler + // specialization bug that would generate two arrays (one for Object and one for specialized T). + protected var _data: Array[T] = _ + _data = new Array[T](_capacity) + /** Number of elements in the set. 
*/ def size: Int = _size @@ -95,7 +99,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( * and rehash all elements. */ def add(k: T) { - fastAdd(k) + addWithoutResize(k) rehashIfNeeded(k, grow, move) } @@ -109,7 +113,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( * @return The position where the key is placed, plus the highest order bit is set if the key * exists previously. */ - def fastAdd(k: T): Int = putInto(_bitset, _data, k) + def addWithoutResize(k: T): Int = putInto(_bitset, _data, k) /** * Rehash the set if it is overloaded. @@ -154,7 +158,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( /** * Put an entry into the set. Return the position where the key is placed. In addition, the - * highest bid in the returned position is set if the key exists prior to this put. + * highest bit in the returned position is set if the key exists prior to this put. * * This function assumes the data array has at least one empty slot. */ @@ -236,9 +240,7 @@ private[spark] object OpenHashSet { val INVALID_POS = -1 - val EXISTENCE_MASK = 0x80000000 - val POSITION_MASK = 0xEFFFFFF /** diff --git a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala similarity index 87% rename from core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala rename to core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala index 14c1367207..e8f28ecdd7 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.util.hash +package org.apache.spark.util.collection /** @@ -36,8 +36,12 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int]) - protected var _keySet = new OpenHashSet[K](initialCapacity) - private var _values = new Array[V](_keySet.capacity) + // Init in constructor (instead of in declaration) to work around a Scala compiler specialization + // bug that would generate two arrays (one for Object and one for specialized T). + protected var _keySet: OpenHashSet[K] = _ + private var _values: Array[V] = _ + _keySet = new OpenHashSet[K](initialCapacity) + _values = new Array[V](_keySet.capacity) private var _oldValues: Array[V] = null @@ -51,7 +55,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, /** Set the value for a key */ def update(k: K, v: V) { - val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK + val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK _values(pos) = v _keySet.rehashIfNeeded(k, grow, move) _oldValues = null @@ -64,7 +68,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, * @return the newly updated value. 
*/ def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = { - val pos = _keySet.fastAdd(k) + val pos = _keySet.addWithoutResize(k) if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { val newValue = defaultValue _values(pos & OpenHashSet.POSITION_MASK) = newValue diff --git a/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala similarity index 98% rename from core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala rename to core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala index 41ede860d2..0f1ab3d20e 100644 --- a/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.util.hash +package org.apache.spark.util.collection import org.scalatest.FunSuite diff --git a/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala similarity index 98% rename from core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala rename to core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala index 355784da32..5e74ca1f7e 100644 --- a/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala @@ -1,4 +1,4 @@ -package org.apache.spark.util.hash +package org.apache.spark.util.collection import scala.collection.mutable.HashSet import org.scalatest.FunSuite diff --git a/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala similarity index 97% rename from core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala rename to core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala index b5b3a4abe1..40049e8475 100644 --- a/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala @@ -1,4 +1,4 @@ -package org.apache.spark.util.hash +package org.apache.spark.util.collection import org.scalatest.FunSuite diff --git a/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala similarity index 98% rename from core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala rename to core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala index b9a4b54544..dc7f6cb023 100644 --- a/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala @@ -1,4 +1,4 @@ -package org.apache.spark.util.hash +package org.apache.spark.util.collection import scala.collection.mutable.HashSet import org.scalatest.FunSuite From eb5f8a3f977688beb2f068050d8fabe7e15141d3 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 3 Nov 2013 18:04:21 -0800 Subject: [PATCH 14/25] Code review feedback. 
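Main changes: EXISTENCE_MASK is renamed to NONEXISTENCE_MASK, because the high bit in the value returned by addWithoutResize is set only when the key was not already present; the load factor is now validated to lie strictly between 0 and 1; rehash allocates the new array with new Array[T](newCapacity); and the hash set suites gained contains() assertions. For reference, decoding the packed return value at a call site looks roughly like OpenHashMap.changeValue below:

    val pos = _keySet.addWithoutResize(k)
    if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
      // The key was absent before this call; its slot index is in the low bits.
      _values(pos & OpenHashSet.POSITION_MASK) = defaultValue
      _keySet.rehashIfNeeded(k, grow, move)
    } else {
      // The key already existed; the high bit is clear, so pos is the slot index.
      _values(pos) = mergeValue(_values(pos))
    }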
--- .../apache/spark/util/collection/BitSet.scala | 4 +- .../spark/util/collection/OpenHashMap.scala | 2 +- .../spark/util/collection/OpenHashSet.scala | 20 +++-- .../collection/PrimitiveKeyOpenHashMap.scala | 2 +- .../util/collection/OpenHashMapSuite.scala | 16 ++-- .../util/collection/OpenHashSetSuite.scala | 73 ++++++++++++++++++- .../PrimitiveKeyOpenHashSetSuite.scala | 8 +- 7 files changed, 100 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala index 6604ec738c..a1a452315d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala @@ -45,7 +45,7 @@ class BitSet(numBits: Int) { */ def get(index: Int): Boolean = { val bitmask = 1L << (index & 0x3f) // mod 64 and shift - (words(index >>> 6) & bitmask) != 0 // div by 64 and mask + (words(index >> 6) & bitmask) != 0 // div by 64 and mask } /** Return the number of bits set to true in this BitSet. */ @@ -99,5 +99,5 @@ class BitSet(numBits: Int) { } /** Return the number of longs it would take to hold numBits. */ - private def bit2words(numBits: Int) = ((numBits - 1) >>> 6) + 1 + private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1 } diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala index ed117b2abf..80545c9688 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala @@ -92,7 +92,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: nullValue } else { val pos = _keySet.addWithoutResize(k) - if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { + if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { val newValue = defaultValue _values(pos & OpenHashSet.POSITION_MASK) = newValue _keySet.rehashIfNeeded(k, grow, move) diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala index e98a93dc2a..4592e4f939 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala @@ -43,6 +43,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements") require(initialCapacity >= 1, "Invalid initial capacity") + require(loadFactor < 1.0, "Load factor must be less than 1.0") + require(loadFactor > 0.0, "Load factor must be greater than 0.0") import OpenHashSet._ @@ -119,8 +121,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( * Rehash the set if it is overloaded. * @param k A parameter unused in the function, but to force the Scala compiler to specialize * this method. - * @param allocateFunc Closure invoked when we are allocating a new, larger array. - * @param moveFunc Closure invoked when we move the key from one position (in the old data array) + * @param allocateFunc Callback invoked when we are allocating a new, larger array. + * @param moveFunc Callback invoked when we move the key from one position (in the old data array) * to a new position (in the new data array). 
*/ def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) { @@ -129,7 +131,9 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( } } - /** Return the position of the element in the underlying array. */ + /** + * Return the position of the element in the underlying array, or INVALID_POS if it is not found. + */ def getPos(k: T): Int = { var pos = hashcode(hasher.hash(k)) & _mask var i = 1 @@ -172,7 +176,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( data(pos) = k bitset.set(pos) _size += 1 - return pos | EXISTENCE_MASK + return pos | NONEXISTENCE_MASK } else if (data(pos) == k) { // Found an existing key. return pos @@ -194,8 +198,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( * * @param k A parameter unused in the function, but to force the Scala compiler to specialize * this method. - * @param allocateFunc Closure invoked when we are allocating a new, larger array. - * @param moveFunc Closure invoked when we move the key from one position (in the old data array) + * @param allocateFunc Callback invoked when we are allocating a new, larger array. + * @param moveFunc Callback invoked when we move the key from one position (in the old data array) * to a new position (in the new data array). */ private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) { @@ -203,7 +207,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( require(newCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements") allocateFunc(newCapacity) - val newData = classManifest[T].newArray(newCapacity) + val newData = new Array[T](newCapacity) val newBitset = new BitSet(newCapacity) var pos = 0 _size = 0 @@ -240,7 +244,7 @@ private[spark] object OpenHashSet { val INVALID_POS = -1 - val EXISTENCE_MASK = 0x80000000 + val NONEXISTENCE_MASK = 0x80000000 val POSITION_MASK = 0xEFFFFFF /** diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala index e8f28ecdd7..4adf9cfb76 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala @@ -69,7 +69,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, */ def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = { val pos = _keySet.addWithoutResize(k) - if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { + if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { val newValue = defaultValue _values(pos & OpenHashSet.POSITION_MASK) = newValue _keySet.rehashIfNeeded(k, grow, move) diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala index 5e74ca1f7e..ca3f684668 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala @@ -82,7 +82,7 @@ class OpenHashMapSuite extends FunSuite { test("null keys") { val map = new OpenHashMap[String, String]() for (i <- 1 to 100) { - map("" + i) = "" + i + map(i.toString) = i.toString } assert(map.size === 100) assert(map(null) === null) @@ -94,7 +94,7 @@ class OpenHashMapSuite extends FunSuite { test("null values") { val map = new OpenHashMap[String, String]() for (i <- 1 to 100) { - map("" + i) = null + map(i.toString) = null } 
assert(map.size === 100) assert(map("1") === null) @@ -108,12 +108,12 @@ class OpenHashMapSuite extends FunSuite { test("changeValue") { val map = new OpenHashMap[String, String]() for (i <- 1 to 100) { - map("" + i) = "" + i + map(i.toString) = i.toString } assert(map.size === 100) for (i <- 1 to 100) { - val res = map.changeValue("" + i, { assert(false); "" }, v => { - assert(v === "" + i) + val res = map.changeValue(i.toString, { assert(false); "" }, v => { + assert(v === i.toString) v + "!" }) assert(res === i + "!") @@ -121,7 +121,7 @@ class OpenHashMapSuite extends FunSuite { // Iterate from 101 to 400 to make sure the map grows a couple of times, because we had a // bug where changeValue would return the wrong result when the map grew on that insert for (i <- 101 to 400) { - val res = map.changeValue("" + i, { i + "!" }, v => { assert(false); v }) + val res = map.changeValue(i.toString, { i + "!" }, v => { assert(false); v }) assert(res === i + "!") } assert(map.size === 400) @@ -138,11 +138,11 @@ class OpenHashMapSuite extends FunSuite { test("inserting in capacity-1 map") { val map = new OpenHashMap[String, String](1) for (i <- 1 to 100) { - map("" + i) = "" + i + map(i.toString) = i.toString } assert(map.size === 100) for (i <- 1 to 100) { - assert(map("" + i) === "" + i) + assert(map(i.toString) === i.toString) } } } diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala index 40049e8475..4e11e8a628 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala @@ -8,40 +8,111 @@ class OpenHashSetSuite extends FunSuite { test("primitive int") { val set = new OpenHashSet[Int] assert(set.size === 0) + assert(!set.contains(10)) + assert(!set.contains(50)) + assert(!set.contains(999)) + assert(!set.contains(10000)) + set.add(10) - assert(set.size === 1) + assert(set.contains(10)) + assert(!set.contains(50)) + assert(!set.contains(999)) + assert(!set.contains(10000)) + set.add(50) assert(set.size === 2) + assert(set.contains(10)) + assert(set.contains(50)) + assert(!set.contains(999)) + assert(!set.contains(10000)) + set.add(999) assert(set.size === 3) + assert(set.contains(10)) + assert(set.contains(50)) + assert(set.contains(999)) + assert(!set.contains(10000)) + set.add(50) assert(set.size === 3) + assert(set.contains(10)) + assert(set.contains(50)) + assert(set.contains(999)) + assert(!set.contains(10000)) } test("primitive long") { val set = new OpenHashSet[Long] assert(set.size === 0) + assert(!set.contains(10L)) + assert(!set.contains(50L)) + assert(!set.contains(999L)) + assert(!set.contains(10000L)) + set.add(10L) assert(set.size === 1) + assert(set.contains(10L)) + assert(!set.contains(50L)) + assert(!set.contains(999L)) + assert(!set.contains(10000L)) + set.add(50L) assert(set.size === 2) + assert(set.contains(10L)) + assert(set.contains(50L)) + assert(!set.contains(999L)) + assert(!set.contains(10000L)) + set.add(999L) assert(set.size === 3) + assert(set.contains(10L)) + assert(set.contains(50L)) + assert(set.contains(999L)) + assert(!set.contains(10000L)) + set.add(50L) assert(set.size === 3) + assert(set.contains(10L)) + assert(set.contains(50L)) + assert(set.contains(999L)) + assert(!set.contains(10000L)) } test("non-primitive") { val set = new OpenHashSet[String] assert(set.size === 0) + assert(!set.contains(10.toString)) + 
assert(!set.contains(50.toString)) + assert(!set.contains(999.toString)) + assert(!set.contains(10000.toString)) + set.add(10.toString) assert(set.size === 1) + assert(set.contains(10.toString)) + assert(!set.contains(50.toString)) + assert(!set.contains(999.toString)) + assert(!set.contains(10000.toString)) + set.add(50.toString) assert(set.size === 2) + assert(set.contains(10.toString)) + assert(set.contains(50.toString)) + assert(!set.contains(999.toString)) + assert(!set.contains(10000.toString)) + set.add(999.toString) assert(set.size === 3) + assert(set.contains(10.toString)) + assert(set.contains(50.toString)) + assert(set.contains(999.toString)) + assert(!set.contains(10000.toString)) + set.add(50.toString) assert(set.size === 3) + assert(set.contains(10.toString)) + assert(set.contains(50.toString)) + assert(set.contains(999.toString)) + assert(!set.contains(10000.toString)) } test("non-primitive set growth") { diff --git a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala index dc7f6cb023..dfd6aed2c4 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala @@ -58,12 +58,12 @@ class PrimitiveKeyOpenHashSetSuite extends FunSuite { test("changeValue") { val map = new PrimitiveKeyOpenHashMap[Long, String]() for (i <- 1 to 100) { - map(i.toLong) = "" + i + map(i.toLong) = i.toString } assert(map.size === 100) for (i <- 1 to 100) { val res = map.changeValue(i.toLong, { assert(false); "" }, v => { - assert(v === "" + i) + assert(v === i.toString) v + "!" }) assert(res === i + "!") @@ -80,11 +80,11 @@ class PrimitiveKeyOpenHashSetSuite extends FunSuite { test("inserting in capacity-1 map") { val map = new PrimitiveKeyOpenHashMap[Long, String](1) for (i <- 1 to 100) { - map(i.toLong) = "" + i + map(i.toLong) = i.toString } assert(map.size === 100) for (i <- 1 to 100) { - assert(map(i.toLong) === "" + i) + assert(map(i.toLong) === i.toString) } } } From 84991a1b91cf1b3d3e51b984877016ff4a506cfc Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Thu, 31 Oct 2013 15:13:37 -0700 Subject: [PATCH 15/25] Memory-optimized shuffle file consolidation Overhead of each shuffle block for consolidation has been reduced from >300 bytes to 8 bytes (1 primitive Long). Verified via profiler testing with 1 mil shuffle blocks, net overhead was ~8,400,000 bytes. Despite the memory-optimized implementation incurring extra CPU overhead, the runtime of the shuffle phase in this test was only around 2% slower, while the reduce phase was 40% faster, when compared to not using any shuffle file consolidation. 
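The bookkeeping that remains per shuffle block is a single primitive Long (essentially the block's offset within its consolidated shuffle file), held in the new PrimitiveVector class instead of one wrapper object per block. A minimal sketch of such a specialized, append-only vector, for illustration only and not necessarily the exact contents of the new file:

    // Growable vector backed by a primitive array, so Long entries stay unboxed.
    class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](
        initialSize: Int = 64) {
      private var numElements = 0
      private var array = new Array[V](initialSize)

      def apply(index: Int): V = {
        require(index < numElements)
        array(index)
      }

      def +=(value: V) {
        if (numElements == array.length) {
          // Double the backing array for amortized O(1) appends.
          val newArray = new Array[V](array.length * 2)
          array.copyToArray(newArray)
          array = newArray
        }
        array(numElements) = value
        numElements += 1
      }

      def length = numElements
    }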
--- .../apache/spark/storage/BlockManager.scala | 10 +- .../spark/storage/BlockObjectWriter.scala | 15 +- .../spark/storage/DiskBlockManager.scala | 56 +---- .../org/apache/spark/storage/DiskStore.scala | 4 +- .../spark/storage/ShuffleBlockManager.scala | 214 ++++++++++++++++-- .../apache/spark/util/MetadataCleaner.scala | 2 +- .../apache/spark/util/PrimitiveVector.scala | 48 ++++ .../spark/storage/DiskBlockManagerSuite.scala | 80 +++++++ 8 files changed, 350 insertions(+), 79 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala create mode 100644 core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 76d537f8e8..fbedfbc446 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.io.{InputStream, OutputStream} +import java.io.{File, InputStream, OutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} import scala.collection.mutable.{HashMap, ArrayBuffer} @@ -47,7 +47,7 @@ private[spark] class BlockManager( extends Logging { val shuffleBlockManager = new ShuffleBlockManager(this) - val diskBlockManager = new DiskBlockManager( + val diskBlockManager = new DiskBlockManager(shuffleBlockManager, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] @@ -462,15 +462,11 @@ private[spark] class BlockManager( * This is currently used for writing shuffle files out. Callers should handle error * cases. */ - def getDiskWriter(blockId: BlockId, filename: String, serializer: Serializer, bufferSize: Int) + def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int) : BlockObjectWriter = { val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _) - val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true) val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream) writer.registerCloseEventHandler(() => { - if (shuffleBlockManager.consolidateShuffleFiles) { - diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment()) - } val myInfo = new ShuffleBlockInfo() blockInfo.put(blockId, myInfo) myInfo.markReady(writer.fileSegment().length) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 32d2dd0694..e49c191c70 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -78,11 +78,11 @@ abstract class BlockObjectWriter(val blockId: BlockId) { /** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. 
*/ class DiskBlockObjectWriter( - blockId: BlockId, - file: File, - serializer: Serializer, - bufferSize: Int, - compressStream: OutputStream => OutputStream) + blockId: BlockId, + file: File, + serializer: Serializer, + bufferSize: Int, + compressStream: OutputStream => OutputStream) extends BlockObjectWriter(blockId) with Logging { @@ -111,8 +111,8 @@ class DiskBlockObjectWriter( private var fos: FileOutputStream = null private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null - private var initialPosition = 0L - private var lastValidPosition = 0L + private val initialPosition = file.length() + private var lastValidPosition = initialPosition private var initialized = false private var _timeWriting = 0L @@ -120,7 +120,6 @@ class DiskBlockObjectWriter( fos = new FileOutputStream(file, true) ts = new TimeTrackingOutputStream(fos) channel = fos.getChannel() - initialPosition = channel.position lastValidPosition = initialPosition bs = compressStream(new FastBufferedOutputStream(ts, bufferSize)) objOut = serializer.newInstance().serializeStream(bs) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index bcb58ad946..4f9537d1c7 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -20,12 +20,11 @@ package org.apache.spark.storage import java.io.File import java.text.SimpleDateFormat import java.util.{Date, Random} -import java.util.concurrent.ConcurrentHashMap import org.apache.spark.Logging import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.network.netty.{PathResolver, ShuffleSender} -import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils} +import org.apache.spark.util.Utils /** * Creates and maintains the logical mapping between logical blocks and physical on-disk @@ -35,7 +34,7 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedH * * @param rootDirs The directories to use for storing block files. Data will be hashed among these. */ -private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver with Logging { +private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String) extends PathResolver with Logging { private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt @@ -47,54 +46,25 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) private var shuffleSender : ShuffleSender = null - // Stores only Blocks which have been specifically mapped to segments of files - // (rather than the default, which maps a Block to a whole file). - // This keeps our bookkeeping down, since the file system itself tracks the standalone Blocks. - private val blockToFileSegmentMap = new TimeStampedHashMap[BlockId, FileSegment] - - val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DISK_BLOCK_MANAGER, this.cleanup) - addShutdownHook() - /** - * Creates a logical mapping from the given BlockId to a segment of a file. - * This will cause any accesses of the logical BlockId to be directed to the specified - * physical location. 
- */ - def mapBlockToFileSegment(blockId: BlockId, fileSegment: FileSegment) { - blockToFileSegmentMap.put(blockId, fileSegment) - } - /** * Returns the phyiscal file segment in which the given BlockId is located. * If the BlockId has been mapped to a specific FileSegment, that will be returned. * Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly. */ def getBlockLocation(blockId: BlockId): FileSegment = { - if (blockToFileSegmentMap.internalMap.containsKey(blockId)) { - blockToFileSegmentMap.get(blockId).get - } else { - val file = getFile(blockId.name) - new FileSegment(file, 0, file.length()) + if (blockId.isShuffle) { + val segment = shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]) + if (segment.isDefined) { return segment.get } + // If no special mapping found, assume standard block -> file mapping... } + + val file = getFile(blockId.name) + new FileSegment(file, 0, file.length()) } - /** - * Simply returns a File to place the given Block into. This does not physically create the file. - * If filename is given, that file will be used. Otherwise, we will use the BlockId to get - * a unique filename. - */ - def createBlockFile(blockId: BlockId, filename: String = "", allowAppending: Boolean): File = { - val actualFilename = if (filename == "") blockId.name else filename - val file = getFile(actualFilename) - if (!allowAppending && file.exists()) { - throw new IllegalStateException( - "Attempted to create file that already exists: " + actualFilename) - } - file - } - - private def getFile(filename: String): File = { + def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) val dirId = hash % localDirs.length @@ -119,6 +89,8 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit new File(subDir, filename) } + def getFile(blockId: BlockId): File = getFile(blockId.name) + private def createLocalDirs(): Array[File] = { logDebug("Creating local directories at root dirs '" + rootDirs + "'") val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") @@ -151,10 +123,6 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit } } - private def cleanup(cleanupTime: Long) { - blockToFileSegmentMap.clearOldValues(cleanupTime) - } - private def addShutdownHook() { localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir)) Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index a3c496f9e0..5a1e7b4444 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -44,7 +44,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage val bytes = _bytes.duplicate() logDebug("Attempting to put block " + blockId) val startTime = System.currentTimeMillis - val file = diskManager.createBlockFile(blockId, allowAppending = false) + val file = diskManager.getFile(blockId) val channel = new FileOutputStream(file).getChannel() while (bytes.remaining > 0) { channel.write(bytes) @@ -64,7 +64,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage logDebug("Attempting to write values for block " + blockId) val startTime = System.currentTimeMillis - val file = 
diskManager.createBlockFile(blockId, allowAppending = false) + val file = diskManager.getFile(blockId) val outputStream = new FileOutputStream(file) blockManager.dataSerializeStream(blockId, outputStream, values.iterator) val length = file.length diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 066e45a12b..c61febf830 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -17,17 +17,29 @@ package org.apache.spark.storage +import java.io.File +import java.util import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger +import scala.collection.JavaConversions._ +import scala.collection.mutable + +import org.apache.spark.Logging import org.apache.spark.serializer.Serializer +import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, AGodDamnPrimitiveVector, TimeStampedHashMap} private[spark] -class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter]) +class ShuffleWriterGroup( + val mapId: Int, + val fileGroup: ShuffleFileGroup, + val writers: Array[BlockObjectWriter]) private[spark] trait ShuffleBlocks { + /** Get a group of writers for this map task. */ def acquireWriters(mapId: Int): ShuffleWriterGroup + def releaseWriters(group: ShuffleWriterGroup) } @@ -46,51 +58,219 @@ trait ShuffleBlocks { * time owns a particular fileId, and this id is returned to a pool when the task finishes. */ private[spark] -class ShuffleBlockManager(blockManager: BlockManager) { +class ShuffleBlockManager(blockManager: BlockManager) extends Logging { // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. // TODO: Remove this once the shuffle file consolidation feature is stable. val consolidateShuffleFiles = System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean - var nextFileId = new AtomicInteger(0) - val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]() + /** + * Contains a pool of unused ShuffleFileGroups. + * One group is needed per concurrent thread (mapper) operating on the same shuffle. + */ + private class ShuffleFileGroupPool { + private val nextFileId = new AtomicInteger(0) + private val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() + + def getNextFileId() = nextFileId.getAndIncrement() + def getUnusedFileGroup() = unusedFileGroups.poll() + def returnFileGroup(group: ShuffleFileGroup) = unusedFileGroups.add(group) + def returnFileGroups(groups: Seq[ShuffleFileGroup]) = unusedFileGroups.addAll(groups) + } + + type ShuffleId = Int + private val shuffleToFileGroupPoolMap = new TimeStampedHashMap[ShuffleId, ShuffleFileGroupPool] + + /** + * Maps reducers (of a particular shuffle) to the set of files that have blocks destined for them. + * Each reducer will have one ShuffleFile per concurrent thread that executed during mapping. + */ + private val shuffleToReducerToFilesMap = + new TimeStampedHashMap[ShuffleId, Array[ConcurrentLinkedQueue[ShuffleFile]]] + + private + val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup) def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer) = { + initializeShuffleMetadata(shuffleId, numBuckets) + new ShuffleBlocks { - // Get a group of writers for a map task. 
override def acquireWriters(mapId: Int): ShuffleWriterGroup = { val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 - val fileId = getUnusedFileId() + val fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets) val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) if (consolidateShuffleFiles) { - val filename = physicalFileName(shuffleId, bucketId, fileId) - blockManager.getDiskWriter(blockId, filename, serializer, bufferSize) + blockManager.getDiskWriter(blockId, fileGroup(bucketId).file, serializer, bufferSize) } else { - blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize) + val blockFile = blockManager.diskBlockManager.getFile(blockId) + blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize) } } - new ShuffleWriterGroup(mapId, fileId, writers) + new ShuffleWriterGroup(mapId, fileGroup, writers) } override def releaseWriters(group: ShuffleWriterGroup) { - recycleFileId(group.fileId) + if (consolidateShuffleFiles) { + val fileGroup = group.fileGroup + fileGroup.addMapper(group.mapId) + for ((writer, shuffleFile) <- group.writers.zip(fileGroup.files)) { + shuffleFile.recordMapOutput(writer.fileSegment().offset) + } + recycleFileGroup(shuffleId, fileGroup) + } } } } - private def getUnusedFileId(): Int = { - val fileId = unusedFileIds.poll() - if (fileId == null) nextFileId.getAndIncrement() else fileId + def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) { + val prev = shuffleToFileGroupPoolMap.putIfAbsent(shuffleId, new ShuffleFileGroupPool()) + if (prev == None) { + val reducerToFilesMap = new Array[ConcurrentLinkedQueue[ShuffleFile]](numBuckets) + for (reducerId <- 0 until numBuckets) { + reducerToFilesMap(reducerId) = new ConcurrentLinkedQueue[ShuffleFile]() + } + shuffleToReducerToFilesMap.put(shuffleId, reducerToFilesMap) + } } - private def recycleFileId(fileId: Int) { - if (consolidateShuffleFiles) { - unusedFileIds.add(fileId) + private def getUnusedFileGroup(shuffleId: Int, mapId: Int, numBuckets: Int): ShuffleFileGroup = { + if (!consolidateShuffleFiles) { return null } + + val pool = shuffleToFileGroupPoolMap(shuffleId) + var fileGroup = pool.getUnusedFileGroup() + + // If we reuse a file group, ensure we maintain mapId monotonicity. + val fileGroupsToReturn = mutable.ListBuffer[ShuffleFileGroup]() + while (fileGroup != null && fileGroup.maxMapId >= mapId) { + fileGroupsToReturn += fileGroup + fileGroup = pool.getUnusedFileGroup() } + pool.returnFileGroups(fileGroupsToReturn) // re-add incompatible file groups + + if (fileGroup == null) { + val fileId = pool.getNextFileId() + val files = Array.tabulate[ShuffleFile](numBuckets) { bucketId => + val filename = physicalFileName(shuffleId, bucketId, fileId) + val file = blockManager.diskBlockManager.getFile(filename) + val shuffleFile = new ShuffleFile(file) + shuffleToReducerToFilesMap(shuffleId)(bucketId).add(shuffleFile) + shuffleFile + } + new ShuffleFileGroup(shuffleId, fileId, files) + } else { + fileGroup + } + } + + private def recycleFileGroup(shuffleId: Int, fileGroup: ShuffleFileGroup) { + shuffleToFileGroupPoolMap(shuffleId).returnFileGroup(fileGroup) + } + + /** + * Returns the physical file segment in which the given BlockId is located. + * If we have no special mapping, None will be returned. + */ + def getBlockLocation(id: ShuffleBlockId): Option[FileSegment] = { + // Search all files associated with the given reducer. 
+ // This process is O(m log n) for m threads and n mappers. Could be sweetened to "likely" O(m). + if (consolidateShuffleFiles) { + val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId) + for (file <- filesForReducer) { + val segment = file.getFileSegmentFor(id.mapId) + if (segment != None) { return segment } + } + + logInfo("Failed to find shuffle block: " + id) + } + None } private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = { "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId) } + + private def cleanup(cleanupTime: Long) { + shuffleToFileGroupPoolMap.clearOldValues(cleanupTime) + shuffleToReducerToFilesMap.clearOldValues(cleanupTime) + } +} + +/** + * A group of shuffle files, one per reducer. + * A particular mapper will be assigned a single ShuffleFileGroup to write its output to. + * Mappers must be added in monotonically increasing order by id for efficiency purposes. + */ +private[spark] +class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) { + private val mapIds = new AGodDamnPrimitiveVector[Int]() + + files.foreach(_.setShuffleFileGroup(this)) + + /** The maximum map id (i.e., last added map task) in this file group. */ + def maxMapId = if (mapIds.length > 0) mapIds(mapIds.length - 1) else -1 + + def apply(bucketId: Int) = files(bucketId) + + def addMapper(mapId: Int) { + assert(mapId > maxMapId, "Attempted to insert mapId out-of-order") + mapIds += mapId + } + + /** + * Uses binary search, giving O(log n) runtime. + * NB: Could be improved to amortized O(1) for usual access pattern, where nodes are accessed + * in order of monotonically increasing mapId. That approach is more fragile in general, however. + */ + def indexOf(mapId: Int): Int = { + val index = util.Arrays.binarySearch(mapIds.getUnderlyingArray, 0, mapIds.length, mapId) + if (index >= 0) index else -1 + } +} + +/** + * A single, consolidated shuffle file that may contain many actual blocks. All blocks are destined + * to the same reducer. + */ +private[spark] +class ShuffleFile(val file: File) { + /** + * Consecutive offsets of blocks into the file, ordered by position in the file. + * This ordering allows us to compute block lengths by examining the following block offset. + */ + val blockOffsets = new AGodDamnPrimitiveVector[Long]() + + /** Back pointer to whichever ShuffleFileGroup this file is a part of. */ + private var shuffleFileGroup : ShuffleFileGroup = _ + + // Required due to circular dependency between ShuffleFileGroup and ShuffleFile. + def setShuffleFileGroup(group: ShuffleFileGroup) { + assert(shuffleFileGroup == null) + shuffleFileGroup = group + } + + def recordMapOutput(offset: Long) { + blockOffsets += offset + } + + /** + * Returns the FileSegment associated with the given map task, or + * None if this ShuffleFile does not have an entry for it. 
+ */ + def getFileSegmentFor(mapId: Int): Option[FileSegment] = { + val index = shuffleFileGroup.indexOf(mapId) + if (index >= 0) { + val offset = blockOffsets(index) + val length = + if (index + 1 < blockOffsets.length) { + blockOffsets(index + 1) - offset + } else { + file.length() - offset + } + assert(length >= 0) + return Some(new FileSegment(file, offset, length)) + } else { + None + } + } } diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index 3f963727d9..67a7f87a5c 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -59,7 +59,7 @@ object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext "ShuffleMapTask", "BlockManager", "DiskBlockManager", "BroadcastVars") { val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, - SHUFFLE_MAP_TASK, BLOCK_MANAGER, DISK_BLOCK_MANAGER, BROADCAST_VARS = Value + SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value type MetadataCleanerType = Value diff --git a/core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala new file mode 100644 index 0000000000..d316601b90 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +/** Provides a simple, non-threadsafe, array-backed vector that can store primitives. */ +class AGodDamnPrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest] + (initialSize: Int = 64) +{ + private var numElements = 0 + private var array = new Array[V](initialSize) + + def apply(index: Int): V = { + require(index < numElements) + array(index) + } + + def +=(value: V) { + if (numElements == array.length) { resize(array.length * 2) } + array(numElements) = value + numElements += 1 + } + + def length = numElements + + def getUnderlyingArray = array + + /** Resizes the array, dropping elements if the total length decreases. 
*/ + def resize(newLength: Int) { + val newArray = new Array[V](newLength) + array.copyToArray(newArray) + array = newArray + } +} diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala new file mode 100644 index 0000000000..12ca920094 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -0,0 +1,80 @@ +package org.apache.spark.storage + +import org.scalatest.{BeforeAndAfterEach, FunSuite} +import java.io.{FileWriter, File} +import java.nio.file.Files +import scala.collection.mutable + +class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach { + + val rootDir0 = Files.createTempDirectory("disk-block-manager-suite-0") + val rootDir1 = Files.createTempDirectory("disk-block-manager-suite-1") + val rootDirs = rootDir0.getFileName + "," + rootDir1.getFileName + println("Created root dirs: " + rootDirs) + + val shuffleBlockManager = new ShuffleBlockManager(null) { + var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]() + override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap.get(id) + } + + var diskBlockManager: DiskBlockManager = _ + + override def beforeEach() { + diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs) + shuffleBlockManager.idToSegmentMap.clear() + } + + test("basic block creation") { + val blockId = new TestBlockId("test") + assertSegmentEquals(blockId, blockId.name, 0, 0) + + val newFile = diskBlockManager.getFile(blockId) + writeToFile(newFile, 10) + assertSegmentEquals(blockId, blockId.name, 0, 10) + + newFile.delete() + } + + test("block appending") { + val blockId = new TestBlockId("test") + val newFile = diskBlockManager.getFile(blockId) + writeToFile(newFile, 15) + assertSegmentEquals(blockId, blockId.name, 0, 15) + val newFile2 = diskBlockManager.getFile(blockId) + assert(newFile === newFile2) + writeToFile(newFile2, 12) + assertSegmentEquals(blockId, blockId.name, 0, 27) + newFile.delete() + } + + test("block remapping") { + val filename = "test" + val blockId0 = new ShuffleBlockId(1, 2, 3) + val newFile = diskBlockManager.getFile(filename) + writeToFile(newFile, 15) + shuffleBlockManager.idToSegmentMap(blockId0) = new FileSegment(newFile, 0, 15) + assertSegmentEquals(blockId0, filename, 0, 15) + + val blockId1 = new ShuffleBlockId(1, 2, 4) + val newFile2 = diskBlockManager.getFile(filename) + writeToFile(newFile2, 12) + shuffleBlockManager.idToSegmentMap(blockId1) = new FileSegment(newFile, 15, 12) + assertSegmentEquals(blockId1, filename, 15, 12) + + assert(newFile === newFile2) + newFile.delete() + } + + def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) { + val segment = diskBlockManager.getBlockLocation(blockId) + assert(segment.file.getName === filename) + assert(segment.offset === offset) + assert(segment.length === length) + } + + def writeToFile(file: File, numBytes: Int) { + val writer = new FileWriter(file, true) + for (i <- 0 until numBytes) writer.write(i) + writer.close() + } +} From 7453f31181a173f1cacb2c957455bf05e52f43c2 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Fri, 1 Nov 2013 21:01:04 -0700 Subject: [PATCH 16/25] Address minor comments --- .../scala/org/apache/spark/storage/DiskBlockManager.scala | 3 ++- .../org/apache/spark/storage/ShuffleBlockManager.scala | 7 ++++--- .../spark/util/{ => collection}/PrimitiveVector.scala | 7 +++---- 3 files changed, 9 insertions(+), 8 deletions(-) rename 
core/src/main/scala/org/apache/spark/util/{ => collection}/PrimitiveVector.scala (90%) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 4f9537d1c7..bde3d1f592 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -34,7 +34,8 @@ import org.apache.spark.util.Utils * * @param rootDirs The directories to use for storing block files. Data will be hashed among these. */ -private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String) extends PathResolver with Logging { +private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String) + extends PathResolver with Logging { private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index c61febf830..d718c87cab 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -27,7 +27,8 @@ import scala.collection.mutable import org.apache.spark.Logging import org.apache.spark.serializer.Serializer -import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, AGodDamnPrimitiveVector, TimeStampedHashMap} +import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap} +import org.apache.spark.util.collection.PrimitiveVector private[spark] class ShuffleWriterGroup( @@ -203,7 +204,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { */ private[spark] class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) { - private val mapIds = new AGodDamnPrimitiveVector[Int]() + private val mapIds = new PrimitiveVector[Int]() files.foreach(_.setShuffleFileGroup(this)) @@ -238,7 +239,7 @@ class ShuffleFile(val file: File) { * Consecutive offsets of blocks into the file, ordered by position in the file. * This ordering allows us to compute block lengths by examining the following block offset. */ - val blockOffsets = new AGodDamnPrimitiveVector[Long]() + val blockOffsets = new PrimitiveVector[Long]() /** Back pointer to whichever ShuffleFileGroup this file is a part of. */ private var shuffleFileGroup : ShuffleFileGroup = _ diff --git a/core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala similarity index 90% rename from core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala rename to core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala index d316601b90..721f12b711 100644 --- a/core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala @@ -15,12 +15,11 @@ * limitations under the License. */ -package org.apache.spark.util +package org.apache.spark.util.collection /** Provides a simple, non-threadsafe, array-backed vector that can store primitives. 
*/ -class AGodDamnPrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest] - (initialSize: Int = 64) -{ +private[spark] +class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](initialSize: Int = 64) { private var numElements = 0 private var array = new Array[V](initialSize) From 7d44dec9bd7c4bbfb8daf4843a0968797e009bea Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Fri, 1 Nov 2013 21:04:09 -0700 Subject: [PATCH 17/25] Fix weird bug with specialized PrimitiveVector --- .../org/apache/spark/util/collection/PrimitiveVector.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala index 721f12b711..369519c559 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala @@ -21,7 +21,11 @@ package org.apache.spark.util.collection private[spark] class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](initialSize: Int = 64) { private var numElements = 0 - private var array = new Array[V](initialSize) + private var array: Array[V] = _ + + // NB: This must be separate from the declaration, otherwise the specialized parent class + // will get its own array with the same initial size. TODO: Figure out why... + array = new Array[V](initialSize) def apply(index: Int): V = { require(index < numElements) From 1592adfa259860494353babfb48c80b7d1087379 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sat, 2 Nov 2013 00:19:04 -0700 Subject: [PATCH 18/25] Add documentation and address other comments --- .../spark/storage/DiskBlockManager.scala | 12 ++--- .../spark/storage/ShuffleBlockManager.scala | 49 ++++++++++++------- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index bde3d1f592..fcd2e97982 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -55,14 +55,12 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD * Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly. */ def getBlockLocation(blockId: BlockId): FileSegment = { - if (blockId.isShuffle) { - val segment = shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]) - if (segment.isDefined) { return segment.get } - // If no special mapping found, assume standard block -> file mapping... + if (blockId.isShuffle && shuffleManager.consolidateShuffleFiles) { + shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]) + } else { + val file = getFile(blockId.name) + new FileSegment(file, 0, file.length()) } - - val file = getFile(blockId.name) - new FileSegment(file, 0, file.length()) } def getFile(filename: String): File = { diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index d718c87cab..d1e3074683 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -45,18 +45,24 @@ trait ShuffleBlocks { } /** - * Manages assigning disk-based block writers to shuffle tasks. 
Each shuffle task gets one writer - * per reducer. + * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file and + * per reducer (this set of files is called a ShuffleFileGroup). * * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle - * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer - * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files, - * it releases them for another task. + * blocks are aggregated into the same file. There is one "combined shuffle file" (ShuffleFile) per + * reducer per concurrently executing shuffle task. As soon as a task finishes writing to its + * shuffle files, it releases them for another task. * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple: * - shuffleId: The unique id given to the entire shuffle stage. * - bucketId: The id of the output partition (i.e., reducer id) * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a * time owns a particular fileId, and this id is returned to a pool when the task finishes. + * + * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping + * ShuffleBlockIds to FileSegments, each ShuffleFile maintains a list of offsets for each block + * stored in that file. In order to find the location of a shuffle block, we search all ShuffleFiles + * destined for the block's reducer. + * */ private[spark] class ShuffleBlockManager(blockManager: BlockManager) extends Logging { @@ -124,9 +130,9 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { } } - def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) { + private def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) { val prev = shuffleToFileGroupPoolMap.putIfAbsent(shuffleId, new ShuffleFileGroupPool()) - if (prev == None) { + if (!prev.isDefined) { val reducerToFilesMap = new Array[ConcurrentLinkedQueue[ShuffleFile]](numBuckets) for (reducerId <- 0 until numBuckets) { reducerToFilesMap(reducerId) = new ConcurrentLinkedQueue[ShuffleFile]() @@ -142,6 +148,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { var fileGroup = pool.getUnusedFileGroup() // If we reuse a file group, ensure we maintain mapId monotonicity. + // This means we may create extra ShuffleFileGroups if we're trying to run a map task + // that is out-of-order with respect to its mapId (which may happen when failures occur). val fileGroupsToReturn = mutable.ListBuffer[ShuffleFileGroup]() while (fileGroup != null && fileGroup.maxMapId >= mapId) { fileGroupsToReturn += fileGroup @@ -170,21 +178,19 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { /** * Returns the physical file segment in which the given BlockId is located. - * If we have no special mapping, None will be returned. + * This function should only be called if shuffle file consolidation is enabled, as it is + * an error condition if we don't find the expected block. */ - def getBlockLocation(id: ShuffleBlockId): Option[FileSegment] = { + def getBlockLocation(id: ShuffleBlockId): FileSegment = { // Search all files associated with the given reducer. // This process is O(m log n) for m threads and n mappers. Could be sweetened to "likely" O(m). 
- if (consolidateShuffleFiles) { - val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId) - for (file <- filesForReducer) { - val segment = file.getFileSegmentFor(id.mapId) - if (segment != None) { return segment } - } - - logInfo("Failed to find shuffle block: " + id) + val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId) + for (file <- filesForReducer) { + val segment = file.getFileSegmentFor(id.mapId) + if (segment != None) { return segment.get } } - None + + throw new IllegalStateException("Failed to find shuffle block: " + id) } private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = { @@ -204,6 +210,10 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { */ private[spark] class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) { + /** + * Contains the set of mappers that have written to this file group, in the same order as they + * have written to their respective files. + */ private val mapIds = new PrimitiveVector[Int]() files.foreach(_.setShuffleFileGroup(this)) @@ -238,8 +248,9 @@ class ShuffleFile(val file: File) { /** * Consecutive offsets of blocks into the file, ordered by position in the file. * This ordering allows us to compute block lengths by examining the following block offset. + * blockOffsets(i) contains the offset for the mapper in shuffleFileGroup.mapIds(i). */ - val blockOffsets = new PrimitiveVector[Long]() + private val blockOffsets = new PrimitiveVector[Long]() /** Back pointer to whichever ShuffleFileGroup this file is a part of. */ private var shuffleFileGroup : ShuffleFileGroup = _ From 3ca52309f2af07e6dbca56017360a0a814b8f9ca Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sat, 2 Nov 2013 00:29:31 -0700 Subject: [PATCH 19/25] Fix test breakage --- .../scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 12ca920094..89a7c6ecde 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -14,7 +14,7 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach { val shuffleBlockManager = new ShuffleBlockManager(null) { var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]() - override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap.get(id) + override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id) } var diskBlockManager: DiskBlockManager = _ From 8703898d3f2c6b6e08b3ef91da67876589aba184 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sun, 3 Nov 2013 00:34:53 -0700 Subject: [PATCH 20/25] Address Reynold's comments --- .../spark/storage/ShuffleBlockManager.scala | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index d1e3074683..57b1a28543 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -45,7 +45,7 @@ trait ShuffleBlocks { } /** - * Manages assigning disk-based block writers to shuffle tasks. 
Each shuffle task gets one file and + * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file * per reducer (this set of files is called a ShuffleFileGroup). * * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle @@ -57,11 +57,13 @@ trait ShuffleBlocks { * - bucketId: The id of the output partition (i.e., reducer id) * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a * time owns a particular fileId, and this id is returned to a pool when the task finishes. + * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length) + * that specifies where in a given file the actual block data is located. * * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping - * ShuffleBlockIds to FileSegments, each ShuffleFile maintains a list of offsets for each block - * stored in that file. In order to find the location of a shuffle block, we search all ShuffleFiles - * destined for the block's reducer. + * ShuffleBlockIds directly to FileSegments, each ShuffleFile maintains a list of offsets for each + * block stored in that file. In order to find the location of a shuffle block, we search all + * ShuffleFiles destined for the block's reducer. * */ private[spark] @@ -98,18 +100,22 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup) - def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer) = { + def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = { initializeShuffleMetadata(shuffleId, numBuckets) new ShuffleBlocks { override def acquireWriters(mapId: Int): ShuffleWriterGroup = { val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 - val fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets) - val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => - val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - if (consolidateShuffleFiles) { + var fileGroup: ShuffleFileGroup = null + val writers = if (consolidateShuffleFiles) { + fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets) + Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => + val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) blockManager.getDiskWriter(blockId, fileGroup(bucketId).file, serializer, bufferSize) - } else { + } + } else { + Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => + val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) val blockFile = blockManager.diskBlockManager.getFile(blockId) blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize) } @@ -142,8 +148,6 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { } private def getUnusedFileGroup(shuffleId: Int, mapId: Int, numBuckets: Int): ShuffleFileGroup = { - if (!consolidateShuffleFiles) { return null } - val pool = shuffleToFileGroupPoolMap(shuffleId) var fileGroup = pool.getUnusedFileGroup() From a0bb569a818f6ce66c192a3f5782ff56cf58b1d3 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sun, 3 Nov 2013 20:45:11 -0800 Subject: [PATCH 21/25] use OpenHashMap, remove monotonicity requirement, fix failure bug --- .../spark/scheduler/ShuffleMapTask.scala | 4 +- .../spark/storage/ShuffleBlockManager.scala | 56 ++++++------------- .../spark/storage/StoragePerfTester.scala | 2 +- 
.../collection/PrimitiveKeyOpenHashMap.scala | 5 ++ 4 files changed, 26 insertions(+), 41 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 24d97da6eb..c502f8f91a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -148,6 +148,7 @@ private[spark] class ShuffleMapTask( val blockManager = SparkEnv.get.blockManager var shuffle: ShuffleBlocks = null var buckets: ShuffleWriterGroup = null + var success = false try { // Obtain all the block writers for shuffle blocks. @@ -179,6 +180,7 @@ private[spark] class ShuffleMapTask( shuffleMetrics.shuffleWriteTime = totalTime metrics.get.shuffleWriteMetrics = Some(shuffleMetrics) + success = true new MapStatus(blockManager.blockManagerId, compressedSizes) } catch { case e: Exception => // If there is an exception from running the task, revert the partial writes @@ -191,7 +193,7 @@ private[spark] class ShuffleMapTask( // Release the writers back to the shuffle block manager. if (shuffle != null && buckets != null) { buckets.writers.foreach(_.close()) - shuffle.releaseWriters(buckets) + shuffle.releaseWriters(buckets, success) } // Execute the callbacks on task completion. context.executeOnCompleteCallbacks() diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 57b1a28543..8b202ac112 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -28,7 +28,7 @@ import scala.collection.mutable import org.apache.spark.Logging import org.apache.spark.serializer.Serializer import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap} -import org.apache.spark.util.collection.PrimitiveVector +import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} private[spark] class ShuffleWriterGroup( @@ -41,7 +41,8 @@ trait ShuffleBlocks { /** Get a group of writers for this map task. */ def acquireWriters(mapId: Int): ShuffleWriterGroup - def releaseWriters(group: ShuffleWriterGroup) + /** @param success Indicates all writes were successful. If false, no blocks will be recorded. 
*/ + def releaseWriters(group: ShuffleWriterGroup, success: Boolean) } /** @@ -123,12 +124,14 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { new ShuffleWriterGroup(mapId, fileGroup, writers) } - override def releaseWriters(group: ShuffleWriterGroup) { + override def releaseWriters(group: ShuffleWriterGroup, success: Boolean) { if (consolidateShuffleFiles) { val fileGroup = group.fileGroup - fileGroup.addMapper(group.mapId) - for ((writer, shuffleFile) <- group.writers.zip(fileGroup.files)) { - shuffleFile.recordMapOutput(writer.fileSegment().offset) + if (success) { + fileGroup.addMapper(group.mapId) + for ((writer, shuffleFile) <- group.writers.zip(fileGroup.files)) { + shuffleFile.recordMapOutput(writer.fileSegment().offset) + } } recycleFileGroup(shuffleId, fileGroup) } @@ -149,18 +152,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { private def getUnusedFileGroup(shuffleId: Int, mapId: Int, numBuckets: Int): ShuffleFileGroup = { val pool = shuffleToFileGroupPoolMap(shuffleId) - var fileGroup = pool.getUnusedFileGroup() - - // If we reuse a file group, ensure we maintain mapId monotonicity. - // This means we may create extra ShuffleFileGroups if we're trying to run a map task - // that is out-of-order with respect to its mapId (which may happen when failures occur). - val fileGroupsToReturn = mutable.ListBuffer[ShuffleFileGroup]() - while (fileGroup != null && fileGroup.maxMapId >= mapId) { - fileGroupsToReturn += fileGroup - fileGroup = pool.getUnusedFileGroup() - } - pool.returnFileGroups(fileGroupsToReturn) // re-add incompatible file groups - + val fileGroup = pool.getUnusedFileGroup() if (fileGroup == null) { val fileId = pool.getNextFileId() val files = Array.tabulate[ShuffleFile](numBuckets) { bucketId => @@ -187,7 +179,6 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { */ def getBlockLocation(id: ShuffleBlockId): FileSegment = { // Search all files associated with the given reducer. - // This process is O(m log n) for m threads and n mappers. Could be sweetened to "likely" O(m). val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId) for (file <- filesForReducer) { val segment = file.getFileSegmentFor(id.mapId) @@ -210,37 +201,24 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { /** * A group of shuffle files, one per reducer. * A particular mapper will be assigned a single ShuffleFileGroup to write its output to. - * Mappers must be added in monotonically increasing order by id for efficiency purposes. */ private[spark] class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) { /** - * Contains the set of mappers that have written to this file group, in the same order as they - * have written to their respective files. + * Stores the absolute index of each mapId in the files of this group. For instance, + * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0. */ - private val mapIds = new PrimitiveVector[Int]() + private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]() files.foreach(_.setShuffleFileGroup(this)) - /** The maximum map id (i.e., last added map task) in this file group. 
*/ - def maxMapId = if (mapIds.length > 0) mapIds(mapIds.length - 1) else -1 - def apply(bucketId: Int) = files(bucketId) def addMapper(mapId: Int) { - assert(mapId > maxMapId, "Attempted to insert mapId out-of-order") - mapIds += mapId + mapIdToIndex(mapId) = mapIdToIndex.size } - /** - * Uses binary search, giving O(log n) runtime. - * NB: Could be improved to amortized O(1) for usual access pattern, where nodes are accessed - * in order of monotonically increasing mapId. That approach is more fragile in general, however. - */ - def indexOf(mapId: Int): Int = { - val index = util.Arrays.binarySearch(mapIds.getUnderlyingArray, 0, mapIds.length, mapId) - if (index >= 0) index else -1 - } + def indexOf(mapId: Int): Int = mapIdToIndex.getOrElse(mapId, -1) } /** @@ -252,7 +230,7 @@ class ShuffleFile(val file: File) { /** * Consecutive offsets of blocks into the file, ordered by position in the file. * This ordering allows us to compute block lengths by examining the following block offset. - * blockOffsets(i) contains the offset for the mapper in shuffleFileGroup.mapIds(i). + * Note: shuffleFileGroup.indexOf(mapId) returns the index of the mapper into this array. */ private val blockOffsets = new PrimitiveVector[Long]() @@ -284,7 +262,7 @@ class ShuffleFile(val file: File) { file.length() - offset } assert(length >= 0) - return Some(new FileSegment(file, offset, length)) + Some(new FileSegment(file, offset, length)) } else { None } diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala index 7dcadc3805..021f6f6688 100644 --- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala +++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala @@ -50,7 +50,7 @@ object StoragePerfTester { w.close() } - shuffle.releaseWriters(buckets) + shuffle.releaseWriters(buckets, true) } val start = System.currentTimeMillis() diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala index 4adf9cfb76..a119880884 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala @@ -53,6 +53,11 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, _values(pos) } + def getOrElse(k: K, elseValue: V): V = { + val pos = _keySet.getPos(k) + if (pos >= 0) _values(pos) else elseValue + } + /** Set the value for a key */ def update(k: K, v: V) { val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK From 39d93ed4b90b8b302a978df878fd020e7d1fcf56 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sun, 3 Nov 2013 21:52:59 -0800 Subject: [PATCH 22/25] Clean up test files properly For some reason, even calling java.nio.Files.createTempDirectory().getFile.deleteOnExit() does not delete the directory on exit. Guava's analagous function seems to work, however. 
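One likely reason deleteOnExit() can fail to remove these directories is that it deletes with java.io.File.delete() semantics at JVM exit, which refuses to remove a non-empty directory. A more robust pattern for test temp directories, regardless of which API created them, is an explicit shutdown hook that deletes recursively — a minimal sketch only (TempDirSketch, createManagedTempDir and deleteRecursively are made-up names, not part of this patch):

    import java.io.File
    import com.google.common.io.Files

    object TempDirSketch {
      // Delete children first, then the directory itself; plain delete() fails on non-empty dirs.
      private def deleteRecursively(f: File): Unit = {
        if (f.isDirectory) {
          Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
        }
        f.delete()
      }

      /** Creates a temp dir (via Guava) and registers a hook that removes it, even if non-empty. */
      def createManagedTempDir(): File = {
        val dir = Files.createTempDir()
        Runtime.getRuntime.addShutdownHook(new Thread("delete temp dir " + dir) {
          override def run() { deleteRecursively(dir) }
        })
        dir
      }
    }
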
--- .../spark/storage/DiskBlockManagerSuite.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 89a7c6ecde..0b9056344c 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -1,15 +1,19 @@ package org.apache.spark.storage -import org.scalatest.{BeforeAndAfterEach, FunSuite} import java.io.{FileWriter, File} -import java.nio.file.Files + import scala.collection.mutable +import com.google.common.io.Files +import org.scalatest.{BeforeAndAfterEach, FunSuite} + class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach { - val rootDir0 = Files.createTempDirectory("disk-block-manager-suite-0") - val rootDir1 = Files.createTempDirectory("disk-block-manager-suite-1") - val rootDirs = rootDir0.getFileName + "," + rootDir1.getFileName + val rootDir0 = Files.createTempDir() + rootDir0.deleteOnExit() + val rootDir1 = Files.createTempDir() + rootDir1.deleteOnExit() + val rootDirs = rootDir0.getName + "," + rootDir1.getName println("Created root dirs: " + rootDirs) val shuffleBlockManager = new ShuffleBlockManager(null) { From b0cf19fe3c395e84e730e97ec211d7ef935951e1 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sun, 3 Nov 2013 22:16:58 -0800 Subject: [PATCH 23/25] Add javadoc and remove unused code --- .../scala/org/apache/spark/storage/ShuffleBlockManager.scala | 1 - .../apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 8b202ac112..a3bb425208 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -85,7 +85,6 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { def getNextFileId() = nextFileId.getAndIncrement() def getUnusedFileGroup() = unusedFileGroups.poll() def returnFileGroup(group: ShuffleFileGroup) = unusedFileGroups.add(group) - def returnFileGroups(groups: Seq[ShuffleFileGroup]) = unusedFileGroups.addAll(groups) } type ShuffleId = Int diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala index a119880884..d76143e45a 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala @@ -53,6 +53,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, _values(pos) } + /** Get the value for a given key, or returns elseValue if it doesn't exist. */ def getOrElse(k: K, elseValue: V): V = { val pos = _keySet.getPos(k) if (pos >= 0) _values(pos) else elseValue From 6201e5e2493b0f9addba57f60d6ddb88e572b858 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Mon, 4 Nov 2013 09:41:04 -0800 Subject: [PATCH 24/25] Refactor ShuffleBlockManager to reduce public interface - ShuffleBlocks has been removed and replaced by ShuffleWriterGroup. - ShuffleWriterGroup no longer contains a reference to a ShuffleFileGroup. 
- ShuffleFile has been removed and its contents are now within ShuffleFileGroup. - ShuffleBlockManager.forShuffle has been replaced by a more stateful forMapTask. --- .../spark/scheduler/ShuffleMapTask.scala | 21 +- .../spark/storage/ShuffleBlockManager.scala | 268 +++++++----------- .../spark/storage/StoragePerfTester.scala | 10 +- 3 files changed, 122 insertions(+), 177 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index c502f8f91a..1dc71a0428 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -146,27 +146,26 @@ private[spark] class ShuffleMapTask( metrics = Some(context.taskMetrics) val blockManager = SparkEnv.get.blockManager - var shuffle: ShuffleBlocks = null - var buckets: ShuffleWriterGroup = null + val shuffleBlockManager = blockManager.shuffleBlockManager + var shuffle: ShuffleWriterGroup = null var success = false try { // Obtain all the block writers for shuffle blocks. val ser = SparkEnv.get.serializerManager.get(dep.serializerClass) - shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser) - buckets = shuffle.acquireWriters(partitionId) + shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser) // Write the map output to its associated buckets. for (elem <- rdd.iterator(split, context)) { val pair = elem.asInstanceOf[Product2[Any, Any]] val bucketId = dep.partitioner.getPartition(pair._1) - buckets.writers(bucketId).write(pair) + shuffle.writers(bucketId).write(pair) } // Commit the writes. Get the size of each bucket block (total block size). var totalBytes = 0L var totalTime = 0L - val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter => + val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter => writer.commit() val size = writer.fileSegment().length totalBytes += size @@ -185,15 +184,15 @@ private[spark] class ShuffleMapTask( } catch { case e: Exception => // If there is an exception from running the task, revert the partial writes // and throw the exception upstream to Spark. - if (buckets != null) { - buckets.writers.foreach(_.revertPartialWrites()) + if (shuffle != null) { + shuffle.writers.foreach(_.revertPartialWrites()) } throw e } finally { // Release the writers back to the shuffle block manager. - if (shuffle != null && buckets != null) { - buckets.writers.foreach(_.close()) - shuffle.releaseWriters(buckets, success) + if (shuffle != null && shuffle.writers != null) { + shuffle.writers.foreach(_.close()) + shuffle.releaseWriters(success) } // Execute the callbacks on task completion. 
context.executeOnCompleteCallbacks() diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index a3bb425208..6346db3894 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -18,31 +18,23 @@ package org.apache.spark.storage import java.io.File -import java.util import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConversions._ -import scala.collection.mutable import org.apache.spark.Logging import org.apache.spark.serializer.Serializer import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap} import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} +import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup -private[spark] -class ShuffleWriterGroup( - val mapId: Int, - val fileGroup: ShuffleFileGroup, - val writers: Array[BlockObjectWriter]) - -private[spark] -trait ShuffleBlocks { - /** Get a group of writers for this map task. */ - def acquireWriters(mapId: Int): ShuffleWriterGroup +/** A group of writers for a ShuffleMapTask, one writer per reducer. */ +private[spark] trait ShuffleWriterGroup { + val writers: Array[BlockObjectWriter] /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */ - def releaseWriters(group: ShuffleWriterGroup, success: Boolean) + def releaseWriters(success: Boolean) } /** @@ -50,9 +42,9 @@ trait ShuffleBlocks { * per reducer (this set of files is called a ShuffleFileGroup). * * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle - * blocks are aggregated into the same file. There is one "combined shuffle file" (ShuffleFile) per - * reducer per concurrently executing shuffle task. As soon as a task finishes writing to its - * shuffle files, it releases them for another task. + * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer + * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle + * files, it releases them for another task. * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple: * - shuffleId: The unique id given to the entire shuffle stage. * - bucketId: The id of the output partition (i.e., reducer id) @@ -62,10 +54,9 @@ trait ShuffleBlocks { * that specifies where in a given file the actual block data is located. * * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping - * ShuffleBlockIds directly to FileSegments, each ShuffleFile maintains a list of offsets for each - * block stored in that file. In order to find the location of a shuffle block, we search all - * ShuffleFiles destined for the block's reducer. - * + * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for + * each block stored in each file. In order to find the location of a shuffle block, we search the + * files within a ShuffleFileGroups associated with the block's reducer. 
*/ private[spark] class ShuffleBlockManager(blockManager: BlockManager) extends Logging { @@ -74,102 +65,74 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { val consolidateShuffleFiles = System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean - /** - * Contains a pool of unused ShuffleFileGroups. - * One group is needed per concurrent thread (mapper) operating on the same shuffle. - */ - private class ShuffleFileGroupPool { - private val nextFileId = new AtomicInteger(0) - private val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() + private val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 - def getNextFileId() = nextFileId.getAndIncrement() - def getUnusedFileGroup() = unusedFileGroups.poll() - def returnFileGroup(group: ShuffleFileGroup) = unusedFileGroups.add(group) + /** + * Contains all the state related to a particular shuffle. This includes a pool of unused + * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle. + */ + private class ShuffleState() { + val nextFileId = new AtomicInteger(0) + val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() + val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]() } type ShuffleId = Int - private val shuffleToFileGroupPoolMap = new TimeStampedHashMap[ShuffleId, ShuffleFileGroupPool] - - /** - * Maps reducers (of a particular shuffle) to the set of files that have blocks destined for them. - * Each reducer will have one ShuffleFile per concurrent thread that executed during mapping. - */ - private val shuffleToReducerToFilesMap = - new TimeStampedHashMap[ShuffleId, Array[ConcurrentLinkedQueue[ShuffleFile]]] + private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState] private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup) - def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = { - initializeShuffleMetadata(shuffleId, numBuckets) + def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = + new ShuffleWriterGroup { + shuffleStates.putIfAbsent(shuffleId, new ShuffleState()) + private val shuffleState = shuffleStates(shuffleId) + private var fileGroup: ShuffleFileGroup = null - new ShuffleBlocks { - override def acquireWriters(mapId: Int): ShuffleWriterGroup = { - val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 - var fileGroup: ShuffleFileGroup = null - val writers = if (consolidateShuffleFiles) { - fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets) - Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => - val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - blockManager.getDiskWriter(blockId, fileGroup(bucketId).file, serializer, bufferSize) - } - } else { - Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => - val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - val blockFile = blockManager.diskBlockManager.getFile(blockId) - blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize) - } + val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) { + fileGroup = getUnusedFileGroup() + Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => + val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) + blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize) + } + } else { + Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => + val 
blockId = ShuffleBlockId(shuffleId, mapId, bucketId) + val blockFile = blockManager.diskBlockManager.getFile(blockId) + blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize) } - new ShuffleWriterGroup(mapId, fileGroup, writers) } - override def releaseWriters(group: ShuffleWriterGroup, success: Boolean) { + override def releaseWriters(success: Boolean) { if (consolidateShuffleFiles) { - val fileGroup = group.fileGroup if (success) { - fileGroup.addMapper(group.mapId) - for ((writer, shuffleFile) <- group.writers.zip(fileGroup.files)) { - shuffleFile.recordMapOutput(writer.fileSegment().offset) - } + val offsets = writers.map(_.fileSegment().offset) + fileGroup.recordMapOutput(mapId, offsets) } - recycleFileGroup(shuffleId, fileGroup) + recycleFileGroup(fileGroup) } } - } - } - private def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) { - val prev = shuffleToFileGroupPoolMap.putIfAbsent(shuffleId, new ShuffleFileGroupPool()) - if (!prev.isDefined) { - val reducerToFilesMap = new Array[ConcurrentLinkedQueue[ShuffleFile]](numBuckets) - for (reducerId <- 0 until numBuckets) { - reducerToFilesMap(reducerId) = new ConcurrentLinkedQueue[ShuffleFile]() + private def getUnusedFileGroup(): ShuffleFileGroup = { + val fileGroup = shuffleState.unusedFileGroups.poll() + if (fileGroup != null) fileGroup else newFileGroup() } - shuffleToReducerToFilesMap.put(shuffleId, reducerToFilesMap) - } - } - private def getUnusedFileGroup(shuffleId: Int, mapId: Int, numBuckets: Int): ShuffleFileGroup = { - val pool = shuffleToFileGroupPoolMap(shuffleId) - val fileGroup = pool.getUnusedFileGroup() - if (fileGroup == null) { - val fileId = pool.getNextFileId() - val files = Array.tabulate[ShuffleFile](numBuckets) { bucketId => - val filename = physicalFileName(shuffleId, bucketId, fileId) - val file = blockManager.diskBlockManager.getFile(filename) - val shuffleFile = new ShuffleFile(file) - shuffleToReducerToFilesMap(shuffleId)(bucketId).add(shuffleFile) - shuffleFile + private def newFileGroup(): ShuffleFileGroup = { + val fileId = shuffleState.nextFileId.getAndIncrement() + val files = Array.tabulate[File](numBuckets) { bucketId => + val filename = physicalFileName(shuffleId, bucketId, fileId) + blockManager.diskBlockManager.getFile(filename) + } + val fileGroup = new ShuffleFileGroup(fileId, shuffleId, files) + shuffleState.allFileGroups.add(fileGroup) + fileGroup } - new ShuffleFileGroup(shuffleId, fileId, files) - } else { - fileGroup - } - } - private def recycleFileGroup(shuffleId: Int, fileGroup: ShuffleFileGroup) { - shuffleToFileGroupPoolMap(shuffleId).returnFileGroup(fileGroup) - } + private def recycleFileGroup(group: ShuffleFileGroup) { + shuffleState.unusedFileGroups.add(group) + } + } /** * Returns the physical file segment in which the given BlockId is located. @@ -177,13 +140,12 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { * an error condition if we don't find the expected block. */ def getBlockLocation(id: ShuffleBlockId): FileSegment = { - // Search all files associated with the given reducer. - val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId) - for (file <- filesForReducer) { - val segment = file.getFileSegmentFor(id.mapId) - if (segment != None) { return segment.get } + // Search all file groups associated with this shuffle. 
+ val shuffleState = shuffleStates(id.shuffleId) + for (fileGroup <- shuffleState.allFileGroups) { + val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId) + if (segment.isDefined) { return segment.get } } - throw new IllegalStateException("Failed to find shuffle block: " + id) } @@ -192,78 +154,62 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { } private def cleanup(cleanupTime: Long) { - shuffleToFileGroupPoolMap.clearOldValues(cleanupTime) - shuffleToReducerToFilesMap.clearOldValues(cleanupTime) + shuffleStates.clearOldValues(cleanupTime) } } -/** - * A group of shuffle files, one per reducer. - * A particular mapper will be assigned a single ShuffleFileGroup to write its output to. - */ private[spark] -class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) { +object ShuffleBlockManager { /** - * Stores the absolute index of each mapId in the files of this group. For instance, - * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0. + * A group of shuffle files, one per reducer. + * A particular mapper will be assigned a single ShuffleFileGroup to write its output to. */ - private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]() + private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) { + /** + * Stores the absolute index of each mapId in the files of this group. For instance, + * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0. + */ + private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]() - files.foreach(_.setShuffleFileGroup(this)) + /** + * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file. + * This ordering allows us to compute block lengths by examining the following block offset. + * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every + * reducer. + */ + private val blockOffsetsByReducer = Array.tabulate[PrimitiveVector[Long]](files.length) { _ => + new PrimitiveVector[Long]() + } - def apply(bucketId: Int) = files(bucketId) + def numBlocks = mapIdToIndex.size - def addMapper(mapId: Int) { - mapIdToIndex(mapId) = mapIdToIndex.size - } + def apply(bucketId: Int) = files(bucketId) - def indexOf(mapId: Int): Int = mapIdToIndex.getOrElse(mapId, -1) -} + def recordMapOutput(mapId: Int, offsets: Array[Long]) { + mapIdToIndex(mapId) = numBlocks + for (i <- 0 until offsets.length) { + blockOffsetsByReducer(i) += offsets(i) + } + } -/** - * A single, consolidated shuffle file that may contain many actual blocks. All blocks are destined - * to the same reducer. - */ -private[spark] -class ShuffleFile(val file: File) { - /** - * Consecutive offsets of blocks into the file, ordered by position in the file. - * This ordering allows us to compute block lengths by examining the following block offset. - * Note: shuffleFileGroup.indexOf(mapId) returns the index of the mapper into this array. - */ - private val blockOffsets = new PrimitiveVector[Long]() - - /** Back pointer to whichever ShuffleFileGroup this file is a part of. */ - private var shuffleFileGroup : ShuffleFileGroup = _ - - // Required due to circular dependency between ShuffleFileGroup and ShuffleFile. 
- def setShuffleFileGroup(group: ShuffleFileGroup) { - assert(shuffleFileGroup == null) - shuffleFileGroup = group - } - - def recordMapOutput(offset: Long) { - blockOffsets += offset - } - - /** - * Returns the FileSegment associated with the given map task, or - * None if this ShuffleFile does not have an entry for it. - */ - def getFileSegmentFor(mapId: Int): Option[FileSegment] = { - val index = shuffleFileGroup.indexOf(mapId) - if (index >= 0) { - val offset = blockOffsets(index) - val length = - if (index + 1 < blockOffsets.length) { - blockOffsets(index + 1) - offset - } else { - file.length() - offset - } - assert(length >= 0) - Some(new FileSegment(file, offset, length)) - } else { - None + /** Returns the FileSegment associated with the given map task, or None if no entry exists. */ + def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = { + val file = files(reducerId) + val blockOffsets = blockOffsetsByReducer(reducerId) + val index = mapIdToIndex.getOrElse(mapId, -1) + if (index >= 0) { + val offset = blockOffsets(index) + val length = + if (index + 1 < numBlocks) { + blockOffsets(index + 1) - offset + } else { + file.length() - offset + } + assert(length >= 0) + Some(new FileSegment(file, offset, length)) + } else { + None + } } } } diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala index 021f6f6688..1e4db4f66b 100644 --- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala +++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala @@ -38,19 +38,19 @@ object StoragePerfTester { val blockManager = sc.env.blockManager def writeOutputBytes(mapId: Int, total: AtomicLong) = { - val shuffle = blockManager.shuffleBlockManager.forShuffle(1, numOutputSplits, + val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, new KryoSerializer()) - val buckets = shuffle.acquireWriters(mapId) + val writers = shuffle.writers for (i <- 1 to recordsPerMap) { - buckets.writers(i % numOutputSplits).write(writeData) + writers(i % numOutputSplits).write(writeData) } - buckets.writers.map {w => + writers.map {w => w.commit() total.addAndGet(w.fileSegment().length) w.close() } - shuffle.releaseWriters(buckets, true) + shuffle.releaseWriters(true) } val start = System.currentTimeMillis() From 1ba11b1c6aeda084cb158262ec0aa37a7b70fe32 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Mon, 4 Nov 2013 17:16:20 -0800 Subject: [PATCH 25/25] Minor cleanup in ShuffleBlockManager --- .../org/apache/spark/storage/ShuffleBlockManager.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 6346db3894..2f1b049ce4 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConversions._ -import org.apache.spark.Logging import org.apache.spark.serializer.Serializer import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap} import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} @@ -59,7 +58,7 @@ private[spark] trait ShuffleWriterGroup { * files within a ShuffleFileGroups associated with the block's reducer. 
*/ private[spark] -class ShuffleBlockManager(blockManager: BlockManager) extends Logging { +class ShuffleBlockManager(blockManager: BlockManager) { // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. // TODO: Remove this once the shuffle file consolidation feature is stable. val consolidateShuffleFiles = @@ -83,7 +82,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup) - def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = + def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleWriterGroup { shuffleStates.putIfAbsent(shuffleId, new ShuffleState()) private val shuffleState = shuffleStates(shuffleId) @@ -133,6 +132,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { shuffleState.unusedFileGroups.add(group) } } + } /** * Returns the physical file segment in which the given BlockId is located. @@ -177,7 +177,7 @@ object ShuffleBlockManager { * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every * reducer. */ - private val blockOffsetsByReducer = Array.tabulate[PrimitiveVector[Long]](files.length) { _ => + private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) { new PrimitiveVector[Long]() }
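For readers skimming the patch without the surrounding source, here is a minimal, self-contained Scala sketch of the file-group pooling that the new forMapTask path relies on: a map task polls an unused group (or mints a new file id), and releaseWriters returns the group so the next task running on that core appends to the same per-reducer files. GroupPool and FileGroupId are invented names for illustration only; the real ShuffleBlockManager keeps the equivalent state (nextFileId, unusedFileGroups, allFileGroups) per shuffle in a ShuffleState.

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Illustrative stand-in for a ShuffleFileGroup: only its identity matters here.
final case class FileGroupId(id: Int)

class GroupPool {
  private val nextFileId = new AtomicInteger(0)
  private val unusedGroups = new ConcurrentLinkedQueue[FileGroupId]()

  // Reuse a released group if one exists, otherwise mint a new file id.
  def acquire(): FileGroupId = {
    val existing = unusedGroups.poll()
    if (existing != null) existing else FileGroupId(nextFileId.getAndIncrement())
  }

  // Called from releaseWriters: the group becomes available to the next map task.
  def release(group: FileGroupId): Unit = unusedGroups.add(group)
}

object GroupPoolDemo extends App {
  val pool = new GroupPool
  val g0 = pool.acquire()       // first map task gets file group 0
  val g1 = pool.acquire()       // a concurrently running task gets file group 1
  pool.release(g0)              // the first task finishes and releases its group
  println(pool.acquire() == g0) // true: the next task reuses group 0 rather than creating group 2
}

The upshot is that the number of physical files per reducer is bounded by the number of concurrently executing map tasks, not by the total number of map tasks.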
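The offset bookkeeping that replaces ShuffleFile can be modelled just as compactly. Each file group keeps one vector of block offsets per reducer file, ordered by write position, and a block's length falls out of the next block's offset (or the end of the file for the last block), so no lengths need to be stored. FileGroupModel, SegmentInfo, and the sample sizes below are illustrative stand-ins built on plain Scala collections, not Spark APIs.

import scala.collection.mutable

final case class SegmentInfo(reducerId: Int, offset: Long, length: Long)

class FileGroupModel(numReducers: Int, fileLengths: Array[Long]) {
  // Absolute index of each mapId within this group: the first recorded map task is index 0, etc.
  private val mapIdToIndex = mutable.Map[Int, Int]()
  // One growable vector of block offsets per reducer file, ordered by position in the file.
  private val blockOffsetsByReducer = Array.fill(numReducers)(mutable.ArrayBuffer[Long]())

  def numBlocks: Int = mapIdToIndex.size

  // Record where each reducer's block for mapId starts in its consolidated file.
  def recordMapOutput(mapId: Int, offsets: Array[Long]): Unit = {
    mapIdToIndex(mapId) = numBlocks
    for (i <- offsets.indices) blockOffsetsByReducer(i) += offsets(i)
  }

  // A block's length is the gap to the next offset, or to the end of the file for the last block.
  def segmentFor(mapId: Int, reducerId: Int): Option[SegmentInfo] =
    mapIdToIndex.get(mapId).map { index =>
      val offsets = blockOffsetsByReducer(reducerId)
      val offset = offsets(index)
      val length =
        if (index + 1 < offsets.length) offsets(index + 1) - offset
        else fileLengths(reducerId) - offset
      SegmentInfo(reducerId, offset, length)
    }
}

object FileGroupModelDemo extends App {
  val group = new FileGroupModel(numReducers = 2, fileLengths = Array(30L, 50L))
  group.recordMapOutput(mapId = 5, offsets = Array(0L, 0L))   // first map task in this group
  group.recordMapOutput(mapId = 9, offsets = Array(10L, 25L)) // second map task appends after it
  println(group.segmentFor(mapId = 5, reducerId = 1)) // Some(SegmentInfo(1,0,25))  -- length from the next offset
  println(group.segmentFor(mapId = 9, reducerId = 1)) // Some(SegmentInfo(1,25,25)) -- length from the file end
}

Storing only consecutive offsets (rather than offset/length pairs per block) is what makes the metadata cheap enough to keep for every consolidated file, which is the space argument the class comment above makes.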