[SPARK-34015][R] Fixing input timing in gapply

### What changes were proposed in this pull request?

When sparkR is run at log level INFO, a summary of how the worker spent its time processing the partition is printed. There is a logic error where it is over-reporting the time inputting rows.

In detail: the variable inputElap in a wider context is used to mark the end of reading rows, but in the part changed here it was used as a local variable for measuring the beginning of compute time in a loop over the groups in the partition. Thus, the error is not observable if there is only one group per partition, which is what you get in unit tests.

For our application, here's what a log entry looks like before these changes were applied:

`20/10/09 04:08:58 INFO RRunner: Times: boot = 0.013 s, init = 0.005 s, broadcast = 0.000 s, read-input = 529.471 s, compute = 492.037 s, write-output = 0.020 s, total = 1021.546 s`

this indicates that we're spending more time reading rows than operating on the rows.

After these changes, it looks like this:

`20/12/15 06:43:29 INFO RRunner: Times: boot = 0.013 s, init = 0.010 s, broadcast = 0.000 s, read-input = 120.275 s, compute = 1680.161 s, write-output = 0.045 s, total = 1812.553 s
`
### Why are the changes needed?

Metrics shouldn't mislead?

### Does this PR introduce _any_ user-facing change?

Aside from no longer misleading, no

### How was this patch tested?

unit tests passed. Field test results seem plausible

Closes #31021 from WamBamBoozle/input_timing.

Authored-by: Tom.Howland <Tom.Howland@target.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
Tom.Howland 2021-01-06 11:40:02 +09:00 committed by HyukjinKwon
parent b77d11dfd9
commit 3d8ee492d6

View file

@ -196,7 +196,7 @@ if (isEmpty != 0) {
outputs <- list()
for (i in seq_len(length(data))) {
# Timing reading input data for execution
inputElap <- elapsedSecs()
computeStart <- elapsedSecs()
output <- compute(mode, partition, serializer, deserializer, keys[[i]],
colNames, computeFunc, data[[i]])
computeElap <- elapsedSecs()
@ -204,17 +204,18 @@ if (isEmpty != 0) {
outputs[[length(outputs) + 1L]] <- output
} else {
outputResult(serializer, output, outputCon)
outputComputeElapsDiff <- outputComputeElapsDiff + (elapsedSecs() - computeElap)
}
outputElap <- elapsedSecs()
computeInputElapsDiff <- computeInputElapsDiff + (computeElap - inputElap)
outputComputeElapsDiff <- outputComputeElapsDiff + (outputElap - computeElap)
computeInputElapsDiff <- computeInputElapsDiff + (computeElap - computeStart)
}
if (serializer == "arrow") {
# See https://stat.ethz.ch/pipermail/r-help/2010-September/252046.html
# rbind.fill might be an alternative to make it faster if plyr is installed.
outputStart <- elapsedSecs()
combined <- do.call("rbind", outputs)
SparkR:::writeSerializeInArrow(outputCon, combined)
outputComputeElapsDiff <- elapsedSecs() - outputStart
}
}
} else {