From 80fc8c82ed3d7c16aa722bdf8ba60e1f2a763c29 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Sat, 16 Mar 2013 12:16:29 -0700 Subject: [PATCH] _With[Matei] --- core/src/main/scala/spark/RDD.scala | 34 ++++++++++++------------ core/src/test/scala/spark/RDDSuite.scala | 6 ++--- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index b6defdd418..96d5c0b80c 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -369,25 +369,25 @@ abstract class RDD[T: ClassManifest]( * additional parameter is produced by constructorOfA, which is called in each * partition with the index of that partition. */ - def mapWith[A: ClassManifest, U: ClassManifest](constructorOfA: Int => A, preservesPartitioning: Boolean = false) - (f:(A, T) => U): RDD[U] = { + def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false) + (f:(T, A) => U): RDD[U] = { def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { - val a = constructorOfA(index) - iter.map(t => f(a, t)) + val a = constructA(index) + iter.map(t => f(t, a)) } new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) } - /** + /** * FlatMaps f over this RDD, where f takes an additional parameter of type A. This * additional parameter is produced by constructorOfA, which is called in each * partition with the index of that partition. */ - def flatMapWith[A: ClassManifest, U: ClassManifest](constructorOfA: Int => A, preservesPartitioning: Boolean = false) - (f:(A, T) => Seq[U]): RDD[U] = { + def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false) + (f:(T, A) => Seq[U]): RDD[U] = { def iterF(index: Int, iter: Iterator[T]): Iterator[U] = { - val a = constructorOfA(index) - iter.flatMap(t => f(a, t)) + val a = constructA(index) + iter.flatMap(t => f(t, a)) } new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning) } @@ -397,11 +397,11 @@ abstract class RDD[T: ClassManifest]( * This additional parameter is produced by constructorOfA, which is called in each * partition with the index of that partition. */ - def foreachWith[A: ClassManifest](constructorOfA: Int => A) - (f:(A, T) => Unit) { + def foreachWith[A: ClassManifest](constructA: Int => A) + (f:(T, A) => Unit) { def iterF(index: Int, iter: Iterator[T]): Iterator[T] = { - val a = constructorOfA(index) - iter.map(t => {f(a, t); t}) + val a = constructA(index) + iter.map(t => {f(t, a); t}) } (new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {}) } @@ -411,11 +411,11 @@ abstract class RDD[T: ClassManifest]( * additional parameter is produced by constructorOfA, which is called in each * partition with the index of that partition. */ - def filterWith[A: ClassManifest](constructorOfA: Int => A) - (p:(A, T) => Boolean): RDD[T] = { + def filterWith[A: ClassManifest](constructA: Int => A) + (p:(T, A) => Boolean): RDD[T] = { def iterF(index: Int, iter: Iterator[T]): Iterator[T] = { - val a = constructorOfA(index) - iter.filter(t => p(a, t)) + val a = constructA(index) + iter.filter(t => p(t, a)) } new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true) } diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 3d925798b7..33281d3c82 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -185,7 +185,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) val randoms = ones.mapWith( (index: Int) => new Random(index + 42)) - {(prng: Random, t: Int) => prng.nextDouble * t}.collect() + {(t: Int, prng: Random) => prng.nextDouble * t}.collect() val prn42_3 = { val prng42 = new Random(42) prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble() @@ -204,7 +204,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) val randoms = ones.flatMapWith( (index: Int) => new Random(index + 42)) - {(prng: Random, t: Int) => + {(t: Int, prng: Random) => val random = prng.nextDouble() Seq(random * t, random * t * 10)}. collect() @@ -226,7 +226,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2) val sample = ints.filterWith( (index: Int) => new Random(index + 42)) - {(prng: Random, t: Int) => prng.nextInt(3) == 0}. + {(t: Int, prng: Random) => prng.nextInt(3) == 0}. collect() val checkSample = { val prng42 = new Random(42)