Removed the mapSideCombine option in CoGroupedRDD.

Author: Reynold Xin
Date:   2013-08-17 21:07:34 -07:00
parent  5d050a3e1f
commit  10af952a3d
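The diff below deletes the private CoGroupAggregator and every mapSideCombine code path. As a minimal, standalone sketch of what that aggregator's three closures did, here they are applied to an in-memory Seq rather than real shuffle data; the object name and sample records are illustrative only, not part of this commit:

    import scala.collection.mutable.ArrayBuffer

    // The three closures of the CoGroupAggregator removed below: create a one-element
    // buffer, append a value to an existing buffer, and concatenate two buffers.
    object MapSideCombineSketch {
      val createCombiner: Any => ArrayBuffer[Any] = x => ArrayBuffer(x)
      val mergeValue: (ArrayBuffer[Any], Any) => ArrayBuffer[Any] = (b, x) => b += x
      val mergeCombiners: (ArrayBuffer[Any], ArrayBuffer[Any]) => ArrayBuffer[Any] = (b1, b2) => b1 ++ b2

      def main(args: Array[String]): Unit = {
        // With mapSideCombine on, each map task collapsed its records per key like this
        // before writing shuffle output.
        val records = Seq(("k1", 1), ("k1", 2), ("k2", 3))
        val combined = records.foldLeft(Map.empty[String, ArrayBuffer[Any]]) {
          case (acc, (k, v)) => acc.get(k) match {
            case Some(buf) => acc.updated(k, mergeValue(buf, v))
            case None      => acc.updated(k, createCombiner(v))
          }
        }
        println(combined) // Map(k1 -> ArrayBuffer(1, 2), k2 -> ArrayBuffer(3))

        // mergeCombiners simply concatenates two partial buffers for the same key.
        println(mergeCombiners(ArrayBuffer(1, 2), ArrayBuffer(3))) // ArrayBuffer(1, 2, 3)
      }
    }

The doc comment deleted below already restricted the payoff of this extra map-side pass to a narrow case (very few distinct keys, keys tiny relative to their values); removing the flag drops that pass entirely.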


@@ -23,7 +23,7 @@ import java.util.{HashMap => JHashMap}
 import scala.collection.JavaConversions
 import scala.collection.mutable.ArrayBuffer
-import spark.{Aggregator, Partition, Partitioner, RDD, SparkEnv, TaskContext}
+import spark.{Partition, Partitioner, RDD, SparkEnv, TaskContext}
 import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@@ -52,13 +52,6 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
   override def hashCode(): Int = idx
 }
 
-private[spark] class CoGroupAggregator
-  extends Aggregator[Any, Any, ArrayBuffer[Any]](
-    { x => ArrayBuffer(x) },
-    { (b, x) => b += x },
-    { (b1, b2) => b1 ++ b2 })
-  with Serializable
-
 /**
  * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
@@ -66,37 +59,24 @@ private[spark] class CoGroupAggregator
  *
  * @param rdds parent RDDs.
  * @param part partitioner used to partition the shuffle output.
- * @param mapSideCombine flag indicating whether to merge values before shuffle step. If the flag
- *                       is on, Spark does an extra pass over the data on the map side to merge
- *                       all values belonging to the same key together. This can reduce the amount
- *                       of data shuffled if and only if the number of distinct keys is very small,
- *                       and the ratio of key size to value size is also very small.
  */
 class CoGroupedRDD[K](
     @transient var rdds: Seq[RDD[(K, _)]],
     part: Partitioner,
-    val mapSideCombine: Boolean = false,
     val serializerClass: String = null)
   extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
 
-  private val aggr = new CoGroupAggregator
-
   override def getDependencies: Seq[Dependency[_]] = {
-    rdds.map { rdd =>
+    rdds.map { rdd: RDD[(K, _)] =>
       if (rdd.partitioner == Some(part)) {
         logInfo("Adding one-to-one dependency with " + rdd)
         new OneToOneDependency(rdd)
       } else {
         logInfo("Adding shuffle dependency with " + rdd)
-        if (mapSideCombine) {
-          val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
-          new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part, serializerClass)
-        } else {
-          new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass)
-        }
+        new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass)
       }
     }
   }
 
   override def getPartitions: Array[Partition] = {
     val array = new Array[Partition](part.numPartitions)
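For reference, a hypothetical caller of the slimmed-down constructor. The package path spark.rdd.CoGroupedRDD, the local master string, the object name, and the data are assumptions for illustration, not part of this commit; the asInstanceOf casts are only there to satisfy the invariant RDD[(K, _)] element type of the constructor:

    import spark.{HashPartitioner, RDD, SparkContext}
    import spark.rdd.CoGroupedRDD

    // A hypothetical caller after this commit.
    object CoGroupedRDDCaller {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "cogrouped-rdd-sketch")
        val ages   = sc.parallelize(Seq(("alice", 30), ("bob", 25)))
        val cities = sc.parallelize(Seq(("alice", "NYC"), ("carol", "SF")))
        val part   = new HashPartitioner(2)

        // Before: new CoGroupedRDD[String](parents, part, mapSideCombine = true, serializerClass)
        // After:  the flag is gone; any parent not already partitioned by `part` gets a
        //         plain ShuffleDependency over raw (key, value) pairs.
        val parents = Seq(
          ages.asInstanceOf[RDD[(String, _)]],
          cities.asInstanceOf[RDD[(String, _)]])
        val cogrouped = new CoGroupedRDD[String](parents, part)

        // One entry per key, with one Seq of values per parent RDD.
        cogrouped.collect().foreach(println)
        sc.stop()
      }
    }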
@@ -145,19 +125,11 @@ class CoGroupedRDD[K](
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
         val fetcher = SparkEnv.get.shuffleFetcher
-        if (mapSideCombine) {
-          // With map side combine on, for each key, the shuffle fetcher returns a list of values.
-          fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics, ser).foreach {
-            case (key, values) => getSeq(key)(depNum) ++= values
-          }
-        } else {
-          // With map side combine off, for each key the shuffle fetcher returns a single value.
-          fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach {
-            case (key, value) => getSeq(key)(depNum) += value
-          }
-        }
+        fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach {
+          case (key, value) => getSeq(key)(depNum) += value
+        }
       }
     }
     JavaConversions.mapAsScalaMap(map).iterator
   }
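A standalone sketch of the reduce-side path that survives this commit: every fetched record is a single raw (key, value) pair appended to the per-parent buffer for its key, so the fetcher never returns pre-combined Seq[Any] lists any more. The getSeq helper here is a simplified stand-in for the one in compute, and the "fetched" sequences are plain Seqs rather than a real shuffle fetcher:

    import java.util.{HashMap => JHashMap}
    import scala.collection.JavaConversions
    import scala.collection.mutable.ArrayBuffer

    // Simplified stand-in for the reduce side of CoGroupedRDD.compute after this commit.
    object ReduceSideAccumulationSketch {
      def main(args: Array[String]): Unit = {
        val numRdds = 2
        val map = new JHashMap[String, Seq[ArrayBuffer[Any]]]

        // Simplified version of the getSeq helper used in compute: one buffer per parent RDD.
        def getSeq(k: String): Seq[ArrayBuffer[Any]] = {
          val existing = map.get(k)
          if (existing != null) existing
          else {
            val created = Seq.fill(numRdds)(new ArrayBuffer[Any])
            map.put(k, created)
            created
          }
        }

        // Stand-ins for fetcher.fetch[K, Any](...) output from two shuffle dependencies.
        val fetchedFromDep0 = Seq(("alice", 30), ("bob", 25))
        val fetchedFromDep1 = Seq(("alice", "NYC"), ("carol", "SF"))
        fetchedFromDep0.foreach { case (key, value) => getSeq(key)(0) += value }
        fetchedFromDep1.foreach { case (key, value) => getSeq(key)(1) += value }

        // Same shape the real compute returns: (key, Seq of per-parent value buffers).
        JavaConversions.mapAsScalaMap(map).foreach(println)
      }
    }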