From aae8a014259b0ae71afd7052c8e22797cfebc82e Mon Sep 17 00:00:00 2001
From: eklavya
Date: Mon, 13 Jan 2014 17:53:35 +0530
Subject: [PATCH 1/8] Added setter method setGenerator to JavaRDD.

---
 core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 7d48ce01cf..6c91edaf5c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -133,6 +133,11 @@ JavaRDDLike[T, JavaRDD[T]] {
     rdd.setName(name)
     this
   }
+
+  /** Reset generator*/
+  def setGenerator(_generator: String) = {
+    rdd.generator = _generator
+  }
 }
 
 object JavaRDD {

From dbadc6b994ff54f86b726c71fa08837a6b1e7238 Mon Sep 17 00:00:00 2001
From: eklavya
Date: Mon, 13 Jan 2014 17:56:10 +0530
Subject: [PATCH 2/8] Added mapPartitions method to JavaRDD.

---
 .../scala/org/apache/spark/api/java/JavaRDD.scala | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 6c91edaf5c..568ae1575b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -21,8 +21,10 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.function.{Function => JFunction, FlatMapFunction => JFMap}
 import org.apache.spark.storage.StorageLevel
+import java.util.{Iterator => JIterator}
+import scala.collection.JavaConversions._
 
 class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
 JavaRDDLike[T, JavaRDD[T]] {
@@ -138,6 +140,15 @@ JavaRDDLike[T, JavaRDD[T]] {
   def setGenerator(_generator: String) = {
     rdd.generator = _generator
   }
+
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD.
+   */
+  def mapPartitions[U: ClassTag](
+    f: JFMap[JIterator[T], U], preservesPartitioning: Boolean = false): JavaRDD[U] = {
+    rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
+  }
+
 }
 
 object JavaRDD {

From 6a65feebc708b236625a7b5859981630206cf9d3 Mon Sep 17 00:00:00 2001
From: eklavya
Date: Mon, 13 Jan 2014 17:56:47 +0530
Subject: [PATCH 3/8] Added foreachPartition method to JavaRDD.
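
The two methods introduced so far are the Java-facing counterparts of RDD.mapPartitions and RDD.foreachPartition. To make their intended use concrete, here is a rough Java sketch of calling them once the series is applied; the local master string, application name, class name, and sample data below are illustrative assumptions and not part of the patches:

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    public class PartitionOpsExample {
      public static void main(String[] args) {
        // Local context and toy data, purely for illustration.
        JavaSparkContext sc = new JavaSparkContext("local[2]", "PartitionOpsExample");
        JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);

        // mapPartitions: the function sees a whole partition as an Iterator
        // and returns an Iterable; here each partition collapses to its sum.
        JavaRDD<Integer> partitionSums = nums.mapPartitions(
          new FlatMapFunction<Iterator<Integer>, Integer>() {
            public Iterable<Integer> call(Iterator<Integer> it) {
              int sum = 0;
              while (it.hasNext()) {
                sum += it.next();
              }
              return Arrays.asList(sum);
            }
          }, false);  // preservesPartitioning

        // With this data split into 2 partitions, prints something like [6, 15].
        System.out.println(partitionSums.collect());

        // foreachPartition: run a side effect once per partition.
        nums.foreachPartition(new VoidFunction<Iterator<Integer>>() {
          public void call(Iterator<Integer> it) {
            int count = 0;
            while (it.hasNext()) {
              it.next();
              count++;
            }
            System.out.println("partition size: " + count);
          }
        });

        sc.stop();
      }
    }

The second argument to mapPartitions is preservesPartitioning; passing false is the safe choice unless the function is known to leave keys in their original partitions.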
---
 .../main/scala/org/apache/spark/api/java/JavaRDD.scala | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 568ae1575b..bd778550af 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -21,7 +21,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.api.java.function.{Function => JFunction, FlatMapFunction => JFMap}
+import org.apache.spark.api.java.function.{Function => JFunction, FlatMapFunction => JFMap, VoidFunction}
 import org.apache.spark.storage.StorageLevel
 import java.util.{Iterator => JIterator}
 import scala.collection.JavaConversions._
@@ -149,6 +149,13 @@ JavaRDDLike[T, JavaRDD[T]] {
     rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
   }
 
+  /**
+   * Applies a function f to each partition of this RDD.
+   */
+  def foreachPartition(f: VoidFunction[JIterator[T]]) {
+    rdd.foreachPartition((x => f(asJavaIterator(x))))
+  }
+
 }
 
 object JavaRDD {

From 8fe562c0fac25c710fc50e97690f3bca5b5e4d59 Mon Sep 17 00:00:00 2001
From: eklavya
Date: Mon, 13 Jan 2014 18:09:58 +0530
Subject: [PATCH 4/8] Remove classtag from mapPartitions.

---
 core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index bd778550af..52265bbf4d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -144,7 +144,7 @@ JavaRDDLike[T, JavaRDD[T]] {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions[U: ClassTag](
+  def mapPartitions[U](
     f: JFMap[JIterator[T], U], preservesPartitioning: Boolean = false): JavaRDD[U] = {
     rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
   }

From fa42951e3bbd5af2d65d11e91101a775a2bd3c3f Mon Sep 17 00:00:00 2001
From: eklavya
Date: Mon, 13 Jan 2014 18:13:22 +0530
Subject: [PATCH 5/8] Remove default param from mapPartitions

---
 core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 52265bbf4d..e7b1ceebf6 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -145,7 +145,7 @@ JavaRDDLike[T, JavaRDD[T]] {
    * Return a new RDD by applying a function to each partition of this RDD.
    */
   def mapPartitions[U](
-    f: JFMap[JIterator[T], U], preservesPartitioning: Boolean = false): JavaRDD[U] = {
+    f: JFMap[JIterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = {
     rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
   }
 

From e92297337387b435c9e46f56aa1a403b78647afe Mon Sep 17 00:00:00 2001
From: Saurabh Rawat
Date: Mon, 13 Jan 2014 23:40:04 +0530
Subject: [PATCH 6/8] Modifications as suggested in PR feedback-

- mapPartitions, foreachPartition moved to JavaRDDLike
- call scala rdd's setGenerator instead of setting directly in JavaRDD
---
 .../org/apache/spark/api/java/JavaRDD.scala     | 18 +-----------------
 .../apache/spark/api/java/JavaRDDLike.scala     | 15 +++++++++++++++
 2 files changed, 16 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index e7b1ceebf6..e687bbdd99 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -138,24 +138,8 @@ JavaRDDLike[T, JavaRDD[T]] {
 
   /** Reset generator*/
   def setGenerator(_generator: String) = {
-    rdd.generator = _generator
+    rdd.setGenerator(_generator)
   }
-
-  /**
-   * Return a new RDD by applying a function to each partition of this RDD.
-   */
-  def mapPartitions[U](
-    f: JFMap[JIterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = {
-    rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
-  }
-
-  /**
-   * Applies a function f to each partition of this RDD.
-   */
-  def foreachPartition(f: VoidFunction[JIterator[T]]) {
-    rdd.foreachPartition((x => f(asJavaIterator(x))))
-  }
-
 }
 
 object JavaRDD {

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index ebbbbd8806..eb8e34e240 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -148,6 +148,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
   }
 
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD.
+   */
+  def mapPartitions[U](
+    f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = {
+    rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
+  }
+
+  /**
+   * Applies a function f to each partition of this RDD.
+   */
+  def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
+    rdd.foreachPartition((x => f(asJavaIterator(x))))
+  }
+
   /**
    * Return an RDD created by coalescing all elements within each partition into an array.
    */

From 1442cd5d5099de71747b1cccf463b94fdedcda1f Mon Sep 17 00:00:00 2001
From: Saurabh Rawat
Date: Tue, 14 Jan 2014 14:19:02 +0530
Subject: [PATCH 7/8] Modifications as suggested in PR feedback-

- more variants of mapPartitions added to JavaRDDLike
- move setGenerator to JavaRDDLike
- clean up
---
 .../org/apache/spark/api/java/JavaRDD.scala     |  9 +--------
 .../apache/spark/api/java/JavaRDDLike.scala     | 22 ++++++++++++++++++++++
 2 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index e687bbdd99..7d48ce01cf 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -21,10 +21,8 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.api.java.function.{Function => JFunction, FlatMapFunction => JFMap, VoidFunction}
+import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.storage.StorageLevel
-import java.util.{Iterator => JIterator}
-import scala.collection.JavaConversions._
 
 class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
 JavaRDDLike[T, JavaRDD[T]] {
@@ -135,11 +133,6 @@ JavaRDDLike[T, JavaRDD[T]] {
     rdd.setName(name)
     this
   }
-
-  /** Reset generator*/
-  def setGenerator(_generator: String) = {
-    rdd.setGenerator(_generator)
-  }
 }
 
 object JavaRDD {

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index eb8e34e240..808c907d37 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -156,6 +156,23 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
   }
 
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD.
+   */
+  def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = {
+    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+    new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning).map((x: java.lang.Double) => x.doubleValue()))
+  }
+
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD.
+   */
+  def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean):
+  JavaPairRDD[K2, V2] = {
+    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+    JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType())
+  }
+
   /**
    * Applies a function f to each partition of this RDD.
    */
@@ -476,4 +493,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
 
   def name(): String = rdd.name
+
+  /** Reset generator */
+  def setGenerator(_generator: String) = {
+    rdd.setGenerator(_generator)
+  }
 }

From 60e7457266eef18f562ef5cb93d62db1af821fdf Mon Sep 17 00:00:00 2001
From: eklavya
Date: Thu, 23 Jan 2014 17:40:36 +0530
Subject: [PATCH 8/8] fixed ClassTag in mapPartitions

---
 .../org/apache/spark/api/java/JavaRDDLike.scala | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 808c907d37..9680c6f3e1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -134,13 +134,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
+  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = {
+    def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+    JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType())
+  }
+
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD.
+   */
   def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
     def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
     new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
   }
 
   /**
-   * Return a new RDD by applying a function to each partition of this RDD. 
+   * Return a new RDD by applying a function to each partition of this RDD.
    */
   def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
   JavaPairRDD[K2, V2] = {
     def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
     JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
   }
 
-  /**
-   * Return a new RDD by applying a function to each partition of this RDD.
-   */
-  def mapPartitions[U](
-    f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = {
-    rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
-  }
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
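
Patch 7 extends the same pattern to the DoubleFlatMapFunction and PairFlatMapFunction variants, and patch 8 reworks the FlatMapFunction overload to thread the ClassTag through f.elementType(). A hedged sketch of calling the PairFlatMapFunction overload that takes preservesPartitioning from Java; the context, application name, class name, and data are again assumptions made only for illustration:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import scala.Tuple2;

    public class PairPartitionsExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "PairPartitionsExample");
        JavaRDD<String> words = sc.parallelize(Arrays.asList("a", "b", "c", "d"), 2);

        // Pair each word with its position inside its own partition.
        JavaPairRDD<String, Integer> indexed = words.mapPartitions(
          new PairFlatMapFunction<Iterator<String>, String, Integer>() {
            public Iterable<Tuple2<String, Integer>> call(Iterator<String> it) {
              List<Tuple2<String, Integer>> out = new ArrayList<Tuple2<String, Integer>>();
              int i = 0;
              while (it.hasNext()) {
                out.add(new Tuple2<String, Integer>(it.next(), i++));
              }
              return out;
            }
          }, true);  // preservesPartitioning

        System.out.println(indexed.collect());
        sc.stop();
      }
    }

Passing true for preservesPartitioning only has an effect when the parent RDD carries a partitioner; it declares that the function does not move elements between key partitions, so downstream stages can avoid an unnecessary shuffle.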