diff --git a/core/src/main/scala/spark/IndexedRDDFunctions.scala b/core/src/main/scala/spark/IndexedRDDFunctions.scala new file mode 100644 index 0000000000..8bfe9d75e1 --- /dev/null +++ b/core/src/main/scala/spark/IndexedRDDFunctions.scala @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark + +import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet} + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + +import spark._ + +import spark.rdd.ShuffledRDD +import spark.rdd.IndexedRDD +import spark.rdd.BlockIndex +import spark.rdd.RDDIndex + + +class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K,V]) + extends PairRDDFunctions[K,V](self) { + + /** + * Construct a new IndexedRDD that is indexed by only the keys in the RDD + */ + def reindex(): IndexedRDD[K,V] = IndexedRDD(self) + + + /** + * Pass each value in the key-value pair RDD through a map function without changing the keys; + * this also retains the original RDD's partitioning. + */ + override def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = { + val cleanF = self.index.rdd.context.clean(f) + val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{ + case null => null + case row => row.map(x => cleanF(x)) + }), true) + new IndexedRDD[K,U](self.index, newValues) + } + + + /** + * Pass each value in the key-value pair RDD through a flatMap function without changing the + * keys; this also retains the original RDD's partitioning. + */ + override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = { + val cleanF = self.index.rdd.context.clean(f) + val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{ + case null => null + case row => row.flatMap(x => cleanF(x)) + }), true) + new IndexedRDD[K,U](self.index, newValues) + }
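Reviewer sketch: the two overrides above share one pattern. Each partition of valuesRDD holds a single Seq[Seq[V]] whose positions line up with the shared index, and a null group marks an absent key, so the user function is applied inside each non-null group while the holes are preserved. A minimal standalone sketch of that per-block transform (mapBlock is a hypothetical helper name, not part of the patch):

    // Apply f to every value of every present key; keep null "holes" so the
    // block stays aligned with the shared index.
    def mapBlock[V, U](block: Seq[Seq[V]])(f: V => U): Seq[Seq[U]] =
      block.map {
        case null  => null          // key absent in this IndexedRDD
        case group => group.map(f)  // key present: transform its values
      }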
+ + + /** + * Generic function to combine the elements for each key using a custom set of aggregation + * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C + * Note that V and C can be different -- for example, one might group an RDD of type + * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions: + * + * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) + * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) + * - `mergeCombiners`, to combine two C's into a single one. + */ + override def combineByKey[C: ClassManifest](createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C, + partitioner: Partitioner, + mapSideCombine: Boolean = true, + serializerClass: String = null): RDD[(K, C)] = { + val newValues = self.valuesRDD.mapPartitions( + _.map{ groups: Seq[Seq[V]] => + groups.map{ group: Seq[V] => + if (group != null && !group.isEmpty) { + val c: C = createCombiner(group.head) + val sum: C = group.tail.foldLeft(c)(mergeValue) + Seq(sum) + } else { + null + } + } + }, true) + new IndexedRDD[K,C](self.index, newValues) + } + + + + /** + * Group the values for each key in the RDD into a single sequence. Hash-partitions the + * resulting RDD with the existing partitioner/parallelism level. + */ + override def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { + val newValues = self.valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true) + new IndexedRDD[K, Seq[V]](self.index, newValues) + }
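Reviewer sketch: the three cogroup cases below turn on how much structure the two RDDs share, and two of their building blocks can be read in isolation. Both sketches are illustrative only: zipGroups and mergeIndices are hypothetical names, and the second assumes the JHashMap alias and JavaConversions import at the top of this file (BlockIndex is a JHashMap[K, Int] in this patch).

    // Case 1 (shared index): merge two aligned value groups for one key.
    def zipGroups[V, W](a: Seq[V], b: Seq[W]): Seq[(Seq[V], Seq[W])] =
      (a, b) match {
        case (null, null) => null                     // key in neither RDD
        case (x, null)    => Seq((x, Seq.empty[W]))   // key only in this RDD
        case (null, y)    => Seq((Seq.empty[V], y))   // key only in the other
        case (x, y)       => Seq((x, y))              // key in both
      }

    // Case 2 (same partitioner, different indices): union the key sets, then
    // renumber densely so the positions can index a freshly allocated array.
    def mergeIndices[K](a: JHashMap[K, Int], b: JHashMap[K, Int]): JHashMap[K, Int] = {
      val merged = new JHashMap[K, Int]()
      merged.putAll(a)
      merged.putAll(b)     // duplicate keys collapse; the old positions are stale
      var ctr = 0
      for (e <- merged.entrySet) { e.setValue(ctr); ctr += 1 }  // rekey 0..n-1
      merged
    }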
+ + + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. + */ + override def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): + IndexedRDD[K, (Seq[V], Seq[W])] = { + //RDD[(K, (Seq[V], Seq[W]))] = { + other match { + case other: IndexedRDD[_, _] if self.index == other.index => { + // if both RDDs share exactly the same index and therefore the same superset of keys + // then we simply merge the value RDDs. + // However it is possible that both RDDs are missing a value for a given key in + // which case the returned RDD should have a null value + val newValues = + self.valuesRDD.zipPartitions(other.valuesRDD)( + (thisIter, otherIter) => { + val thisValues: Seq[Seq[V]] = thisIter.next() + assert(!thisIter.hasNext) + val otherValues: Seq[Seq[W]] = otherIter.next() + assert(!otherIter.hasNext) + // Zip the values and if both arrays are null then the key is not present and + // so the resulting value must be null (not a tuple of empty sequences) + val tmp: Seq[Seq[(Seq[V], Seq[W])]] = thisValues.view.zip(otherValues).map{ + case (null, null) => null // The key is not present in either RDD + case (a, null) => Seq((a, Seq.empty[W])) + case (null, b) => Seq((Seq.empty[V], b)) + case (a, b) => Seq((a,b)) + }.toSeq + List(tmp).iterator + }) + new IndexedRDD[K, (Seq[V], Seq[W])](self.index, newValues) + } + case other: IndexedRDD[_, _] + if self.index.rdd.partitioner == other.index.rdd.partitioner => { + // If both RDDs are indexed using different indices but with the same partitioner + // then we first need to merge the indices and then use the merged index to + // merge the values. + val newIndex = + self.index.rdd.zipPartitions(other.index.rdd)( + (thisIter, otherIter) => { + val thisIndex = thisIter.next() + assert(!thisIter.hasNext) + val otherIndex = otherIter.next() + assert(!otherIter.hasNext) + val newIndex = new BlockIndex[K]() + // @todo Merge only the keys that correspond to non-null values + // Merge the keys + newIndex.putAll(thisIndex) + newIndex.putAll(otherIndex) + // We need to rekey the index + var ctr = 0 + for (e <- newIndex.entrySet) { + e.setValue(ctr) + ctr += 1 + } + List(newIndex).iterator + }).cache() + // Use the new index along with this and the other indices to merge the values + val newValues = + newIndex.zipPartitions(self.tuples, other.tuples)( + (newIndexIter, thisTuplesIter, otherTuplesIter) => { + // Get the new index for this partition + val newIndex = newIndexIter.next() + assert(!newIndexIter.hasNext) + // Get the corresponding indices and values for this and the other IndexedRDD + val (thisIndex, thisValues) = thisTuplesIter.next() + assert(!thisTuplesIter.hasNext) + val (otherIndex, otherValues) = otherTuplesIter.next() + assert(!otherTuplesIter.hasNext) + // Preallocate the new Values array + val newValues = new Array[Seq[(Seq[V],Seq[W])]](newIndex.size) + // Lookup the sequences in both submaps + for ((k,ind) <- newIndex) { + val thisSeq = if (thisIndex.contains(k)) thisValues(thisIndex.get(k)) else null + val otherSeq = if (otherIndex.contains(k)) otherValues(otherIndex.get(k)) else null + // if either of the sequences is not null then the key was in one of the two tables + // and so the value should appear in the returned table + newValues(ind) = (thisSeq, otherSeq) match { + case (null, null) => null + case (a, null) => Seq( (a, Seq.empty[W]) ) + case (null, b) => Seq( (Seq.empty[V], b) ) + case (a, b) => Seq( (a,b) ) + } + } + List(newValues.toSeq).iterator + }) + new IndexedRDD(new RDDIndex(newIndex), newValues) + } + case _ => { + // Get the partitioner from the index + val partitioner = self.index.rdd.partitioner match { + case Some(p) => p + case None => throw new SparkException("An index must have a partitioner.") + } + // Shuffle the other RDD using the partitioner for this index + val otherShuffled = + if (other.partitioner == Some(partitioner)) { + other + } else { + new ShuffledRDD[K,W](other, partitioner) + } + // Join the other RDD with this RDD building a new value set and new index on the fly + val groups = + self.tuples.zipPartitions(otherShuffled)( + (thisTuplesIter, otherTuplesIter) => { + // Get the corresponding indices and values for this IndexedRDD + val (thisIndex, thisValues) = thisTuplesIter.next() + assert(!thisTuplesIter.hasNext) + // Construct a new index + val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]] + // Construct a new ArrayBuffer to store the values + val newValues = ArrayBuffer.fill[(Seq[V], Seq[W])](thisValues.size)(null) + // populate the newValues with the values in this IndexedRDD + for ((k,i) <- thisIndex) { + if (thisValues(i) != null) { + newValues(i) = (thisValues(i), ArrayBuffer.empty[W]) + } + } + // Now iterate through the other tuples updating the map + for ((k,w) <- otherTuplesIter){ + if (!newIndex.contains(k)) { + // update the index + val ind = newIndex.size + newIndex.put(k, ind) + // Update the values + newValues.append( (Seq.empty[V], ArrayBuffer(w) ) ) + } else { + val ind = newIndex.get(k) + if(newValues(ind) == null) { + // If the other key was in the index but not in the values + // of this indexed RDD then create a new values entry for it +
newValues(ind) = (Seq.empty[V], ArrayBuffer(w)) + } else { + newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w) + } + } + } + // Finalize the new values array + val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] = + newValues.view.map{ + case null => null + case (s, ab) => Seq((s, ab.toSeq)) + }.toSeq + List( (newIndex, newValuesArray) ).iterator + }).cache() + + // Extract the index and values from the above RDD + val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true) + val newValues = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) + + new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues) + } + } + } + + +} + +//(self: IndexedRDD[K, V]) extends PairRDDFunctions(self) { } + + diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 03f25040a1..ca42980b46 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -69,7 +69,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * In addition, users can control the partitioning of the output RDD, and whether to perform * map-side aggregation (if a mapper can produce multiple items with the same key). */ - def combineByKey[C](createCombiner: V => C, + def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, @@ -103,7 +103,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( /** * Simplified version of combineByKey that hash-partitions the output RDD. */ - def combineByKey[C](createCombiner: V => C, + def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] = { @@ -248,7 +248,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. */ - def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { + def join[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (v, w) @@ -261,7 +261,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to * partition the output RDD. */ - def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => if (ws.isEmpty) { @@ -278,7 +278,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to * partition the output RDD. */ - def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) + def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => @@ -294,7 +294,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * Simplified version of combineByKey that hash-partitions the resulting RDD using the * existing partitioner/parallelism level. 
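Reviewer sketch: a note on the recurring signature change in this hunk. Every [C: ClassManifest] (and [W: ClassManifest]) context bound added here desugars to an extra implicit parameter, which is what lets the IndexedRDD overrides allocate typed arrays for their value blocks. Illustrative only; newBlock and newBlock2 are hypothetical names, not code from the patch:

    // A context bound is sugar for an implicit manifest parameter:
    def newBlock[C: ClassManifest](n: Int): Array[C] = new Array[C](n)
    // ...is equivalent to...
    def newBlock2[C](n: Int)(implicit cm: ClassManifest[C]): Array[C] = new Array[C](n)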
*/ - def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) + def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)] = { combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) } @@ -322,7 +322,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { + def join[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, W))] = { join(other, defaultPartitioner(self, other)) } @@ -331,7 +331,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { + def join[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { join(other, new HashPartitioner(numPartitions)) } @@ -341,7 +341,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * using the existing partitioner/parallelism level. */ - def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { leftOuterJoin(other, defaultPartitioner(self, other)) } @@ -351,7 +351,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * into `numPartitions` partitions. */ - def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = { leftOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -361,7 +361,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD using the existing partitioner/parallelism level. */ - def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { + def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { rightOuterJoin(other, defaultPartitioner(self, other)) } @@ -371,7 +371,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD into the given number of partitions. */ - def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = { + def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = { rightOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -384,7 +384,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * Pass each value in the key-value pair RDD through a map function without changing the keys; * this also retains the original RDD's partitioning. 
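Reviewer sketch: a hedged usage example of the now manifest-bound combineByKey above (sc and the sample data are illustrative; behavior is unchanged, only the implicit manifest requirement is new):

    // val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    // val lists = pairs.combineByKey[List[Int]](v => List(v), (c, v) => v :: c, _ ::: _)
    // // ("a", List(2, 1)), ("b", List(3)) -- intra-list order is not guaranteed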
*/ - def mapValues[U](f: V => U): RDD[(K, U)] = { + def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MappedValuesRDD(self, cleanF) } @@ -393,7 +393,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * Pass each value in the key-value pair RDD through a flatMap function without changing the * keys; this also retains the original RDD's partitioning. */ - def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = { + def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K, U)] = { val cleanF = self.context.clean(f) new FlatMappedValuesRDD(self, cleanF) } @@ -402,7 +402,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -420,7 +420,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) + def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") @@ -441,7 +441,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner(self, other)) } @@ -449,7 +449,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } @@ -458,7 +458,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. 
*/ - def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = { cogroup(other, new HashPartitioner(numPartitions)) } @@ -466,18 +466,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) + def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { cogroup(other1, other2, new HashPartitioner(numPartitions)) } /** Alias for cogroup. */ - def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def groupWith[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner(self, other)) } /** Alias for cogroup. */ - def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + def groupWith[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 1664471524..95adbda43f 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -774,6 +774,19 @@ abstract class RDD[T: ClassManifest]( } + + // def pairRDDFunctions[K: ClassManifest, V,](implicit t: T <:< (K, V), k: ClassManifest[K], v: ClassManifest[V]): + // PairRDDFunctions[K, V] = { + // new PairRDDFunctions(this.asInstanceOf[RDD[(K,V)]]) + // } + + def pairRDDFunctions[K, V]( + implicit t: T <:< (K, V), k: ClassManifest[K], v: ClassManifest[V]): + PairRDDFunctions[K, V] = { + new PairRDDFunctions(this.asInstanceOf[RDD[(K,V)]]) + } + + def makeIndex(partitioner: Option[Partitioner] = None): RDDIndex[T] = IndexedRDD.makeIndex(this, partitioner) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 80c65dfebd..7394730786 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -878,7 +878,7 @@ object SparkContext { // TODO: Add AccumulatorParams for other types, e.g. lists and strings implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) = - new PairRDDFunctions(rdd) + rdd.pairRDDFunctions implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest]( rdd: RDD[(K, V)]) = diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index c2995b836a..0b444c5a7e 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -263,8 +263,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. 
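Reviewer sketch: stepping back to the RDD.scala and SparkContext.scala hunks above, they are the crux of this patch. The implicit conversion no longer constructs PairRDDFunctions directly but asks the RDD itself, so a subclass can substitute an index-aware implementation. A hedged sketch of the resulting dispatch (values are illustrative):

    // val pairs: RDD[(Int, String)] = ...    // plain pair RDD
    // pairs.pairRDDFunctions                 // -> PairRDDFunctions
    // val ix = IndexedRDD(pairs)
    // ix.pairRDDFunctions                    // -> IndexedRDDFunctions (overridden)
    // ix.join(other)                         // the implicit conversion now routes
    //                                        //   the call through the index-aware join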
*/ - def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = + def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(rdd.join(other, partitioner)) + } /** * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the @@ -274,6 +277,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif */ def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (V, Optional[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.leftOuterJoin(other, partitioner) fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) } @@ -286,6 +291,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif */ def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (Optional[V], W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.rightOuterJoin(other, partitioner) fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) } @@ -324,16 +331,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = + def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(rdd.join(other)) + } /** * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = + def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(rdd.join(other, numPartitions)) + } /** * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the @@ -342,6 +355,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * using the existing partitioner/parallelism level. */ def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.leftOuterJoin(other) fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) } @@ -353,6 +368,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * into `numPartitions` partitions. 
*/ def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.leftOuterJoin(other, numPartitions) fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) } @@ -364,6 +381,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * RDD using the existing partitioner/parallelism level. */ def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.rightOuterJoin(other) fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) } @@ -375,6 +394,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * RDD into the given number of partitions. */ def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.rightOuterJoin(other, numPartitions) fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) } @@ -411,55 +432,86 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner) - : JavaPairRDD[K, (JList[V], JList[W])] = + : JavaPairRDD[K, (JList[V], JList[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner))) + } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { + implicit val w1m: ClassManifest[W1] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] + implicit val w2m: ClassManifest[W2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner))) + } /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = + def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(cogroupResultToJava(rdd.cogroup(other))) + } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. 
*/ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { + implicit val w1m: ClassManifest[W1] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] + implicit val w2m: ClassManifest[W2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2))) + } /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] - = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions))) - + def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions))) + } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { + implicit val w1m: ClassManifest[W1] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] + implicit val w2m: ClassManifest[W2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions))) + } /** Alias for cogroup. */ - def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = + def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = { + implicit val wm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] fromRDD(cogroupResultToJava(rdd.groupWith(other))) + } /** Alias for cogroup. */ def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { + implicit val w1m: ClassManifest[W1] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] + implicit val w2m: ClassManifest[W2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2))) + } /** * Return the list of values in the RDD for key `key`. 
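Reviewer sketch: each Java-facing method above has to conjure a ClassManifest[W] because Java callers cannot supply one; casting the AnyRef manifest appears safe here since the manifest is only used for erased collections, never for reflection on W. The repeated incantation could arguably be factored into one helper, e.g. (hypothetical, not in the patch):

    private def fakeClassManifest[W]: ClassManifest[W] =
      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]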
This operation is done efficiently if the diff --git a/core/src/main/scala/spark/rdd/IndexedRDD.scala b/core/src/main/scala/spark/rdd/IndexedRDD.scala index ab1e460aeb..ac80a06c8a 100644 --- a/core/src/main/scala/spark/rdd/IndexedRDD.scala +++ b/core/src/main/scala/spark/rdd/IndexedRDD.scala @@ -25,7 +25,6 @@ import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet} import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer - import spark._ import spark.rdd._ import spark.SparkContext._ @@ -77,7 +76,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( /** * An internal representation which joins the block indices with the values */ - protected val tuples = new ZippedRDD(index.rdd.context, index.rdd, valuesRDD) + protected[spark] val tuples = new ZippedRDD(index.rdd.context, index.rdd, valuesRDD) /** @@ -108,341 +107,14 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( } - /** - * Construct a new IndexedRDD that is indexed by only the keys in the RDD - */ - def reindex(): IndexedRDD[K,V] = IndexedRDD(this) - - - /** - * Pass each value in the key-value pair RDD through a map function without changing the keys; - * this also retains the original RDD's partitioning. - */ - def mapValues[U: ClassManifest](f: V => U): IndexedRDD[K, U] = { - val cleanF = index.rdd.context.clean(f) - val newValues = valuesRDD.mapPartitions(_.map(values => values.map{ - case null => null - case row => row.map(x => f(x)) - }), true) - new IndexedRDD[K,U](index, newValues) + override def pairRDDFunctions[K1, V1]( + implicit t: (K, V) <:< (K1,V1), k: ClassManifest[K1], v: ClassManifest[V1]): + PairRDDFunctions[K1, V1] = { + new IndexedRDDFunctions[K1,V1](this.asInstanceOf[IndexedRDD[K1,V1]]) } - /** - * Pass each value in the key-value pair RDD through a flatMap function without changing the - * keys; this also retains the original RDD's partitioning. - */ - def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): IndexedRDD[K,U] = { - val cleanF = index.rdd.context.clean(f) - val newValues = valuesRDD.mapPartitions(_.map(values => values.map{ - case null => null - case row => row.flatMap(x => f(x)) - }), true) - new IndexedRDD[K,U](index, newValues) - } - - - /** - * Generic function to combine the elements for each key using a custom set of aggregation - * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C - * Note that V and C can be different -- for example, one might group an RDD of type - * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions: - * - * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) - * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) - * - `mergeCombiners`, to combine two C's into a single one. 
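Reviewer sketch: the pairRDDFunctions override above keeps the base signature from RDD (the t: (K, V) <:< (K1, V1) evidence witnesses that the element type really is a pair, which makes the cast safe) while returning the index-aware subclass. A hedged sketch of the effect:

    // val ix: IndexedRDD[Int, String] = ...
    // val fns = ix.pairRDDFunctions        // static type: PairRDDFunctions[Int, String]
    // fns.isInstanceOf[IndexedRDDFunctions[_, _]]   // true: joins/cogroups stay indexed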
- */ - def combineByKey[C: ClassManifest](createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiners: (C, C) => C, - serializerClass: String = null): IndexedRDD[K, C] = { - val newValues = valuesRDD.mapPartitions( - _.map{ groups: Seq[Seq[V]] => - groups.map{ group: Seq[V] => - if (group != null && !group.isEmpty) { - val c: C = createCombiner(group.head) - val sum: C = group.tail.foldLeft(c)(mergeValue) - Seq(sum) - } else { - null - } - } - }, true) - new IndexedRDD[K,C](index, newValues) - } - - /** - * Merge the values for each key using an associative function and a neutral "zero value" which may - * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for - * list concatenation, 0 for addition, or 1 for multiplication.). - */ - def foldByKey(zeroValue: V)(func: (V, V) => V): IndexedRDD[K, V] = { - // Serialize the zero value to a byte array so that we can get a new clone of it on each key - val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue) - val zeroArray = new Array[Byte](zeroBuffer.limit) - zeroBuffer.get(zeroArray) - - // When deserializing, use a lazy val to create just one instance of the serializer per task - lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance() - def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray)) - combineByKey[V]((v: V) => func(createZero(), v), func, func) - } - - /** - * Merge the values for each key using an associative reduce function. This will also perform - * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ - * parallelism level. - */ - def reduceByKey(func: (V, V) => V): IndexedRDD[K, V] = { - combineByKey[V]((v: V) => v, func, func) - } - - - /** - * Group the values for each key in the RDD into a single sequence. Hash-partitions the - * resulting RDD with the existing partitioner/parallelism level. - */ - def groupByKey(): IndexedRDD[K, Seq[V]] = { - val newValues = valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true) - new IndexedRDD[K, Seq[V]](index, newValues) - } - - - /** - * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the - * list of values for that key in `this` as well as `other`. - */ - def cogroup[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (Seq[V], Seq[W])] = { - //RDD[(K, (Seq[V], Seq[W]))] = { - other match { - case other: IndexedRDD[_, _] if other.index == index => { - // if both RDDs share exactly the same index and therefore the same super set of keys - // then we simply merge the value RDDs. 
- // However it is possible that both RDDs are missing a value for a given key in - // which case the returned RDD should have a null value - val newValues = - valuesRDD.zipPartitions(other.valuesRDD)( - (thisIter, otherIter) => { - val thisValues: Seq[Seq[V]] = thisIter.next() - assert(!thisIter.hasNext()) - val otherValues: Seq[Seq[W]] = otherIter.next() - assert(!otherIter.hasNext()) - // Zip the values and if both arrays are null then the key is not present and - // so the resulting value must be null (not a tuple of empty sequences) - val tmp: Seq[Seq[(Seq[V], Seq[W])]] = thisValues.view.zip(otherValues).map{ - case (null, null) => null // The key is not present in either RDD - case (a, null) => Seq((a, Seq.empty[W])) - case (null, b) => Seq((Seq.empty[V], b)) - case (a, b) => Seq((a,b)) - }.toSeq - List(tmp).iterator - }) - new IndexedRDD[K, (Seq[V], Seq[W])](index, newValues) - } - case other: IndexedRDD[_, _] if other.index.rdd.partitioner == index.rdd.partitioner => { - // If both RDDs are indexed using different indices but with the same partitioners - // then we we need to first merge the indicies and then use the merged index to - // merge the values. - val newIndex = - index.rdd.zipPartitions(other.index.rdd)( - (thisIter, otherIter) => { - val thisIndex = thisIter.next() - assert(!thisIter.hasNext()) - val otherIndex = otherIter.next() - assert(!otherIter.hasNext()) - val newIndex = new BlockIndex[K]() - // @todo Merge only the keys that correspond to non-null values - // Merge the keys - newIndex.putAll(thisIndex) - newIndex.putAll(otherIndex) - // We need to rekey the index - var ctr = 0 - for (e <- newIndex.entrySet) { - e.setValue(ctr) - ctr += 1 - } - List(newIndex).iterator - }).cache() - // Use the new index along with the this and the other indices to merge the values - val newValues = - newIndex.zipPartitions(tuples, other.tuples)( - (newIndexIter, thisTuplesIter, otherTuplesIter) => { - // Get the new index for this partition - val newIndex = newIndexIter.next() - assert(!newIndexIter.hasNext()) - // Get the corresponding indicies and values for this and the other IndexedRDD - val (thisIndex, thisValues) = thisTuplesIter.next() - assert(!thisTuplesIter.hasNext()) - val (otherIndex, otherValues) = otherTuplesIter.next() - assert(!otherTuplesIter.hasNext()) - // Preallocate the new Values array - val newValues = new Array[Seq[(Seq[V],Seq[W])]](newIndex.size) - // Lookup the sequences in both submaps - for ((k,ind) <- newIndex) { - val thisSeq = if (thisIndex.contains(k)) thisValues(thisIndex.get(k)) else null - val otherSeq = if (otherIndex.contains(k)) otherValues(otherIndex.get(k)) else null - // if either of the sequences is not null then the key was in one of the two tables - // and so the value should appear in the returned table - newValues(ind) = (thisSeq, otherSeq) match { - case (null, null) => null - case (a, null) => Seq( (a, Seq.empty[W]) ) - case (null, b) => Seq( (Seq.empty[V], b) ) - case (a, b) => Seq( (a,b) ) - } - } - List(newValues.toSeq).iterator - }) - new IndexedRDD(new RDDIndex(newIndex), newValues) - } - case _ => { - // Get the partitioner from the index - val partitioner = index.rdd.partitioner match { - case Some(p) => p - case None => throw new SparkException("An index must have a partitioner.") - } - // Shuffle the other RDD using the partitioner for this index - val otherShuffled = - if (other.partitioner == Some(partitioner)) { - other - } else { - new ShuffledRDD[K,W](other, partitioner) - } - // Join the other RDD with this RDD 
building a new valueset and new index on the fly - val groups = - tuples.zipPartitions(otherShuffled)( - (thisTuplesIter, otherTuplesIter) => { - // Get the corresponding indicies and values for this IndexedRDD - val (thisIndex, thisValues) = thisTuplesIter.next() - assert(!thisTuplesIter.hasNext()) - // Construct a new index - val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]] - // Construct a new array Buffer to store the values - val newValues = ArrayBuffer.fill[(Seq[V], Seq[W])](thisValues.size)(null) - // populate the newValues with the values in this IndexedRDD - for ((k,i) <- thisIndex) { - if (thisValues(i) != null) { - newValues(i) = (thisValues(i), ArrayBuffer.empty[W]) - } - } - // Now iterate through the other tuples updating the map - for ((k,w) <- otherTuplesIter){ - if (!newIndex.contains(k)) { - // update the index - val ind = newIndex.size - newIndex.put(k, ind) - // Update the values - newValues.append( (Seq.empty[V], ArrayBuffer(w) ) ) - } else { - val ind = newIndex.get(k) - if(newValues(ind) == null) { - // If the other key was in the index but not in the values - // of this indexed RDD then create a new values entry for it - newValues(ind) = (Seq.empty[V], ArrayBuffer(w)) - } else { - newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w) - } - } - } - // Finalize the new values array - val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] = - newValues.view.map{ - case null => null - case (s, ab) => Seq((s, ab.toSeq)) - }.toSeq - List( (newIndex, newValuesArray) ).iterator - }).cache() - - // Extract the index and values from the above RDD - val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true) - val newValues = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) - - new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues) - } - } - } - - - - // /** - // * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a - // * tuple with the list of values for that key in `this`, `other1` and `other2`. - // */ - // def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - // : IndexedRDD[K, (Seq[V], Seq[W1], Seq[W2])] = { - // cogroup(other1, other2, defaultPartitioner(this, other1, other2)) - // } - - // /** - // * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a - // * tuple with the list of values for that key in `this`, `other1` and `other2`. - // */ - // def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) - // : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { - // cogroup(other1, other2, new HashPartitioner(numPartitions)) - // } - - /** Alias for cogroup. */ - def groupWith[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (Seq[V], Seq[W])] = { - cogroup(other) - } - - // /** Alias for cogroup. */ - // def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - // : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { - // cogroup(other1, other2, defaultPartitioner(self, other1, other2)) - // } - - - /** - * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each - * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and - * (k, v2) is in `other`. Performs a hash join across the cluster. - */ - def join[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (V, W)] = { - cogroup(other).flatMapValues { - case (vs, ws) => - for (v <- vs.iterator; w <- ws.iterator) yield (v, w) - } - } - - - /** - * Perform a left outer join of `this` and `other`. 
For each element (k, v) in `this`, the - * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the - * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output - * using the existing partitioner/parallelism level. - */ - def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (V, Option[W])] = { - cogroup(other).flatMapValues { - case (vs, ws) => - if (ws.isEmpty) { - vs.iterator.map(v => (v, None)) - } else { - for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w)) - } - } - - } - - - /** - * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the - * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the - * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting - * RDD using the existing partitioner/parallelism level. - */ - def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (Option[V], W)] = { - cogroup(other).flatMapValues { - case (vs, ws) => - if (vs.isEmpty) { - ws.iterator.map(w => (None, w)) - } else { - for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w) - } - } - - } + /**
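Reviewer sketch: taken together, the patch moves the pair operations off IndexedRDD itself and behind the PairRDDFunctions dispatch. A hedged end-to-end sketch of the API as it stands after this change (sc and the data are illustrative; IndexedRDD.apply is the constructor already used by reindex above):

    // val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (2, "c")))
    // val ix = IndexedRDD(pairs)        // build the index and the value blocks
    // val other = sc.parallelize(Seq((2, 9.0), (3, 7.0)))
    // ix.cogroup(other)                 // dispatches to IndexedRDDFunctions.cogroup:
    //                                   //   shuffles only `other`, reusing ix's index
    // ix.mapValues(_.length)            // index-preserving under the hood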