Implement PairRDDFunctions.partitionBy

This commit is contained in:
Ankur Dave 2011-10-09 15:52:09 -07:00
parent 06637cb69e
commit 2d7057bf5d

View file

@ -77,6 +77,15 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex
bufs.asInstanceOf[RDD[(K, Seq[V])]] bufs.asInstanceOf[RDD[(K, Seq[V])]]
} }
/**
 * Redistributes the (key, value) pairs of this RDD using `partitioner`.
 *
 * Done in two steps: every value is first gathered into a per-key
 * ArrayBuffer by combineByKey, which is handed `partitioner`; the buffers
 * are then expanded back into individual pairs with flatMapValues, so the
 * element type of the result is the same as that of the source RDD.
 *
 * NOTE(review): all values of a key are collected into one buffer before
 * being flattened again -- presumably held in memory; confirm this is
 * acceptable for heavily skewed keys.
 *
 * @param partitioner passed through to combineByKey to place the keys
 * @return an RDD holding the pairs produced by flattening the per-key buffers
 */
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
  // Starts a new buffer holding the first value encountered for a key.
  val newBuffer: V => ArrayBuffer[V] =
    first => ArrayBuffer(first)
  // Appends one more value to an existing buffer and hands the same buffer back.
  val append: (ArrayBuffer[V], V) => ArrayBuffer[V] =
    (buffer, value) => buffer += value
  // Folds the right buffer into the left one and hands the left buffer back.
  val concat: (ArrayBuffer[V], ArrayBuffer[V]) => ArrayBuffer[V] =
    (left, right) => left ++= right

  val grouped = combineByKey[ArrayBuffer[V]](
    newBuffer, append, concat, defaultParallelism, partitioner)

  // Undo the grouping: emit one (key, value) pair per buffered value.
  grouped.flatMapValues(values => values)
}
def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = { def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) } val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) } val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }