Working ExternalAppendOnlyMap for both CoGroupedRDDs and Aggregator

This commit is contained in:
Andrew Or 2013-12-24 14:27:20 -08:00
parent 97fbb3ec52
commit 6a45ec1972
4 changed files with 61 additions and 66 deletions

View file

@ -32,7 +32,6 @@ case class Aggregator[K, V, C] (
mergeCombiners: (C, C) => C) {
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
println("Combining values by key!!")
//val combiners = new AppendOnlyMap[K, C]
val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
var kv: Product2[K, V] = null
@ -47,7 +46,6 @@ case class Aggregator[K, V, C] (
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
println("Combining combiners by key!!")
//val combiners = new AppendOnlyMap[K, C]
val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
var kc: (K, C) = null

View file

@ -101,14 +101,16 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner = Some(part)
override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
println("Computing in CoGroupedRDD!")
// e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
val combineFunction: (Seq[ArrayBuffer[Any]], Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] =
(x, y) => { x ++ y }
def combine(x: Seq[ArrayBuffer[Any]], y: Seq[ArrayBuffer[Any]]) = {
x.zipAll(y, ArrayBuffer[Any](), ArrayBuffer[Any]()).map {
case (a, b) => a ++ b
//val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
val map = new ExternalAppendOnlyMap[K, Seq[ArrayBuffer[Any]]](combineFunction)
val map = new ExternalAppendOnlyMap[K, Seq[ArrayBuffer[Any]]](combine)
val ser = SparkEnv.get.serializerManager.get(serializerClass)
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
@ -128,22 +130,18 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
def addToMap(key: K, value: Any, depNum: Int) {
val updateFunction: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] =
(hadVal, oldVal) => {
var newVal = oldVal
if (!hadVal){
newVal = Array.fill(numRdds)(new ArrayBuffer[Any])
newVal(depNum) += value
def update(hadVal: Boolean, oldVal: Seq[ArrayBuffer[Any]]): Seq[ArrayBuffer[Any]] = {
var newVal = oldVal
if (!hadVal){
newVal = Array.fill(numRdds)(new ArrayBuffer[Any])
map.changeValue(key, updateFunction)
newVal(depNum) += value
map.changeValue(key, update)
println("About to construct CoGroupedRDD iterator!")
val theIterator = map.iterator
println("Returning CoGroupedRDD iterator!")
new InterruptibleIterator(context, theIterator)
new InterruptibleIterator(context, map.iterator)
override def clearDependencies() {

View file

@ -85,12 +85,10 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
println("Partitioner is some partitioner! In fact, it is " + self.partitioner.toString())
self.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
}, preservesPartitioning = true)
} else if (mapSideCombine) {
println("Otherwise, combining on map side.")
val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
@ -98,10 +96,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
}, preservesPartitioning = true)
} else {
println("Else. No combining on map side!")
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
@ -229,8 +224,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
// into a hash table, leading to more objects in the old gen.
def createCombiner(v: V) = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
val bufs = combineByKey[ArrayBuffer[V]](
createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
bufs.asInstanceOf[RDD[(K, Seq[V])]]
@ -650,7 +646,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* MapReduce job.
def saveAsHadoopDataset(conf: JobConf) {
val outputFormatClass = conf.getOutputFormat
val keyClass = conf.getOutputKeyClass
val valueClass = conf.getOutputValueClass
@ -670,7 +665,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
println("WRITE TO FILE")
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@ -678,17 +672,13 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
writer.setup(context.stageId, context.partitionId, attemptNumber)
println("START LOOP\n\n\n")
var count = 0
while(iter.hasNext) {
println("Before next()")
val record =
count += 1
println("Before write. Record = "+record)
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
println("After write. Record = "+record)
println("ALL DONE! Woohoo.")

View file

@ -25,36 +25,32 @@ import scala.collection.mutable
* A simple map that spills sorted content to disk when the memory threshold is exceeded. A combiner
* function must be specified to merge values back into memory during read.
class ExternalAppendOnlyMap[K,V](combinerFunction: (V, V) => V,
class ExternalAppendOnlyMap[K, V](combineFunction: (V, V) => V,
memoryThresholdMB: Int = 1024)
extends Iterable[(K,V)] with Serializable {
extends Iterable[(K, V)] with Serializable {
var currentMap = new AppendOnlyMap[K,V]
var currentMap = new AppendOnlyMap[K, V]
var oldMaps = new ArrayBuffer[DiskKVIterator]
def changeValue(key: K, updateFunc: (Boolean, V) => V): Unit = {
currentMap.changeValue(key, updateFunc)
val mapSize = SizeEstimator.estimate(currentMap)
//if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
if (mapSize > 1024 * 10) {
if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
def spill(): Unit = {
val file = File.createTempFile("external_append_only_map", "") // Add spill location
val out = new ObjectOutputStream(new FileOutputStream(file))
val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
sortedMap foreach {
out.writeObject( _ )
sortedMap.foreach { out.writeObject( _ ) }
currentMap = new AppendOnlyMap[K,V]
currentMap = new AppendOnlyMap[K, V]
oldMaps.append(new DiskKVIterator(file))
override def iterator: Iterator[(K,V)] = new ExternalIterator()
override def iterator: Iterator[(K, V)] = new ExternalIterator()
* An iterator that merges KV pairs from memory and disk in sorted order
@ -67,49 +63,62 @@ class ExternalAppendOnlyMap[K,V](combinerFunction: (V, V) => V,
val pq = mutable.PriorityQueue[KVITuple]()
val inputStreams = Seq(new MemoryKVIterator(currentMap)) ++ oldMaps
inputStreams foreach { readFromIterator }
inputStreams.foreach { readFromIterator( _ ) }
override def hasNext: Boolean = !pq.isEmpty
// Combine all values from all input streams corresponding to the same key
override def next(): (K,V) = {
println(" - How many left? "+pq.length)
val minKVI = pq.dequeue()
var (minKey, minValue, minIter) = (minKVI.key, minKVI.value, minKVI.iter)
// println("Min key = "+minKey)
while (!pq.isEmpty && pq.head.key == minKey) {
val newKVI = pq.dequeue()
val (newValue, newIter) = (newKVI.value, newKVI.iter)
// println("\tfound new value to merge! "+newValue)
// println("\tcombinerFunction("+minValue+" <====> "+newValue+")")
minValue = combinerFunction(minValue, newValue)
// println("\tCombine complete! New value = "+minValue)
var (minKey, minValue) = (minKVI.key, minKVI.value)
val minHash = minKey.hashCode()
var collidedKVI = ArrayBuffer[KVITuple]()
while (!pq.isEmpty && pq.head.key.hashCode() == minHash) {
val newKVI: KVITuple = pq.dequeue()
if (newKVI.key == minKey){
minValue = combineFunction(minValue, newKVI.value)
} else {
// Collision
collidedKVI += newKVI
println("Returning minKey = "+minKey+", minValue = "+minValue)
collidedKVI.foreach { pq.enqueue( _ ) }
(minKey, minValue)
def readFromIterator(iter: Iterator[(K,V)]): Unit = {
if (iter.hasNext) {
// Read from the given iterator until a key of different hash is retrieved,
// Add each KV pair read from this iterator to the heap
def readFromIterator(iter: Iterator[(K, V)]): Unit = {
var minHash : Option[Int] = None
while (iter.hasNext) {
val (k, v) =
pq.enqueue(KVITuple(k, v, iter))
minHash match {
case None => minHash = Some(k.hashCode())
case Some(expectedHash) =>
if (k.hashCode() != expectedHash){
case class KVITuple(key:K, value:V, iter:Iterator[(K,V)])
case class KVITuple(key:K, value:V, iter:Iterator[(K, V)])
class MemoryKVIterator(map: AppendOnlyMap[K,V]) extends Iterator[(K,V)] {
class MemoryKVIterator(map: AppendOnlyMap[K, V]) extends Iterator[(K, V)] {
val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
val it = sortedMap.iterator
override def hasNext: Boolean = it.hasNext
override def next(): (K,V) =
override def next(): (K, V) =
class DiskKVIterator(file: File) extends Iterator[(K,V)] {
class DiskKVIterator(file: File) extends Iterator[(K, V)] {
val in = new ObjectInputStream(new FileInputStream(file))
var nextItem:(K,V) = _
var nextItem:(K, V) = _
var eof = false
override def hasNext: Boolean = {
@ -117,7 +126,7 @@ class ExternalAppendOnlyMap[K,V](combinerFunction: (V, V) => V,
return false
try {
nextItem = in.readObject().asInstanceOf[(K,V)]
nextItem = in.readObject().asInstanceOf[(K, V)]
} catch {
case e: EOFException =>
eof = true
@ -126,7 +135,7 @@ class ExternalAppendOnlyMap[K,V](combinerFunction: (V, V) => V,
override def next(): (K,V) = {
override def next(): (K, V) = {
if (eof) {
throw new NoSuchElementException