From 47507d69d98b47685d667d7ec52dcc4116465eb1 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 28 Aug 2012 22:40:00 -0700 Subject: [PATCH 1/4] Made region used by spark-ec2 configurable. --- ec2/spark_ec2.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 931e4068de..ddd3d5aa6d 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -59,7 +59,9 @@ def parse_args(): "WARNING: must be 64-bit; small instances won't work") parser.add_option("-m", "--master-instance-type", default="", help="Master instance type (leave empty for same as instance-type)") - parser.add_option("-z", "--zone", default="us-east-1b", + parser.add_option("-r", "--region", default="us-east-1", + help="EC2 region zone to launch instances in") + parser.add_option("-z", "--zone", default="", help="Availability zone to launch instances in") parser.add_option("-a", "--ami", default="latest", help="Amazon Machine Image ID to use, or 'latest' to use latest " + @@ -470,7 +472,7 @@ def ssh(host, opts, command): def main(): (opts, action, cluster_name) = parse_args() - conn = boto.connect_ec2() + conn = boto.ec2.connect_to_region(opts.region) # Select an AZ at random if it was not specified. if opts.zone == "": From 940869dfdad5c785404e16f63681a96b885c749a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 29 Aug 2012 23:00:02 -0700 Subject: [PATCH 2/4] Disable running combiners on map tasks when mergeCombiners function is not specified by the user. --- core/src/main/scala/spark/ShuffledRDD.scala | 38 +++++++++++++----- .../spark/scheduler/ShuffleMapTask.scala | 39 +++++++++++++------ 2 files changed, 57 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala index 594dbd235f..8293048caa 100644 --- a/core/src/main/scala/spark/ShuffledRDD.scala +++ b/core/src/main/scala/spark/ShuffledRDD.scala @@ -27,16 +27,36 @@ class ShuffledRDD[K, V, C]( override def compute(split: Split): Iterator[(K, C)] = { val combiners = new JHashMap[K, C] - def mergePair(k: K, c: C) { - val oldC = combiners.get(k) - if (oldC == null) { - combiners.put(k, c) - } else { - combiners.put(k, aggregator.mergeCombiners(oldC, c)) - } - } val fetcher = SparkEnv.get.shuffleFetcher - fetcher.fetch[K, C](dep.shuffleId, split.index, mergePair) + + if (aggregator.mergeCombiners != null) { + // If mergeCombiners is specified, combiners are applied on the map + // partitions. In this case, post-shuffle we get a list of outputs from + // the combiners and merge them using mergeCombiners. + def mergePairWithMapSideCombiners(k: K, c: C) { + val oldC = combiners.get(k) + if (oldC == null) { + combiners.put(k, c) + } else { + combiners.put(k, aggregator.mergeCombiners(oldC, c)) + } + } + fetcher.fetch[K, C](dep.shuffleId, split.index, mergePairWithMapSideCombiners) + } else { + // If mergeCombiners is not specified, no combiner is applied on the map + // partitions (i.e. map side aggregation is turned off). Post-shuffle we + // get a list of values and we use mergeValue to merge them. 
+ def mergePairWithoutMapSideCombiners(k: K, v: V) { + val oldC = combiners.get(k) + if (oldC == null) { + combiners.put(k, aggregator.createCombiner(v)) + } else { + combiners.put(k, aggregator.mergeValue(oldC, v)) + } + } + fetcher.fetch[K, V](dep.shuffleId, split.index, mergePairWithoutMapSideCombiners) + } + return new Iterator[(K, C)] { var iter = combiners.entrySet().iterator() diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index e0e050d7c9..4828039bbd 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -104,27 +104,44 @@ class ShuffleMapTask( val numOutputSplits = dep.partitioner.numPartitions val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]] val partitioner = dep.partitioner - val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any]) - for (elem <- rdd.iterator(split)) { - val (k, v) = elem.asInstanceOf[(Any, Any)] - var bucketId = partitioner.getPartition(k) - val bucket = buckets(bucketId) - var existing = bucket.get(k) - if (existing == null) { - bucket.put(k, aggregator.createCombiner(v)) + + val bucketIterators = + if (aggregator.mergeCombiners != null) { + // Apply combiners (map-side aggregation) to the map output. + val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any]) + for (elem <- rdd.iterator(split)) { + val (k, v) = elem.asInstanceOf[(Any, Any)] + val bucketId = partitioner.getPartition(k) + val bucket = buckets(bucketId) + val existing = bucket.get(k) + if (existing == null) { + bucket.put(k, aggregator.createCombiner(v)) + } else { + bucket.put(k, aggregator.mergeValue(existing, v)) + } + } + buckets.map(_.iterator) } else { - bucket.put(k, aggregator.mergeValue(existing, v)) + // No combiners (no map-side aggregation). Simply partition the map output. + val buckets = Array.tabulate(numOutputSplits)(_ => new ArrayBuffer[(Any, Any)]) + for (elem <- rdd.iterator(split)) { + val pair = elem.asInstanceOf[(Any, Any)] + val bucketId = partitioner.getPartition(pair._1) + buckets(bucketId) += pair + } + buckets.map(_.iterator) } - } + val ser = SparkEnv.get.serializer.newInstance() val blockManager = SparkEnv.get.blockManager for (i <- 0 until numOutputSplits) { val blockId = "shuffleid_" + dep.shuffleId + "_" + partition + "_" + i // Get a scala iterator from java map - val iter: Iterator[(Any, Any)] = buckets(i).iterator + val iter: Iterator[(Any, Any)] = bucketIterators(i) // TODO: This should probably be DISK_ONLY blockManager.put(blockId, iter, StorageLevel.MEMORY_ONLY, false) } + return SparkEnv.get.blockManager.blockManagerId } From 5945bcdcc56a71324357b02c21bef80dd7efd13a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 29 Aug 2012 23:32:08 -0700 Subject: [PATCH 3/4] Added a new flag in Aggregator to indicate applying map side combiners. --- core/src/main/scala/spark/Aggregator.scala | 12 +++++++++++- core/src/main/scala/spark/ShuffledRDD.scala | 13 ++++++------- .../main/scala/spark/scheduler/ShuffleMapTask.scala | 2 +- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala index 6f99270b1e..6516bea157 100644 --- a/core/src/main/scala/spark/Aggregator.scala +++ b/core/src/main/scala/spark/Aggregator.scala @@ -1,7 +1,17 @@ package spark +/** A set of functions used to aggregate data. 
+ * + * @param createCombiner function to create the initial value of the aggregation. + * @param mergeValue function to merge a new value into the aggregation result. + * @param mergeCombiners function to merge outputs from multiple mergeValue function. + * @param mapSideCombine whether to apply combiners on map partitions, also + * known as map-side aggregations. When set to false, + * mergeCombiners function is not used. + */ class Aggregator[K, V, C] ( val createCombiner: V => C, val mergeValue: (C, V) => C, - val mergeCombiners: (C, C) => C) + val mergeCombiners: (C, C) => C, + val mapSideCombine: Boolean = true) extends Serializable diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala index 8293048caa..3616d8e47e 100644 --- a/core/src/main/scala/spark/ShuffledRDD.scala +++ b/core/src/main/scala/spark/ShuffledRDD.scala @@ -29,10 +29,9 @@ class ShuffledRDD[K, V, C]( val combiners = new JHashMap[K, C] val fetcher = SparkEnv.get.shuffleFetcher - if (aggregator.mergeCombiners != null) { - // If mergeCombiners is specified, combiners are applied on the map - // partitions. In this case, post-shuffle we get a list of outputs from - // the combiners and merge them using mergeCombiners. + if (aggregator.mapSideCombine) { + // Apply combiners on map partitions. In this case, post-shuffle we get a + // list of outputs from the combiners and merge them using mergeCombiners. def mergePairWithMapSideCombiners(k: K, c: C) { val oldC = combiners.get(k) if (oldC == null) { @@ -43,9 +42,9 @@ class ShuffledRDD[K, V, C]( } fetcher.fetch[K, C](dep.shuffleId, split.index, mergePairWithMapSideCombiners) } else { - // If mergeCombiners is not specified, no combiner is applied on the map - // partitions (i.e. map side aggregation is turned off). Post-shuffle we - // get a list of values and we use mergeValue to merge them. + // Do not apply combiners on map partitions (i.e. map side aggregation is + // turned off). Post-shuffle we get a list of values and we use mergeValue + // to merge them. def mergePairWithoutMapSideCombiners(k: K, v: V) { val oldC = combiners.get(k) if (oldC == null) { diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 940932cc51..a281ae94c5 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -108,7 +108,7 @@ class ShuffleMapTask( val partitioner = dep.partitioner val bucketIterators = - if (aggregator.mergeCombiners != null) { + if (aggregator.mapSideCombine) { // Apply combiners (map-side aggregation) to the map output. val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any]) for (elem <- rdd.iterator(split)) { From a8a2a08a1a7e652920702f25a89e43788d538d05 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 30 Aug 2012 12:34:28 -0700 Subject: [PATCH 4/4] Added a test for testing map-side combine on/off switch. 
--- core/src/test/scala/spark/ShuffleSuite.scala | 45 +++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 99d13b31ef..f622c413f7 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -2,6 +2,7 @@ package spark import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter +import org.scalatest.matchers.ShouldMatchers import org.scalatest.prop.Checkers import org.scalacheck.Arbitrary._ import org.scalacheck.Gen @@ -13,7 +14,7 @@ import scala.collection.mutable.ArrayBuffer import SparkContext._ -class ShuffleSuite extends FunSuite with BeforeAndAfter { +class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter { var sc: SparkContext = _ @@ -196,4 +197,46 @@ class ShuffleSuite extends FunSuite with BeforeAndAfter { // Test that a shuffle on the file works, because this used to be a bug assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) } + + test("map-side combine") { + sc = new SparkContext("local", "test") + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1), (1, 1)), 2) + + // Test with map-side combine on. + val sums = pairs.reduceByKey(_+_).collect() + assert(sums.toSet === Set((1, 8), (2, 1))) + + // Turn off map-side combine and test the results. + val aggregator = new Aggregator[Int, Int, Int]( + (v: Int) => v, + _+_, + _+_, + false) + val shuffledRdd = new ShuffledRDD( + pairs, aggregator, new HashPartitioner(2)) + assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1))) + + // Turn map-side combine off and pass a wrong mergeCombine function. Should + // not see an exception because mergeCombine should not have been called. + val aggregatorWithException = new Aggregator[Int, Int, Int]( + (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false) + val shuffledRdd1 = new ShuffledRDD( + pairs, aggregatorWithException, new HashPartitioner(2)) + assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1))) + + // Now run the same mergeCombine function with map-side combine on. We + // expect to see an exception thrown. + val aggregatorWithException1 = new Aggregator[Int, Int, Int]( + (v: Int) => v, _+_, ShuffleSuite.mergeCombineException) + val shuffledRdd2 = new ShuffledRDD( + pairs, aggregatorWithException1, new HashPartitioner(2)) + evaluating { shuffledRdd2.collect() } should produce [SparkException] + } +} + +object ShuffleSuite { + def mergeCombineException(x: Int, y: Int): Int = { + throw new SparkException("Exception for map-side combine.") + x + y + } }
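
Usage note on PATCH 1/4 (not part of the patch itself): the region becomes a launch-time option (-r/--region), and when -z/--zone is left empty the script now picks an availability zone in that region at random. A hedged sketch of an invocation; the key pair, identity file, slave count, and cluster name below are placeholders, and every flag other than -r/-z is the script's pre-existing interface, unchanged by this patch:

    # Launch in eu-west-1 instead of the default us-east-1; add -z eu-west-1a to pin a zone.
    ./spark-ec2 -k my-keypair -i ~/my-keypair.pem -s 2 -r eu-west-1 launch my-cluster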
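
Usage note on PATCH 3/4: with the new mapSideCombine flag, an application can ask ShuffledRDD to skip map-side aggregation entirely. A minimal sketch mirroring the ShuffleSuite test added in PATCH 4/4; it assumes `import spark._`, `import spark.SparkContext._`, and an already-created SparkContext named `sc` — everything else comes from the patched API above:

    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 2)

    // mergeCombiners is still required by the constructor, but with mapSideCombine = false
    // ShuffleMapTask only partitions the raw (k, v) pairs; the reduce side then builds
    // combiners with createCombiner/mergeValue, so mergeCombiners is never invoked.
    val aggregator = new Aggregator[Int, Int, Int](
      (v: Int) => v,                  // createCombiner
      (c: Int, v: Int) => c + v,      // mergeValue
      (c1: Int, c2: Int) => c1 + c2,  // mergeCombiners (unused when mapSideCombine = false)
      false)                          // mapSideCombine

    val sums = new ShuffledRDD(pairs, aggregator, new HashPartitioner(2)).collect()
    // sums.toSet == Set((1, 6), (2, 1))

Because mergeCombiners is never called when mapSideCombine is false, passing a deliberately broken mergeCombiners function still produces correct results — which is exactly what the "wrong mergeCombine function" assertion in the new test relies on.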