From 8ac0f35be42765fcd6f02dcf0f070f2ef2377a85 Mon Sep 17 00:00:00 2001 From: Stephen Haberman Date: Tue, 8 Jan 2013 09:57:45 -0600 Subject: [PATCH] Add JavaRDDLike.keyBy. --- core/src/main/scala/spark/api/java/JavaRDDLike.scala | 8 ++++++++ core/src/test/scala/spark/JavaAPISuite.java | 12 ++++++++++++ 2 files changed, 20 insertions(+) diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 81d3a94466..d15f6dd02f 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -298,4 +298,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Save this RDD as a SequenceFile of serialized objects. */ def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path) + + /** + * Creates tuples of the elements in this RDD by applying `f`. + */ + def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = { + implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + JavaPairRDD.fromRDD(rdd.keyBy(f)) + } } diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 0817d1146c..c61913fc82 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -629,4 +629,16 @@ public class JavaAPISuite implements Serializable { floatAccum.setValue(5.0f); Assert.assertEquals((Float) 5.0f, floatAccum.value()); } + + @Test + public void keyBy() { + JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2)); + List> s = rdd.keyBy(new Function() { + public String call(Integer t) throws Exception { + return t.toString(); + } + }).collect(); + Assert.assertEquals(new Tuple2("1", 1), s.get(0)); + Assert.assertEquals(new Tuple2("2", 2), s.get(1)); + } }