[SPARK-5701] Only set ShuffleReadMetrics when task has shuffle deps

The updateShuffleReadMetrics method in TaskMetrics (called by the executor heartbeater) will currently always add a ShuffleReadMetrics to TaskMetrics (with values set to 0), even when the task didn't read any shuffle data. ShuffleReadMetrics should only be added if the task reads shuffle data.

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #4488 from kayousterhout/SPARK-5701 and squashes the following commits:

673ed58 [Kay Ousterhout] SPARK-5701: Only set ShuffleReadMetrics when task has shuffle deps
This commit is contained in:
Kay Ousterhout 2015-02-09 21:22:09 -08:00 committed by Andrew Or
parent a95ed52157
commit a2d33d0b01
2 changed files with 40 additions and 10 deletions

View file

@ -177,8 +177,8 @@ class TaskMetrics extends Serializable {
* Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
* we can store all the different inputMetrics (one per readMethod).
*/
private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod):
InputMetrics =synchronized {
private[spark] def getInputMetricsForReadMethod(
readMethod: DataReadMethod): InputMetrics = synchronized {
_inputMetrics match {
case None =>
val metrics = new InputMetrics(readMethod)
@ -195,15 +195,17 @@ class TaskMetrics extends Serializable {
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
private[spark] def updateShuffleReadMetrics(): Unit = synchronized {
val merged = new ShuffleReadMetrics()
for (depMetrics <- depsShuffleReadMetrics) {
merged.incFetchWaitTime(depMetrics.fetchWaitTime)
merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
merged.incRecordsRead(depMetrics.recordsRead)
if (!depsShuffleReadMetrics.isEmpty) {
val merged = new ShuffleReadMetrics()
for (depMetrics <- depsShuffleReadMetrics) {
merged.incFetchWaitTime(depMetrics.fetchWaitTime)
merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
merged.incRecordsRead(depMetrics.recordsRead)
}
_shuffleReadMetrics = Some(merged)
}
_shuffleReadMetrics = Some(merged)
}
private[spark] def updateInputMetrics(): Unit = synchronized {

View file

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor
import org.scalatest.FunSuite
class TaskMetricsSuite extends FunSuite {
test("[SPARK-5701] updateShuffleReadMetrics: ShuffleReadMetrics not added when no shuffle deps") {
val taskMetrics = new TaskMetrics()
taskMetrics.updateShuffleReadMetrics()
assert(taskMetrics.shuffleReadMetrics.isEmpty)
}
}