Reversing the argument order in zipPartitions to enable stronger type inference.

This commit is contained in:
Joseph E. Gonzalez 2013-08-15 17:14:58 -07:00
parent 1a13460cb0
commit 53b2639a1e
4 changed files with 14 additions and 17 deletions

View file

@ -515,22 +515,19 @@ abstract class RDD[T: ClassManifest](
* *same number of partitions*, but does *not* require them to have the same number
* of elements in each partition.
*/
def zipPartitions[B: ClassManifest, V: ClassManifest](
f: (Iterator[T], Iterator[B]) => Iterator[V],
rdd2: RDD[B]): RDD[V] =
def zipPartitions[B: ClassManifest, V: ClassManifest]
(rdd2: RDD[B])
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
rdd2: RDD[B],
rdd3: RDD[C]): RDD[V] =
def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
(rdd2: RDD[B], rdd3: RDD[C])
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
rdd2: RDD[B],
rdd3: RDD[C],
rdd4: RDD[D]): RDD[V] =
def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)

View file

@ -207,12 +207,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* of elements in each partition.
*/
def zipPartitions[U, V](
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V],
other: JavaRDDLike[U, _]): JavaRDD[V] = {
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
f.apply(asJavaIterator(x), asJavaIterator(y)).iterator())
JavaRDD.fromRDD(
rdd.zipPartitions(fn, other.rdd)(other.classManifest, f.elementType()))(f.elementType())
rdd.zipPartitions(other.rdd)(fn)(other.classManifest, f.elementType()))(f.elementType())
}
// Actions (launch a job to return a value to the user program)

View file

@ -748,7 +748,7 @@ public class JavaAPISuite implements Serializable {
}
};
JavaRDD<Integer> sizes = rdd1.zipPartitions(sizesFn, rdd2);
JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
}

View file

@ -40,7 +40,7 @@ class ZippedPartitionsSuite extends FunSuite with SharedSparkContext {
val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
val zippedRDD = data1.zipPartitions(ZippedPartitionsSuite.procZippedData, data2, data3)
val zippedRDD = data1.zipPartitions(data2, data3)(ZippedPartitionsSuite.procZippedData)
val obtainedSizes = zippedRDD.collect()
val expectedSizes = Array(2, 3, 1, 2, 3, 1)