[SPARK-15259] Sort time metric should not include spill and record insertion time

## What changes were proposed in this pull request?

After SPARK-14669 it seems the sort time metric includes both spill and record insertion time. This makes it not very useful since the metric becomes close to the total execution time of the node.

We should track just the time spent for in-memory sort, as before.

## How was this patch tested?

Verified the metric in the UI, and added a unit test on UnsafeExternalRowSorter.

cc davies

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #13035 from ericl/fix-metrics.
This commit is contained in:
Eric Liang 2016-05-11 11:25:46 -07:00 committed by Davies Liu
parent 2931437972
commit 6d0368ab8d
5 changed files with 53 additions and 7 deletions

View file

@ -76,6 +76,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private long pageCursor = -1;
private long peakMemoryUsedBytes = 0;
private long totalSpillBytes = 0L;
private long totalSortTimeNanos = 0L;
private volatile SpillableIterator readingIterator = null;
public static UnsafeExternalSorter createWithExistingInMemorySorter(
@ -247,6 +248,17 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
return peakMemoryUsedBytes;
}
/**
 * Returns the total time spent on in-memory sorting, in nanoseconds.
 *
 * While an in-memory sorter is still live, its own running total is authoritative.
 * Once it has been freed (after a spill), the time it accumulated has already been
 * folded into {@code totalSortTimeNanos}, which is returned instead.
 *
 * @return the total amount of time spent sorting data (in-memory only).
 */
public long getSortTimeNanos() {
  final UnsafeInMemorySorter activeSorter = inMemSorter;
  return activeSorter == null ? totalSortTimeNanos : activeSorter.getSortTimeNanos();
}
/**
* Return the total number of bytes that has been spilled into disk so far.
*/
@ -505,6 +517,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// in-memory sorter will not be used after spilling
assert(inMemSorter != null);
released += inMemSorter.getMemoryUsage();
totalSortTimeNanos += inMemSorter.getSortTimeNanos();
inMemSorter.free();
inMemSorter = null;
taskContext.taskMetrics().incMemoryBytesSpilled(released);

View file

@ -97,6 +97,8 @@ public final class UnsafeInMemorySorter {
private long initialSize;
private long totalSortTimeNanos = 0L;
public UnsafeInMemorySorter(
final MemoryConsumer consumer,
final TaskMemoryManager memoryManager,
@ -160,6 +162,13 @@ public final class UnsafeInMemorySorter {
return pos / 2;
}
/**
 * @return the total amount of time spent sorting data (in-memory only).
 */
public long getSortTimeNanos() {
// Accumulated in getSortedIterator() around the actual sort call only, so
// spill and record-insertion time are excluded by construction.
return totalSortTimeNanos;
}
public long getMemoryUsage() {
// Size of the backing array in bytes: array.size() slots of 8 bytes each.
return array.size() * 8;
}
@ -265,6 +274,7 @@ public final class UnsafeInMemorySorter {
*/
public SortedIterator getSortedIterator() {
int offset = 0;
long start = System.nanoTime();
if (sorter != null) {
if (this.radixSortSupport != null) {
// TODO(ekl) we should handle NULL values before radix sort for efficiency, since they
@ -275,6 +285,7 @@ public final class UnsafeInMemorySorter {
sorter.sort(array, 0, pos / 2, sortComparator);
}
}
totalSortTimeNanos += System.nanoTime() - start;
return new SortedIterator(pos / 2, offset);
}
}

View file

@ -49,6 +49,7 @@ import org.apache.spark.storage.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.Utils;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.*;
import static org.mockito.Answers.RETURNS_SMART_NULLS;
@ -225,6 +226,25 @@ public class UnsafeExternalSorterSuite {
assertSpillFilesWereCleanedUp();
}
@Test
public void testSortTimeMetric() throws Exception {
  final UnsafeExternalSorter sorter = newSorter();
  long prevSortTime = sorter.getSortTimeNanos();
  // No sort has happened yet, so the metric starts at zero.
  // (JUnit convention: expected value first, actual value second.)
  assertEquals(0, prevSortTime);

  sorter.insertRecord(null, 0, 0, 0);
  sorter.spill();
  // Spilling sorts the in-memory data, so the metric must advance.
  assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime));
  prevSortTime = sorter.getSortTimeNanos();

  sorter.spill(); // no sort needed: nothing was inserted since the last spill
  assertEquals(prevSortTime, sorter.getSortTimeNanos());

  sorter.insertRecord(null, 0, 0, 0);
  // getSortedIterator() triggers an in-memory sort; the returned iterator is
  // irrelevant to this test, so it is deliberately not retained.
  sorter.getSortedIterator();
  assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime));
}
@Test
public void spillingOccursInResponseToMemoryPressure() throws Exception {
final UnsafeExternalSorter sorter = newSorter();

View file

@ -108,6 +108,13 @@ public final class UnsafeExternalRowSorter {
return sorter.getPeakMemoryUsedBytes();
}
/**
 * @return the total amount of time spent sorting data (in-memory only).
 */
public long getSortTimeNanos() {
// Delegates to the wrapped UnsafeExternalSorter's metric.
return sorter.getSortTimeNanos();
}
private void cleanupResources() {
// Delegates cleanup to the wrapped sorter.
sorter.cleanupResources();
}

View file

@ -97,11 +97,8 @@ case class SortExec(
// Remember spill data size of this task before execute this operator so that we can
// figure out how many bytes we spilled for this operator.
val spillSizeBefore = metrics.memoryBytesSpilled
val beforeSort = System.nanoTime()
val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
sortTime += (System.nanoTime() - beforeSort) / 1000000
sortTime += sorter.getSortTimeNanos / 1000000
peakMemory += sorter.getPeakMemoryUsage
spillSize += metrics.memoryBytesSpilled - spillSizeBefore
metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
@ -151,15 +148,13 @@ case class SortExec(
val peakMemory = metricTerm(ctx, "peakMemory")
val spillSize = metricTerm(ctx, "spillSize")
val spillSizeBefore = ctx.freshName("spillSizeBefore")
val startTime = ctx.freshName("startTime")
val sortTime = metricTerm(ctx, "sortTime")
s"""
| if ($needToSort) {
| long $spillSizeBefore = $metrics.memoryBytesSpilled();
| long $startTime = System.nanoTime();
| $addToSorter();
| $sortedIterator = $sorterVariable.sort();
| $sortTime.add((System.nanoTime() - $startTime) / 1000000);
| $sortTime.add($sorterVariable.getSortTimeNanos() / 1000000);
| $peakMemory.add($sorterVariable.getPeakMemoryUsage());
| $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
| $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());