From 38dd42a50c3887c866244e4a4fba4a6e69353b2d Mon Sep 17 00:00:00 2001 From: Venkata krishnan Sowrirajan Date: Tue, 10 Aug 2021 09:53:53 +0800 Subject: [PATCH] [SPARK-36332][SHUFFLE] Cleanup RemoteBlockPushResolver log messages ### What changes were proposed in this pull request? Cleanup `RemoteBlockPushResolver` log messages by using `AppShufflePartitionInfo#toString()` to avoid duplications. Currently this is based off of https://github.com/apache/spark/pull/33034 will remove those changes once it is merged and remove the WIP at that time. ### Why are the changes needed? Minor cleanup to make code more readable. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No tests, just changing log messages Closes #33561 from venkata91/SPARK-36332. Authored-by: Venkata krishnan Sowrirajan Signed-off-by: yi.wu (cherry picked from commit ab897109a3fa9f83a20857a292dd68fe97e447a8) Signed-off-by: yi.wu --- .../shuffle/RemoteBlockPushResolver.java | 45 +++++++------------ 1 file changed, 17 insertions(+), 28 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 9a45f2c895..4f26ddf39a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -684,9 +684,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { private void writeBuf(ByteBuffer buf) throws IOException { while (buf.hasRemaining()) { long updatedPos = partitionInfo.getDataFilePos() + length; - logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} current pos" - + " {} updated pos {}", partitionInfo.appId, partitionInfo.shuffleId, - partitionInfo.shuffleMergeId, partitionInfo.reduceId, + logger.debug("{} current pos {} updated pos {}", partitionInfo, partitionInfo.getDataFilePos(), updatedPos); length += partitionInfo.dataChannel.write(buf, updatedPos); } @@ -801,9 +799,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { return; } abortIfNecessary(); - logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onData writable", - partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId, - partitionInfo.reduceId); + logger.trace("{} onData writable", partitionInfo); if (partitionInfo.getCurrentMapIndex() < 0) { partitionInfo.setCurrentMapIndex(mapIndex); } @@ -823,9 +819,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { throw ioe; } } else { - logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onData deferred", - partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId, - partitionInfo.reduceId); + logger.trace("{} onData deferred", partitionInfo); // If we cannot write to disk, we buffer the current block chunk in memory so it could // potentially be written to disk later. We take our best effort without guarantee // that the block will be written to disk. If the block data is divided into multiple @@ -858,9 +852,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { @Override public void onComplete(String streamId) throws IOException { synchronized (partitionInfo) { - logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onComplete invoked", - partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId, - partitionInfo.reduceId); + logger.trace("{} onComplete invoked", partitionInfo); // Initially when this request got to the server, the shuffle merge finalize request // was not received yet or this was the latest stage attempt (or latest shuffleMergeId) // generating shuffle output for the shuffle ID. By the time we finish reading this @@ -942,9 +934,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { synchronized (partitionInfo) { if (!isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId), partitionInfo.shuffleMergeId, partitionInfo.reduceId)) { - logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {}" - + " encountered failure", partitionInfo.appId, partitionInfo.shuffleId, - partitionInfo.shuffleMergeId, partitionInfo.reduceId); + logger.debug("{} encountered failure", partitionInfo); partitionInfo.setCurrentMapIndex(-1); } } @@ -1038,9 +1028,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } public void setDataFilePos(long dataFilePos) { - logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} current pos {}" - + " update pos {}", appId, shuffleId, shuffleMergeId, reduceId, this.dataFilePos, - dataFilePos); + logger.trace("{} current pos {} update pos {}", this, this.dataFilePos, dataFilePos); this.dataFilePos = dataFilePos; } @@ -1049,9 +1037,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } void setCurrentMapIndex(int mapIndex) { - logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} updated mapIndex {}" - + " current mapIndex {}", appId, shuffleId, shuffleMergeId, reduceId, - currentMapIndex, mapIndex); + logger.trace("{} mapIndex {} current mapIndex {}", this, currentMapIndex, mapIndex); this.currentMapIndex = mapIndex; } @@ -1060,8 +1046,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } void blockMerged(int mapIndex) { - logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} updated merging mapIndex {}", - appId, shuffleId, shuffleMergeId, reduceId, mapIndex); + logger.debug("{} updated merging mapIndex {}", this, mapIndex); mapTracker.add(mapIndex); chunkTracker.add(mapIndex); lastMergedMapIndex = mapIndex; @@ -1079,9 +1064,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { */ void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException { try { - logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} index current {}" - + " updated {}", appId, shuffleId, shuffleMergeId, reduceId, - this.lastChunkOffset, chunkOffset); + logger.trace("{} index current {} updated {}", this, this.lastChunkOffset, + chunkOffset); if (indexMetaUpdateFailed) { indexFile.getChannel().position(indexFile.getPos()); } @@ -1109,8 +1093,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { return; } chunkTracker.add(mapIndex); - logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} mapIndex {}" - + " write chunk to meta file", appId, shuffleId, shuffleMergeId, reduceId, mapIndex); + logger.trace("{} mapIndex {} write chunk to meta file", this, mapIndex); if (indexMetaUpdateFailed) { metaFile.getChannel().position(metaFile.getPos()); } @@ -1175,6 +1158,12 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { } } + @Override + public String toString() { + return String.format("Application %s shuffleId %s shuffleMergeId %s reduceId %s", + appId, shuffleId, shuffleMergeId, reduceId); + } + @Override protected void finalize() throws Throwable { closeAllFilesAndDeleteIfNeeded(false);