cogrouped RDD stores the amount of time taken to read shuffle data in each task
This commit is contained in:
parent
295b534398
commit
e319ac74c1
|
@ -110,7 +110,9 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
|
||||||
mySeq(depNum) += v
|
mySeq(depNum) += v
|
||||||
}
|
}
|
||||||
val fetcher = SparkEnv.get.shuffleFetcher
|
val fetcher = SparkEnv.get.shuffleFetcher
|
||||||
fetcher.fetch[K, Seq[Any]](shuffleId, split.index).foreach(mergePair)
|
val fetchItr = fetcher.fetch[K, Seq[Any]](shuffleId, split.index)
|
||||||
|
fetchItr.foreach(mergePair)
|
||||||
|
context.task.setShuffleReadMillis(fetchItr.getNetMillis)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
JavaConversions.mapAsScalaMap(map).iterator
|
JavaConversions.mapAsScalaMap(map).iterator
|
||||||
|
|
|
@ -16,6 +16,10 @@ private[spark] abstract class Task[T](val stageId: Int) extends Serializable {
|
||||||
def preferredLocations: Seq[String] = Nil
|
def preferredLocations: Seq[String] = Nil
|
||||||
|
|
||||||
var generation: Long = -1 // Map output tracker generation. Will be set by TaskScheduler.
|
var generation: Long = -1 // Map output tracker generation. Will be set by TaskScheduler.
|
||||||
|
|
||||||
|
// Time spent reading shuffle data, in milliseconds; None until a value is
// stored via setShuffleReadMillis.
// NOTE(review): the name reads "shuffler" where the accessors say "shuffle" --
// likely a typo; rename together with both accessors if desired.
private var shufflerReadMillis : Option[Long] = None
|
||||||
|
/**
 * Records the time spent reading shuffle data, replacing any value stored earlier.
 *
 * @param millis shuffle read time in milliseconds
 */
def setShuffleReadMillis(millis: Long): Unit = {
  // Each call overwrites the previous value; nothing is accumulated here.
  shufflerReadMillis = Some(millis)
}
|
||||||
|
/**
 * Returns the most recently recorded shuffle read time.
 *
 * @return `Some(millis)` holding the last value passed to `setShuffleReadMillis`,
 *         or `None` if no value has been recorded
 */
def getShuffleReadMillis: Option[Long] = shufflerReadMillis
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue