diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 590106388a..505ef76941 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -163,7 +163,7 @@ class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( extends RDD[U](prev.context) { override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) - override def compute(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator + override def compute(split: Split) = prev.iterator(split).flatMap(f) } class FilteredRDD[T: ClassManifest]( @@ -314,8 +314,8 @@ extends RDD[(K, U)](prev.context) { override val dependencies = List(new OneToOneDependency(prev)) override val partitioner = prev.partitioner override def compute(split: Split) = { - prev.iterator(split).toStream.flatMap { + prev.iterator(split).flatMap { case (k, v) => f(v).map(x => (k, x)) - }.iterator + } } }