Updated PruneDependency to change "split" to "partition".
This commit is contained in:
parent
eedc542a02
commit
c109f29c97
|
@ -64,14 +64,14 @@ class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
|
|||
|
||||
|
||||
/**
|
||||
* Represents a dependency between the SplitsPruningRDD and its parent. In this
|
||||
* case, the child RDD contains a subset of splits of the parents'.
|
||||
* Represents a dependency between the PartitionPruningRDD and its parent. In this
|
||||
* case, the child RDD contains a subset of partitions of the parents'.
|
||||
*/
|
||||
class PruneDependency[T](rdd: RDD[T], @transient splitsFilterFunc: Int => Boolean)
|
||||
class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
|
||||
extends NarrowDependency[T](rdd) {
|
||||
|
||||
@transient
|
||||
val splits: Array[Split] = rdd.splits.filter(s => splitsFilterFunc(s.index))
|
||||
val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index))
|
||||
|
||||
override def getParents(partitionId: Int) = List(splits(partitionId).index)
|
||||
override def getParents(partitionId: Int) = List(partitions(partitionId).index)
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ class PartitionPruningRDD[T: ClassManifest](
|
|||
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
|
||||
|
||||
@transient
|
||||
val partitions_ : Array[Split] = dependencies_.head.asInstanceOf[PruneDependency[T]].splits
|
||||
val partitions_ : Array[Split] = dependencies_.head.asInstanceOf[PruneDependency[T]].partitions
|
||||
|
||||
override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(split, context)
|
||||
|
||||
|
|
Loading…
Reference in a new issue