Add more logging for number of records fetched by each reduce

This commit is contained in:
Matei Zaharia 2012-10-04 11:54:47 -07:00
parent 3d24281fbf
commit 588120cd71

View file

@ -19,6 +19,7 @@ class SimpleShuffleFetcher extends ShuffleFetcher with Logging {
} }
for ((serverUri, inputIds) <- Utils.randomize(splitsByUri)) { for ((serverUri, inputIds) <- Utils.randomize(splitsByUri)) {
for (i <- inputIds) { for (i <- inputIds) {
var numRecords = 0
try { try {
val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, reduceId) val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, reduceId)
// TODO: multithreaded fetch // TODO: multithreaded fetch
@ -29,12 +30,16 @@ class SimpleShuffleFetcher extends ShuffleFetcher with Logging {
while (true) { while (true) {
val pair = inputStream.readObject().asInstanceOf[(K, V)] val pair = inputStream.readObject().asInstanceOf[(K, V)]
func(pair._1, pair._2) func(pair._1, pair._2)
numRecords += 1
} }
} finally { } finally {
inputStream.close() inputStream.close()
} }
} catch { } catch {
case e: EOFException => {} // We currently assume EOF means we read the whole thing case e: EOFException => {
// We currently assume EOF means we read the whole thing
logInfo("Reduce %s got %s records from map %s".format(reduceId, numRecords, i))
}
case other: Exception => { case other: Exception => {
logError("Fetch failed", other) logError("Fetch failed", other)
throw new FetchFailedException(serverUri, shuffleId, i, reduceId, other) throw new FetchFailedException(serverUri, shuffleId, i, reduceId, other)