[SPARK-26697][CORE] Log local & remote block sizes.

## What changes were proposed in this pull request?

To help debugging failed or slow tasks, its really useful to know the
size of the blocks getting fetched.  Though that is available at the
debug level, debug logs aren't on in general -- but there is already an
info level log line that this augments a little.

## How was this patch tested?

Ran very basic local-cluster mode app, looked at logs.  Example line:

```
INFO ShuffleBlockFetcherIterator: Getting 2 (194.0 B) non-empty blocks including 1 (97.0 B) local blocks and 1 (97.0 B) remote blocks
```

Full suite via jenkins.

Closes #23621 from squito/SPARK-26697.

Authored-by: Imran Rashid <irashid@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
Imran Rashid 2019-01-24 16:13:58 -08:00 committed by Marcelo Vanzin
parent 8d667c511c
commit 3699763fda

View file

@ -273,6 +273,8 @@ final class ShuffleBlockFetcherIterator(
// Split local and remote blocks. Remote blocks are further split into FetchRequests of size
// at most maxBytesInFlight in order to limit the amount of data in flight.
val remoteRequests = new ArrayBuffer[FetchRequest]
var localBlockBytes = 0L
var remoteBlockBytes = 0L
for ((address, blockInfos) <- blocksByAddress) {
if (address.executorId == blockManager.blockManagerId.executorId) {
@ -284,6 +286,7 @@ final class ShuffleBlockFetcherIterator(
case None => // do nothing.
}
localBlocks ++= blockInfos.map(_._1)
localBlockBytes += blockInfos.map(_._2).sum
numBlocksToFetch += localBlocks.size
} else {
val iterator = blockInfos.iterator
@ -291,6 +294,7 @@ final class ShuffleBlockFetcherIterator(
var curBlocks = new ArrayBuffer[(BlockId, Long)]
while (iterator.hasNext) {
val (blockId, size) = iterator.next()
remoteBlockBytes += size
if (size < 0) {
throw new BlockException(blockId, "Negative block size " + size)
} else if (size == 0) {
@ -317,8 +321,10 @@ final class ShuffleBlockFetcherIterator(
}
}
}
logInfo(s"Getting $numBlocksToFetch non-empty blocks including ${localBlocks.size}" +
s" local blocks and ${remoteBlocks.size} remote blocks")
val totalBytes = localBlockBytes + remoteBlockBytes
logInfo(s"Getting $numBlocksToFetch (${Utils.bytesToString(totalBytes)}) non-empty blocks " +
s"including ${localBlocks.size} (${Utils.bytesToString(localBlockBytes)}) local blocks and " +
s"${remoteBlocks.size} (${Utils.bytesToString(remoteBlockBytes)}) remote blocks")
remoteRequests
}