Remove unneeded zipWithIndex.

Also rename r->rdd and remove unneeded extra type info.
This commit is contained in:
Stephen Haberman 2013-02-05 21:26:44 -06:00
parent f2bc748013
commit f4d43cb43e

View file

@ -47,7 +47,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
@transient var deps_ = {
val deps = new ArrayBuffer[Dependency[_]]
for ((rdd, index) <- rdds.zipWithIndex) {
for (rdd <- rdds) {
if (rdd.partitioner == Some(part)) {
logInfo("Adding one-to-one dependency with " + rdd)
deps += new OneToOneDependency(rdd)
@ -65,12 +65,14 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
@transient var splits_ : Array[Split] = {
val array = new Array[Split](part.numPartitions)
for (i <- 0 until array.size) {
array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) =>
// Each CoGroupSplit will have a dependency per contributing RDD
array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (rdd, j) =>
// Assume each RDD contributed a single dependency, and get it
dependencies(j) match {
case s: ShuffleDependency[_, _] =>
new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep
new ShuffleCoGroupSplitDep(s.shuffleId)
case _ =>
new NarrowCoGroupSplitDep(r, i, r.splits(i)): CoGroupSplitDep
new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i))
}
}.toList)
}
@ -97,7 +99,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
}
}
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
for ((k, v) <- rdd.iterator(itsSplit, context)) {
getSeq(k.asInstanceOf[K])(depNum) += v