Allowing RDD to select its implementation of PairRDDFunctions

Author: Joseph E. Gonzalez
Date:   2013-08-27 18:16:19 -07:00
parent 55c6e73bfb
commit 93503a7054

6 changed files with 380 additions and 370 deletions


@@ -0,0 +1,273 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark

import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet}

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

import spark._
import spark.rdd.ShuffledRDD
import spark.rdd.IndexedRDD
import spark.rdd.BlockIndex
import spark.rdd.RDDIndex

class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K,V])
extends PairRDDFunctions[K,V](self) {
/**
* Construct a new IndexedRDD that is indexed by only the keys in the RDD
*/
def reindex(): IndexedRDD[K,V] = IndexedRDD(self)
/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
override def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = {
val cleanF = self.index.rdd.context.clean(f)
val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{
case null => null
case row => row.map(x => cleanF(x))
}), true)
new IndexedRDD[K,U](self.index, newValues)
}
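// For example (sketch): `indexed.mapValues(_.toString)` rewrites only the value
// blocks and reuses the existing RDDIndex, so no shuffle is required.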
/**
* Pass each value in the key-value pair RDD through a flatMap function without changing the
* keys; this also retains the original RDD's partitioning.
*/
override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = {
val cleanF = self.index.rdd.context.clean(f)
val newValues = self.valuesRDD.mapPartitions(_.map(values => values.map{
case null => null
case row => row.flatMap(x => cleanF(x))
}), true)
new IndexedRDD[K,U](self.index, newValues)
}
/**
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
* Note that V and C can be different -- for example, one might group an RDD of type
* (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
*
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
*/
override def combineByKey[C: ClassManifest](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializerClass: String = null): RDD[(K, C)] = {
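// Note: the index already fixes both the partitioning and the per-partition layout
// of the keys, so the partitioner, mapSideCombine, and serializerClass arguments are
// not used here; each group is combined in place within its indexed partition.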
val newValues = self.valuesRDD.mapPartitions(
_.map{ groups: Seq[Seq[V]] =>
groups.map{ group: Seq[V] =>
if (group != null && !group.isEmpty) {
val c: C = createCombiner(group.head)
val sum: C = group.tail.foldLeft(c)(mergeValue)
Seq(sum)
} else {
null
}
}
}, true)
new IndexedRDD[K,C](self.index, newValues)
}
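// Usage sketch (hypothetical `indexed: IndexedRDD[K, V]`): collect the values for
// each key into a list. The partitioner argument is supplied but, per the note
// above, the work happens in place against the existing index:
//   val byKey = indexed.combineByKey[List[V]](
//     v => List(v), (c, v) => v :: c, (a, b) => a ::: b, new HashPartitioner(4))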
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
*/
override def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
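// As with combineByKey above, the partitioner argument is unused: the index already
// determines the partitioning, so each value is simply wrapped in place.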
val newValues = self.valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true)
new IndexedRDD[K, Seq[V]](self.index, newValues)
}
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
override def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner):
IndexedRDD[K, (Seq[V], Seq[W])] = {
//RDD[(K, (Seq[V], Seq[W]))] = {
other match {
case other: IndexedRDD[_, _] if self.index == other.index => {
// If both RDDs share exactly the same index, and therefore the same superset of
// keys, then we can simply merge the value RDDs.
// However, it is possible that both RDDs are missing a value for a given key, in
// which case the returned RDD should have a null value.
val newValues =
self.valuesRDD.zipPartitions(other.valuesRDD)(
(thisIter, otherIter) => {
val thisValues: Seq[Seq[V]] = thisIter.next()
assert(!thisIter.hasNext)
val otherValues: Seq[Seq[W]] = otherIter.next()
assert(!otherIter.hasNext)
// Zip the values and if both arrays are null then the key is not present and
// so the resulting value must be null (not a tuple of empty sequences)
val tmp: Seq[Seq[(Seq[V], Seq[W])]] = thisValues.view.zip(otherValues).map{
case (null, null) => null // The key is not present in either RDD
case (a, null) => Seq((a, Seq.empty[W]))
case (null, b) => Seq((Seq.empty[V], b))
case (a, b) => Seq((a,b))
}.toSeq
List(tmp).iterator
})
new IndexedRDD[K, (Seq[V], Seq[W])](self.index, newValues)
}
case other: IndexedRDD[_, _]
if self.index.rdd.partitioner == other.index.rdd.partitioner => {
// If both RDDs are indexed using different indices but with the same partitioners
// then we first need to merge the indices and then use the merged index to
// merge the values.
val newIndex =
self.index.rdd.zipPartitions(other.index.rdd)(
(thisIter, otherIter) => {
val thisIndex = thisIter.next()
assert(!thisIter.hasNext)
val otherIndex = otherIter.next()
assert(!otherIter.hasNext)
val newIndex = new BlockIndex[K]()
// @todo Merge only the keys that correspond to non-null values
// Merge the keys
newIndex.putAll(thisIndex)
newIndex.putAll(otherIndex)
// We need to rekey the index
var ctr = 0
for (e <- newIndex.entrySet) {
e.setValue(ctr)
ctr += 1
}
List(newIndex).iterator
}).cache()
// Use the new index along with this RDD's and the other RDD's indices to merge the values
val newValues =
newIndex.zipPartitions(self.tuples, other.tuples)(
(newIndexIter, thisTuplesIter, otherTuplesIter) => {
// Get the new index for this partition
val newIndex = newIndexIter.next()
assert(!newIndexIter.hasNext)
// Get the corresponding indices and values for this and the other IndexedRDD
val (thisIndex, thisValues) = thisTuplesIter.next()
assert(!thisTuplesIter.hasNext)
val (otherIndex, otherValues) = otherTuplesIter.next()
assert(!otherTuplesIter.hasNext)
// Preallocate the new Values array
val newValues = new Array[Seq[(Seq[V],Seq[W])]](newIndex.size)
// Lookup the sequences in both submaps
for ((k,ind) <- newIndex) {
val thisSeq = if (thisIndex.contains(k)) thisValues(thisIndex.get(k)) else null
val otherSeq = if (otherIndex.contains(k)) otherValues(otherIndex.get(k)) else null
// if either of the sequences is not null then the key was in one of the two tables
// and so the value should appear in the returned table
newValues(ind) = (thisSeq, otherSeq) match {
case (null, null) => null
case (a, null) => Seq( (a, Seq.empty[W]) )
case (null, b) => Seq( (Seq.empty[V], b) )
case (a, b) => Seq( (a,b) )
}
}
List(newValues.toSeq).iterator
})
new IndexedRDD(new RDDIndex(newIndex), newValues)
}
case _ => {
// Get the partitioner from the index
val partitioner = self.index.rdd.partitioner match {
case Some(p) => p
case None => throw new SparkException("An index must have a partitioner.")
}
// Shuffle the other RDD using the partitioner for this index
val otherShuffled =
if (other.partitioner == Some(partitioner)) {
other
} else {
new ShuffledRDD[K,W](other, partitioner)
}
// Join the other RDD with this RDD building a new valueset and new index on the fly
val groups =
self.tuples.zipPartitions(otherShuffled)(
(thisTuplesIter, otherTuplesIter) => {
// Get the corresponding indices and values for this IndexedRDD
val (thisIndex, thisValues) = thisTuplesIter.next()
assert(!thisTuplesIter.hasNext())
// Construct a new index
val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]]
// Construct a new array Buffer to store the values
val newValues = ArrayBuffer.fill[(Seq[V], Seq[W])](thisValues.size)(null)
// populate the newValues with the values in this IndexedRDD
for ((k,i) <- thisIndex) {
if (thisValues(i) != null) {
newValues(i) = (thisValues(i), ArrayBuffer.empty[W])
}
}
// Now iterate through the other tuples updating the map
for ((k,w) <- otherTuplesIter){
if (!newIndex.contains(k)) {
// update the index
val ind = newIndex.size
newIndex.put(k, ind)
// Update the values
newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
} else {
val ind = newIndex.get(k)
if(newValues(ind) == null) {
// If the other key was in the index but not in the values
// of this indexed RDD then create a new values entry for it
newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
} else {
newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
}
}
}
// Finalize the new values array
val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] =
newValues.view.map{
case null => null
case (s, ab) => Seq((s, ab.toSeq))
}.toSeq
List( (newIndex, newValuesArray) ).iterator
}).cache()
// Extract the index and values from the above RDD
val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
val newValues = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues)
}
}
}
}
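Taken together, a driver program picks up these specialized implementations transparently. A rough usage sketch (hypothetical driver code: `sc` is a SparkContext, and `IndexedRDD(...)` is the same factory that `reindex` uses above):

import spark.SparkContext._

val pairs = sc.parallelize(Array((1, "a"), (2, "b"), (1, "c")))
val indexed = IndexedRDD(pairs)             // build the index once
val lengths = indexed.mapValues(_.length)   // dispatches to IndexedRDDFunctions; the index is reused
val grouped = indexed.groupByKey()          // likewise index-preserving, no shuffle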


@@ -69,7 +69,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * In addition, users can control the partitioning of the output RDD, and whether to perform
   * map-side aggregation (if a mapper can produce multiple items with the same key).
   */
-  def combineByKey[C](createCombiner: V => C,
+  def combineByKey[C: ClassManifest](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,

@@ -103,7 +103,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
  /**
   * Simplified version of combineByKey that hash-partitions the output RDD.
   */
-  def combineByKey[C](createCombiner: V => C,
+  def combineByKey[C: ClassManifest](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)] = {

@@ -248,7 +248,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
   */
-  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
+  def join[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
    this.cogroup(other, partitioner).flatMapValues {
      case (vs, ws) =>
        for (v <- vs.iterator; w <- ws.iterator) yield (v, w)

@@ -261,7 +261,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
-  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
+  def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
    this.cogroup(other, partitioner).flatMapValues {
      case (vs, ws) =>
        if (ws.isEmpty) {

@@ -278,7 +278,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
-  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+  def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Option[V], W))] = {
    this.cogroup(other, partitioner).flatMapValues {
      case (vs, ws) =>

@@ -294,7 +294,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * Simplified version of combineByKey that hash-partitions the resulting RDD using the
   * existing partitioner/parallelism level.
   */
-  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
+  def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
      : RDD[(K, C)] = {
    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
  }

@@ -322,7 +322,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
-  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
+  def join[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
    join(other, defaultPartitioner(self, other))
  }

@@ -331,7 +331,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
-  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
+  def join[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
    join(other, new HashPartitioner(numPartitions))
  }

@@ -341,7 +341,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * using the existing partitioner/parallelism level.
   */
-  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
+  def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
    leftOuterJoin(other, defaultPartitioner(self, other))
  }

@@ -351,7 +351,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * into `numPartitions` partitions.
   */
-  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
+  def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
    leftOuterJoin(other, new HashPartitioner(numPartitions))
  }

@@ -361,7 +361,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD using the existing partitioner/parallelism level.
   */
-  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
+  def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
    rightOuterJoin(other, defaultPartitioner(self, other))
  }

@@ -371,7 +371,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD into the given number of partitions.
   */
-  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
+  def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
    rightOuterJoin(other, new HashPartitioner(numPartitions))
  }

@@ -384,7 +384,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * Pass each value in the key-value pair RDD through a map function without changing the keys;
   * this also retains the original RDD's partitioning.
   */
-  def mapValues[U](f: V => U): RDD[(K, U)] = {
+  def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = {
    val cleanF = self.context.clean(f)
    new MappedValuesRDD(self, cleanF)
  }

@@ -393,7 +393,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * Pass each value in the key-value pair RDD through a flatMap function without changing the
   * keys; this also retains the original RDD's partitioning.
   */
-  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
+  def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K, U)] = {
    val cleanF = self.context.clean(f)
    new FlatMappedValuesRDD(self, cleanF)
  }

@@ -402,7 +402,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
-  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }

@@ -420,7 +420,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
-  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
+  def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
      throw new SparkException("Default partitioner cannot partition array keys.")

@@ -441,7 +441,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
-  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
    cogroup(other, defaultPartitioner(self, other))
  }

@@ -449,7 +449,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
-  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+  def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
  }

@@ -458,7 +458,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
-  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
    cogroup(other, new HashPartitioner(numPartitions))
  }

@@ -466,18 +466,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
-  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
+  def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
    cogroup(other1, other2, new HashPartitioner(numPartitions))
  }

  /** Alias for cogroup. */
-  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+  def groupWith[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
    cogroup(other, defaultPartitioner(self, other))
  }

  /** Alias for cogroup. */
-  def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+  def groupWith[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
  }


@@ -774,6 +774,19 @@ abstract class RDD[T: ClassManifest](
  }

+  // def pairRDDFunctions[K: ClassManifest, V,](implicit t: T <:< (K, V), k: ClassManifest[K], v: ClassManifest[V]):
+  //   PairRDDFunctions[K, V] = {
+  //   new PairRDDFunctions(this.asInstanceOf[RDD[(K,V)]])
+  // }
+
+  def pairRDDFunctions[K, V](
+      implicit t: T <:< (K, V), k: ClassManifest[K], v: ClassManifest[V]):
+    PairRDDFunctions[K, V] = {
+    new PairRDDFunctions(this.asInstanceOf[RDD[(K,V)]])
+  }
+
  def makeIndex(partitioner: Option[Partitioner] = None): RDDIndex[T] =
    IndexedRDD.makeIndex(this, partitioner)
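The added method is the hook the commit title refers to: `pairRDDFunctions` is an ordinary, overridable method on RDD, so a subclass can hand back its own PairRDDFunctions subclass. Reduced to the bare pattern, with made-up names rather than Spark code (a minimal sketch):

// Generic functionality object, and a subclass with a faster implementation.
class Ops[T](self: Base[T]) { def describe: String = "generic ops" }
class FastOps[T](self: Fast[T]) extends Ops[T](self) { override def describe = "specialized ops" }

// The data type exposes an overridable factory for its functionality object.
class Base[T] { def ops: Ops[T] = new Ops(this) }
class Fast[T] extends Base[T] { override def ops: Ops[T] = new FastOps(this) }

// Callers (here, the implicit conversion) only ever ask for `ops`, so
// (new Fast[Int]).ops.describe returns "specialized ops".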


@@ -878,7 +878,7 @@ object SparkContext {
  // TODO: Add AccumulatorParams for other types, e.g. lists and strings

  implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
-    new PairRDDFunctions(rdd)
+    rdd.pairRDDFunctions

  implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
      rdd: RDD[(K, V)]) =
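User-facing code is unchanged: the implicit still fires on any RDD[(K, V)], but the functions object it returns now depends on the RDD's runtime type. For example (sketch, reusing the hypothetical `indexed` RDD from above):

// The base reduceByKey still runs, but it delegates to combineByKey, which
// IndexedRDDFunctions overrides, so the reduction happens against the index:
val reduced = indexed.reduceByKey(_ + _)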


@@ -263,8 +263,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce.
   */
-  def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
+  def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = {
+    implicit val wm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    fromRDD(rdd.join(other, partitioner))
+  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the

@@ -274,6 +277,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   */
  def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
    : JavaPairRDD[K, (V, Optional[W])] = {
+    implicit val wm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    val joinResult = rdd.leftOuterJoin(other, partitioner)
    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
  }

@@ -286,6 +291,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   */
  def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
    : JavaPairRDD[K, (Optional[V], W)] = {
+    implicit val wm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    val joinResult = rdd.rightOuterJoin(other, partitioner)
    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
  }

@@ -324,16 +331,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
-  def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] =
+  def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = {
+    implicit val wm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    fromRDD(rdd.join(other))
+  }

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
-  def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] =
+  def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = {
+    implicit val wm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    fromRDD(rdd.join(other, numPartitions))
+  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the

@@ -342,6 +355,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   * using the existing partitioner/parallelism level.
   */
  def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = {
+    implicit val wm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    val joinResult = rdd.leftOuterJoin(other)
    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
  }

@@ -353,6 +368,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   * into `numPartitions` partitions.
   */
  def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = {
+    implicit val wm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    val joinResult = rdd.leftOuterJoin(other, numPartitions)
    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
  }

@@ -364,6 +381,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   * RDD using the existing partitioner/parallelism level.
   */
  def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = {
+    implicit val wm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    val joinResult = rdd.rightOuterJoin(other)
    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
  }

@@ -375,6 +394,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   * RDD into the given number of partitions.
   */
  def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
+    implicit val wm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    val joinResult = rdd.rightOuterJoin(other, numPartitions)
    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
  }
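Every wrapper in this file repeats the same workaround: the underlying Scala methods now demand a ClassManifest for the other value type, which a Java caller cannot supply, so each method synthesizes a manifest for AnyRef and casts it. In isolation the trick is just (illustrative helper, not part of this commit):

// Conjure a ClassManifest for a type the Java side only ever treats as Object.
// This should be safe here because the manifest is used only for erasure-level
// bookkeeping, and the Java API boxes all values as java.lang.Object anyway.
def anyRefManifest[W]: ClassManifest[W] =
  implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]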
@@ -411,55 +432,86 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
-    : JavaPairRDD[K, (JList[V], JList[W])] =
+    : JavaPairRDD[K, (JList[V], JList[W])] = {
+    implicit val wm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))
+  }

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner)
-    : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+    : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    implicit val w1m: ClassManifest[W1] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
+    implicit val w2m: ClassManifest[W2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
    fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
+  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
-  def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+  def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = {
+    implicit val wm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    fromRDD(cogroupResultToJava(rdd.cogroup(other)))
+  }

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
-    : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+    : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    implicit val w1m: ClassManifest[W1] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
+    implicit val w2m: ClassManifest[W2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
    fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
+  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
-  def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])]
-    = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
+  def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] = {
+    implicit val wm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+    fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
+  }

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
-    : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+    : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    implicit val w1m: ClassManifest[W1] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
+    implicit val w2m: ClassManifest[W2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
    fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
+  }

  /** Alias for cogroup. */
-  def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+  def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = {
+    implicit val wm: ClassManifest[W] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    fromRDD(cogroupResultToJava(rdd.groupWith(other)))
+  }

  /** Alias for cogroup. */
  def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
-    : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+    : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    implicit val w1m: ClassManifest[W1] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]]
+    implicit val w2m: ClassManifest[W2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]]
    fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
+  }

  /**
   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the


@@ -25,7 +25,6 @@
import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import spark._
import spark.rdd._
import spark.SparkContext._
@@ -77,7 +76,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
  /**
   * An internal representation which joins the block indices with the values
   */
-  protected val tuples = new ZippedRDD(index.rdd.context, index.rdd, valuesRDD)
+  protected[spark] val tuples = new ZippedRDD(index.rdd.context, index.rdd, valuesRDD)

  /**
@@ -108,341 +107,14 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
  }

+  override def pairRDDFunctions[K1, V1](
+      implicit t: (K, V) <:< (K1,V1), k: ClassManifest[K1], v: ClassManifest[V1]):
+    PairRDDFunctions[K1, V1] = {
+    new IndexedRDDFunctions[K1,V1](this.asInstanceOf[IndexedRDD[K1,V1]])
+  }

-  /**
-   * Construct a new IndexedRDD that is indexed by only the keys in the RDD
-   */
-  def reindex(): IndexedRDD[K,V] = IndexedRDD(this)
/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
def mapValues[U: ClassManifest](f: V => U): IndexedRDD[K, U] = {
val cleanF = index.rdd.context.clean(f)
val newValues = valuesRDD.mapPartitions(_.map(values => values.map{
case null => null
case row => row.map(x => f(x))
}), true)
new IndexedRDD[K,U](index, newValues)
}
/**
* Pass each value in the key-value pair RDD through a flatMap function without changing the
* keys; this also retains the original RDD's partitioning.
*/
def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): IndexedRDD[K,U] = {
val cleanF = index.rdd.context.clean(f)
val newValues = valuesRDD.mapPartitions(_.map(values => values.map{
case null => null
case row => row.flatMap(x => f(x))
}), true)
new IndexedRDD[K,U](index, newValues)
}
/**
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
* Note that V and C can be different -- for example, one might group an RDD of type
* (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
*
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
*/
def combineByKey[C: ClassManifest](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
serializerClass: String = null): IndexedRDD[K, C] = {
val newValues = valuesRDD.mapPartitions(
_.map{ groups: Seq[Seq[V]] =>
groups.map{ group: Seq[V] =>
if (group != null && !group.isEmpty) {
val c: C = createCombiner(group.head)
val sum: C = group.tail.foldLeft(c)(mergeValue)
Seq(sum)
} else {
null
}
}
}, true)
new IndexedRDD[K,C](index, newValues)
}
/**
* Merge the values for each key using an associative function and a neutral "zero value" which may
* be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
* list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V)(func: (V, V) => V): IndexedRDD[K, V] = {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)
// When deserializing, use a lazy val to create just one instance of the serializer per task
lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
combineByKey[V]((v: V) => func(createZero(), v), func, func)
}
/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
*/
def reduceByKey(func: (V, V) => V): IndexedRDD[K, V] = {
combineByKey[V]((v: V) => v, func, func)
}
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
*/
def groupByKey(): IndexedRDD[K, Seq[V]] = {
val newValues = valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true)
new IndexedRDD[K, Seq[V]](index, newValues)
}
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (Seq[V], Seq[W])] = {
//RDD[(K, (Seq[V], Seq[W]))] = {
other match {
case other: IndexedRDD[_, _] if other.index == index => {
// if both RDDs share exactly the same index and therefore the same super set of keys
// then we simply merge the value RDDs.
// However it is possible that both RDDs are missing a value for a given key in
// which case the returned RDD should have a null value
val newValues =
valuesRDD.zipPartitions(other.valuesRDD)(
(thisIter, otherIter) => {
val thisValues: Seq[Seq[V]] = thisIter.next()
assert(!thisIter.hasNext())
val otherValues: Seq[Seq[W]] = otherIter.next()
assert(!otherIter.hasNext())
// Zip the values and if both arrays are null then the key is not present and
// so the resulting value must be null (not a tuple of empty sequences)
val tmp: Seq[Seq[(Seq[V], Seq[W])]] = thisValues.view.zip(otherValues).map{
case (null, null) => null // The key is not present in either RDD
case (a, null) => Seq((a, Seq.empty[W]))
case (null, b) => Seq((Seq.empty[V], b))
case (a, b) => Seq((a,b))
}.toSeq
List(tmp).iterator
})
new IndexedRDD[K, (Seq[V], Seq[W])](index, newValues)
}
case other: IndexedRDD[_, _] if other.index.rdd.partitioner == index.rdd.partitioner => {
// If both RDDs are indexed using different indices but with the same partitioners
// then we we need to first merge the indicies and then use the merged index to
// merge the values.
val newIndex =
index.rdd.zipPartitions(other.index.rdd)(
(thisIter, otherIter) => {
val thisIndex = thisIter.next()
assert(!thisIter.hasNext())
val otherIndex = otherIter.next()
assert(!otherIter.hasNext())
val newIndex = new BlockIndex[K]()
// @todo Merge only the keys that correspond to non-null values
// Merge the keys
newIndex.putAll(thisIndex)
newIndex.putAll(otherIndex)
// We need to rekey the index
var ctr = 0
for (e <- newIndex.entrySet) {
e.setValue(ctr)
ctr += 1
}
List(newIndex).iterator
}).cache()
// Use the new index along with the this and the other indices to merge the values
val newValues =
newIndex.zipPartitions(tuples, other.tuples)(
(newIndexIter, thisTuplesIter, otherTuplesIter) => {
// Get the new index for this partition
val newIndex = newIndexIter.next()
assert(!newIndexIter.hasNext())
// Get the corresponding indicies and values for this and the other IndexedRDD
val (thisIndex, thisValues) = thisTuplesIter.next()
assert(!thisTuplesIter.hasNext())
val (otherIndex, otherValues) = otherTuplesIter.next()
assert(!otherTuplesIter.hasNext())
// Preallocate the new Values array
val newValues = new Array[Seq[(Seq[V],Seq[W])]](newIndex.size)
// Lookup the sequences in both submaps
for ((k,ind) <- newIndex) {
val thisSeq = if (thisIndex.contains(k)) thisValues(thisIndex.get(k)) else null
val otherSeq = if (otherIndex.contains(k)) otherValues(otherIndex.get(k)) else null
// if either of the sequences is not null then the key was in one of the two tables
// and so the value should appear in the returned table
newValues(ind) = (thisSeq, otherSeq) match {
case (null, null) => null
case (a, null) => Seq( (a, Seq.empty[W]) )
case (null, b) => Seq( (Seq.empty[V], b) )
case (a, b) => Seq( (a,b) )
}
}
List(newValues.toSeq).iterator
})
new IndexedRDD(new RDDIndex(newIndex), newValues)
}
case _ => {
// Get the partitioner from the index
val partitioner = index.rdd.partitioner match {
case Some(p) => p
case None => throw new SparkException("An index must have a partitioner.")
}
// Shuffle the other RDD using the partitioner for this index
val otherShuffled =
if (other.partitioner == Some(partitioner)) {
other
} else {
new ShuffledRDD[K,W](other, partitioner)
}
// Join the other RDD with this RDD building a new valueset and new index on the fly
val groups =
tuples.zipPartitions(otherShuffled)(
(thisTuplesIter, otherTuplesIter) => {
// Get the corresponding indicies and values for this IndexedRDD
val (thisIndex, thisValues) = thisTuplesIter.next()
assert(!thisTuplesIter.hasNext())
// Construct a new index
val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]]
// Construct a new array Buffer to store the values
val newValues = ArrayBuffer.fill[(Seq[V], Seq[W])](thisValues.size)(null)
// populate the newValues with the values in this IndexedRDD
for ((k,i) <- thisIndex) {
if (thisValues(i) != null) {
newValues(i) = (thisValues(i), ArrayBuffer.empty[W])
}
}
// Now iterate through the other tuples updating the map
for ((k,w) <- otherTuplesIter){
if (!newIndex.contains(k)) {
// update the index
val ind = newIndex.size
newIndex.put(k, ind)
// Update the values
newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
} else {
val ind = newIndex.get(k)
if(newValues(ind) == null) {
// If the other key was in the index but not in the values
// of this indexed RDD then create a new values entry for it
newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
} else {
newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
}
}
}
// Finalize the new values array
val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] =
newValues.view.map{
case null => null
case (s, ab) => Seq((s, ab.toSeq))
}.toSeq
List( (newIndex, newValuesArray) ).iterator
}).cache()
// Extract the index and values from the above RDD
val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
val newValues = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues)
}
}
}
// /**
// * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
// * tuple with the list of values for that key in `this`, `other1` and `other2`.
// */
// def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
// : IndexedRDD[K, (Seq[V], Seq[W1], Seq[W2])] = {
// cogroup(other1, other2, defaultPartitioner(this, other1, other2))
// }
// /**
// * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
// * tuple with the list of values for that key in `this`, `other1` and `other2`.
// */
// def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
// : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
// cogroup(other1, other2, new HashPartitioner(numPartitions))
// }
/** Alias for cogroup. */
def groupWith[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (Seq[V], Seq[W])] = {
cogroup(other)
}
// /** Alias for cogroup. */
// def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
// : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
// cogroup(other1, other2, defaultPartitioner(self, other1, other2))
// }
/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
def join[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (V, W)] = {
cogroup(other).flatMapValues {
case (vs, ws) =>
for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
}
}
/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* using the existing partitioner/parallelism level.
*/
def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (V, Option[W])] = {
cogroup(other).flatMapValues {
case (vs, ws) =>
if (ws.isEmpty) {
vs.iterator.map(v => (v, None))
} else {
for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
}
}
}
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
* resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD using the existing partitioner/parallelism level.
*/
def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)]): IndexedRDD[K, (Option[V], W)] = {
cogroup(other).flatMapValues {
case (vs, ws) =>
if (vs.isEmpty) {
ws.iterator.map(w => (None, w))
} else {
for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
}
}
}
/**