From bfcddf4700023f53d5eed92ef8ef75c072af3ced Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 14 Sep 2013 15:53:42 -0700 Subject: [PATCH] Make mapPartitionsWithIndex work with JavaRDD's --- .../main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 9ad175ec19..264c4bc3de 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -71,9 +71,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. */ - def mapPartitionsWithIndex(f: JFunction2[Int, T, R], + def mapPartitionsWithIndex[R: ClassManifest](f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]], preservesPartitioning: Boolean = false): JavaRDD[R] = - new JavaRDD(MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)) + new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))), + preservesPartitioning)) /** * Return a new RDD by applying a function to all elements of this RDD.