[SPARK-36332][SHUFFLE] Cleanup RemoteBlockPushResolver log messages

### What changes were proposed in this pull request?
Cleanup `RemoteBlockPushResolver` log messages by using `AppShufflePartitionInfo#toString()` to avoid duplications. Currently this is based off of https://github.com/apache/spark/pull/33034 will remove those changes once it is merged and remove the WIP at that time.

### Why are the changes needed?
Minor cleanup to make code more readable.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
No tests, just changing log messages

Closes #33561 from venkata91/SPARK-36332.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
(cherry picked from commit ab897109a3)
Signed-off-by: yi.wu <yi.wu@databricks.com>
This commit is contained in:
Venkata krishnan Sowrirajan 2021-08-10 09:53:53 +08:00 committed by yi.wu
parent 10f7f6e62b
commit 38dd42a50c

View file

@ -684,9 +684,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
private void writeBuf(ByteBuffer buf) throws IOException { private void writeBuf(ByteBuffer buf) throws IOException {
while (buf.hasRemaining()) { while (buf.hasRemaining()) {
long updatedPos = partitionInfo.getDataFilePos() + length; long updatedPos = partitionInfo.getDataFilePos() + length;
logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} current pos" logger.debug("{} current pos {} updated pos {}", partitionInfo,
+ " {} updated pos {}", partitionInfo.appId, partitionInfo.shuffleId,
partitionInfo.shuffleMergeId, partitionInfo.reduceId,
partitionInfo.getDataFilePos(), updatedPos); partitionInfo.getDataFilePos(), updatedPos);
length += partitionInfo.dataChannel.write(buf, updatedPos); length += partitionInfo.dataChannel.write(buf, updatedPos);
} }
@ -801,9 +799,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
return; return;
} }
abortIfNecessary(); abortIfNecessary();
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onData writable", logger.trace("{} onData writable", partitionInfo);
partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
partitionInfo.reduceId);
if (partitionInfo.getCurrentMapIndex() < 0) { if (partitionInfo.getCurrentMapIndex() < 0) {
partitionInfo.setCurrentMapIndex(mapIndex); partitionInfo.setCurrentMapIndex(mapIndex);
} }
@ -823,9 +819,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
throw ioe; throw ioe;
} }
} else { } else {
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onData deferred", logger.trace("{} onData deferred", partitionInfo);
partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
partitionInfo.reduceId);
// If we cannot write to disk, we buffer the current block chunk in memory so it could // If we cannot write to disk, we buffer the current block chunk in memory so it could
// potentially be written to disk later. We take our best effort without guarantee // potentially be written to disk later. We take our best effort without guarantee
// that the block will be written to disk. If the block data is divided into multiple // that the block will be written to disk. If the block data is divided into multiple
@ -858,9 +852,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
@Override @Override
public void onComplete(String streamId) throws IOException { public void onComplete(String streamId) throws IOException {
synchronized (partitionInfo) { synchronized (partitionInfo) {
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onComplete invoked", logger.trace("{} onComplete invoked", partitionInfo);
partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
partitionInfo.reduceId);
// Initially when this request got to the server, the shuffle merge finalize request // Initially when this request got to the server, the shuffle merge finalize request
// was not received yet or this was the latest stage attempt (or latest shuffleMergeId) // was not received yet or this was the latest stage attempt (or latest shuffleMergeId)
// generating shuffle output for the shuffle ID. By the time we finish reading this // generating shuffle output for the shuffle ID. By the time we finish reading this
@ -942,9 +934,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
synchronized (partitionInfo) { synchronized (partitionInfo) {
if (!isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId), if (!isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
partitionInfo.shuffleMergeId, partitionInfo.reduceId)) { partitionInfo.shuffleMergeId, partitionInfo.reduceId)) {
logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {}" logger.debug("{} encountered failure", partitionInfo);
+ " encountered failure", partitionInfo.appId, partitionInfo.shuffleId,
partitionInfo.shuffleMergeId, partitionInfo.reduceId);
partitionInfo.setCurrentMapIndex(-1); partitionInfo.setCurrentMapIndex(-1);
} }
} }
@ -1038,9 +1028,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
} }
public void setDataFilePos(long dataFilePos) { public void setDataFilePos(long dataFilePos) {
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} current pos {}" logger.trace("{} current pos {} update pos {}", this, this.dataFilePos, dataFilePos);
+ " update pos {}", appId, shuffleId, shuffleMergeId, reduceId, this.dataFilePos,
dataFilePos);
this.dataFilePos = dataFilePos; this.dataFilePos = dataFilePos;
} }
@ -1049,9 +1037,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
} }
void setCurrentMapIndex(int mapIndex) { void setCurrentMapIndex(int mapIndex) {
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} updated mapIndex {}" logger.trace("{} mapIndex {} current mapIndex {}", this, currentMapIndex, mapIndex);
+ " current mapIndex {}", appId, shuffleId, shuffleMergeId, reduceId,
currentMapIndex, mapIndex);
this.currentMapIndex = mapIndex; this.currentMapIndex = mapIndex;
} }
@ -1060,8 +1046,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
} }
void blockMerged(int mapIndex) { void blockMerged(int mapIndex) {
logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} updated merging mapIndex {}", logger.debug("{} updated merging mapIndex {}", this, mapIndex);
appId, shuffleId, shuffleMergeId, reduceId, mapIndex);
mapTracker.add(mapIndex); mapTracker.add(mapIndex);
chunkTracker.add(mapIndex); chunkTracker.add(mapIndex);
lastMergedMapIndex = mapIndex; lastMergedMapIndex = mapIndex;
@ -1079,9 +1064,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
*/ */
void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException { void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
try { try {
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} index current {}" logger.trace("{} index current {} updated {}", this, this.lastChunkOffset,
+ " updated {}", appId, shuffleId, shuffleMergeId, reduceId, chunkOffset);
this.lastChunkOffset, chunkOffset);
if (indexMetaUpdateFailed) { if (indexMetaUpdateFailed) {
indexFile.getChannel().position(indexFile.getPos()); indexFile.getChannel().position(indexFile.getPos());
} }
@ -1109,8 +1093,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
return; return;
} }
chunkTracker.add(mapIndex); chunkTracker.add(mapIndex);
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} mapIndex {}" logger.trace("{} mapIndex {} write chunk to meta file", this, mapIndex);
+ " write chunk to meta file", appId, shuffleId, shuffleMergeId, reduceId, mapIndex);
if (indexMetaUpdateFailed) { if (indexMetaUpdateFailed) {
metaFile.getChannel().position(metaFile.getPos()); metaFile.getChannel().position(metaFile.getPos());
} }
@ -1175,6 +1158,12 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
} }
} }
@Override
public String toString() {
return String.format("Application %s shuffleId %s shuffleMergeId %s reduceId %s",
appId, shuffleId, shuffleMergeId, reduceId);
}
@Override @Override
protected void finalize() throws Throwable { protected void finalize() throws Throwable {
closeAllFilesAndDeleteIfNeeded(false); closeAllFilesAndDeleteIfNeeded(false);