Merge pull request #782 from WANdisco/master
SHARK-94 Log the files computed by HadoopRDD and NewHadoopRDD
This commit is contained in:
commit
d031f73679
|
@ -88,6 +88,7 @@ class HadoopRDD[K, V](
|
||||||
|
|
||||||
override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
|
override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
|
||||||
val split = theSplit.asInstanceOf[HadoopPartition]
|
val split = theSplit.asInstanceOf[HadoopPartition]
|
||||||
|
logInfo("Input split: " + split.inputSplit)
|
||||||
var reader: RecordReader[K, V] = null
|
var reader: RecordReader[K, V] = null
|
||||||
|
|
||||||
val conf = confBroadcast.value.value
|
val conf = confBroadcast.value.value
|
||||||
|
|
|
@ -73,6 +73,7 @@ class NewHadoopRDD[K, V](
|
||||||
|
|
||||||
override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] {
|
override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] {
|
||||||
val split = theSplit.asInstanceOf[NewHadoopPartition]
|
val split = theSplit.asInstanceOf[NewHadoopPartition]
|
||||||
|
logInfo("Input split: " + split.serializableHadoopSplit)
|
||||||
val conf = confBroadcast.value.value
|
val conf = confBroadcast.value.value
|
||||||
val attemptId = newTaskAttemptID(jobtrackerId, id, true, split.index, 0)
|
val attemptId = newTaskAttemptID(jobtrackerId, id, true, split.index, 0)
|
||||||
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
|
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
|
||||||
|
|
Loading…
Reference in a new issue