SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader instantiation

Author: Sandy Ryza <sandy@cloudera.com>

Closes #3045 from sryza/sandy-spark-4178 and squashes the following commits:

8d2e70e [Sandy Ryza] Kostas's review feedback
e5b27c0 [Sandy Ryza] SPARK-4178. Hadoop input metrics ignore bytes read in RecordReader instantiation
Authored by Sandy Ryza on 2014-11-03 15:19:01 -08:00; committed by Patrick Wendell
parent 25bef7e695
commit 28128150e7
3 changed files with 54 additions and 26 deletions
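
The heart of the change in both RDDs below: the bytes-read callback is created, and InputMetrics registered on the task, before the RecordReader is constructed, so bytes that the reader's constructor pulls from the FileSystem are counted. A minimal sketch of what such a callback can look like, assuming Hadoop 2.5+ thread-level FileSystem statistics; fsBytesReadCallback is an illustrative stand-in, not the actual SparkHadoopUtil.getFSBytesReadOnThreadCallback implementation:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Illustrative stand-in for SparkHadoopUtil.getFSBytesReadOnThreadCallback: sum the
    // current thread's bytes-read counters for every FileSystem whose scheme matches the
    // split's path, snapshot a baseline now, and report the delta on each later call.
    def fsBytesReadCallback(path: Path, conf: Configuration): Option[() => Long] = {
      val scheme = path.getFileSystem(conf).getUri.getScheme
      val stats = FileSystem.getAllStatistics.asScala.filter(_.getScheme == scheme)
      if (stats.isEmpty) {
        None
      } else {
        val current = () => stats.map(_.getThreadStatistics.getBytesRead).sum
        val baseline = current() // taken before the RecordReader exists
        Some(() => current() - baseline)
      }
    }

With a baseline captured at creation time, a callback created only after the RecordReader would miss whatever the constructor read, which is exactly what this patch fixes by moving the setup earlier.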


@@ -211,8 +211,22 @@ class HadoopRDD[K, V](
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
val jobConf = getJobConf()
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
} else {
None
}
if (bytesReadCallback.isDefined) {
context.taskMetrics.inputMetrics = Some(inputMetrics)
}
var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
@@ -223,17 +237,6 @@ class HadoopRDD[K, V](
val key: K = reader.createKey()
val value: V = reader.createValue()
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
// Find a function that will return the FileSystem bytes read by this thread.
val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
} else {
None
}
if (bytesReadCallback.isDefined) {
context.taskMetrics.inputMetrics = Some(inputMetrics)
}
var recordsSinceMetricsUpdate = 0
override def getNext() = {


@@ -107,6 +107,20 @@ class NewHadoopRDD[K, V](
val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
} else {
None
}
if (bytesReadCallback.isDefined) {
context.taskMetrics.inputMetrics = Some(inputMetrics)
}
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
val format = inputFormatClass.newInstance
@@ -119,18 +133,6 @@ class NewHadoopRDD[K, V](
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
// Find a function that will return the FileSystem bytes read by this thread.
val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
} else {
None
}
if (bytesReadCallback.isDefined) {
context.taskMetrics.inputMetrics = Some(inputMetrics)
}
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => close())
var havePair = false


@@ -27,7 +27,7 @@ import scala.collection.mutable.ArrayBuffer
import java.io.{FileWriter, PrintWriter, File}
class InputMetricsSuite extends FunSuite with SharedSparkContext {
test("input metrics when reading text file") {
test("input metrics when reading text file with single split") {
val file = new File(getClass.getSimpleName + ".txt")
val pw = new PrintWriter(new FileWriter(file))
pw.println("some stuff")
@@ -48,6 +48,29 @@ class InputMetricsSuite extends FunSuite with SharedSparkContext {
// Wait for task end events to come in
sc.listenerBus.waitUntilEmpty(500)
assert(taskBytesRead.length == 2)
assert(taskBytesRead.sum == file.length())
assert(taskBytesRead.sum >= file.length())
}
test("input metrics when reading text file with multiple splits") {
val file = new File(getClass.getSimpleName + ".txt")
val pw = new PrintWriter(new FileWriter(file))
for (i <- 0 until 10000) {
pw.println("some stuff")
}
pw.close()
file.deleteOnExit()
val taskBytesRead = new ArrayBuffer[Long]()
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
}
})
sc.textFile("file://" + file.getAbsolutePath, 2).count()
// Wait for task end events to come in
sc.listenerBus.waitUntilEmpty(500)
assert(taskBytesRead.length == 2)
assert(taskBytesRead.sum >= file.length())
}
}
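
For completeness, the listener hook the new tests use is the general way to observe these metrics. A minimal driver-side sketch, given a SparkContext sc; the file path and the totalBytesRead name are illustrative, and unlike the suite an application cannot call the package-private sc.listenerBus, so it should simply let the job finish before reading the total:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Driver-side total of Hadoop input bytes across all completed tasks. Tasks whose
    // RDD could not install a bytes-read callback carry no inputMetrics, hence getOrElse.
    var totalBytesRead = 0L
    sc.addSparkListener(new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        totalBytesRead += taskEnd.taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
      }
    })

    sc.textFile("file:///tmp/input.txt", 2).count()
    // Once the task-end events have been delivered, totalBytesRead also covers bytes
    // read while each RecordReader was being constructed.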