Bug squashed. CustomParallelInMemoryShuffle is rocking!

We were serializing one (the wrong) thing, trying to deserialize another (the right thing).
This commit is contained in:
Mosharaf Chowdhury 2010-12-22 17:03:31 -08:00
parent 23586d3bef
commit c484b735bb

View file

@ -70,7 +70,7 @@ extends Shuffle[K, V, C] with Logging {
// Write buckets(i) to a byte array & put in splitsCache instead of file
val baos = new ByteArrayOutputStream
val oos = new ObjectOutputStream(baos)
oos.writeObject(buckets(i))
buckets(i).foreach(pair => oos.writeObject(pair))
oos.close
baos.close
@ -119,7 +119,7 @@ extends Shuffle[K, V, C] with Logging {
CustomParallelInMemoryShuffle.MaxRxConnections) -
threadPool.getActiveCount
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
// Select a random split to pull
val splitIndex = selectRandomSplit
@ -190,7 +190,6 @@ extends Shuffle[K, V, C] with Logging {
try{
while (true) {
// logInfo("" + inputStream.readObject.isInstanceOf[(K, C)])
val (k, c) = inputStream.readObject.asInstanceOf[(K, C)]
combiners(k) = combiners.get(k) match {
case Some(oldC) => mergeCombiners(oldC, c)