Modifications as suggested in PR feedback-

- more variants of mapPartitions added to JavaRDDLike
- move setGenerator to JavaRDDLike
- clean up
This commit is contained in:
Saurabh Rawat 2014-01-14 14:19:02 +05:30
parent e922973373
commit 1442cd5d50
2 changed files with 23 additions and 8 deletions

View file

@ -21,10 +21,8 @@ import scala.reflect.ClassTag
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.function.{Function => JFunction, FlatMapFunction => JFMap, VoidFunction}
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.storage.StorageLevel
import java.util.{Iterator => JIterator}
import scala.collection.JavaConversions._
class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
JavaRDDLike[T, JavaRDD[T]] {
@ -135,11 +133,6 @@ JavaRDDLike[T, JavaRDD[T]] {
rdd.setName(name)
this
}
/** Reset generator*/
def setGenerator(_generator: String) = {
rdd.setGenerator(_generator)
}
}
object JavaRDD {

View file

@ -156,6 +156,23 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
}
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning).map((x: java.lang.Double) => x.doubleValue()))
}
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean):
JavaPairRDD[K2, V2] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType())
}
/**
* Applies a function f to each partition of this RDD.
*/
@ -476,4 +493,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
def name(): String = rdd.name
/** Reset generator */
def setGenerator(_generator: String) = {
rdd.setGenerator(_generator)
}
}