[SPARK-36332][SHUFFLE] Cleanup RemoteBlockPushResolver log messages
### What changes were proposed in this pull request? Cleanup `RemoteBlockPushResolver` log messages by using `AppShufflePartitionInfo#toString()` to avoid duplications. Currently this is based off of https://github.com/apache/spark/pull/33034 will remove those changes once it is merged and remove the WIP at that time. ### Why are the changes needed? Minor cleanup to make code more readable. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No tests, just changing log messages Closes #33561 from venkata91/SPARK-36332. Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com> Signed-off-by: yi.wu <yi.wu@databricks.com>
This commit is contained in:
parent
b8508f4876
commit
ab897109a3
|
@ -678,9 +678,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
|
|||
private void writeBuf(ByteBuffer buf) throws IOException {
|
||||
while (buf.hasRemaining()) {
|
||||
long updatedPos = partitionInfo.getDataFilePos() + length;
|
||||
logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} current pos"
|
||||
+ " {} updated pos {}", partitionInfo.appId, partitionInfo.shuffleId,
|
||||
partitionInfo.shuffleMergeId, partitionInfo.reduceId,
|
||||
logger.debug("{} current pos {} updated pos {}", partitionInfo,
|
||||
partitionInfo.getDataFilePos(), updatedPos);
|
||||
length += partitionInfo.dataChannel.write(buf, updatedPos);
|
||||
}
|
||||
|
@ -795,9 +793,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
|
|||
return;
|
||||
}
|
||||
abortIfNecessary();
|
||||
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onData writable",
|
||||
partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
|
||||
partitionInfo.reduceId);
|
||||
logger.trace("{} onData writable", partitionInfo);
|
||||
if (partitionInfo.getCurrentMapIndex() < 0) {
|
||||
partitionInfo.setCurrentMapIndex(mapIndex);
|
||||
}
|
||||
|
@ -817,9 +813,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
|
|||
throw ioe;
|
||||
}
|
||||
} else {
|
||||
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onData deferred",
|
||||
partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
|
||||
partitionInfo.reduceId);
|
||||
logger.trace("{} onData deferred", partitionInfo);
|
||||
// If we cannot write to disk, we buffer the current block chunk in memory so it could
|
||||
// potentially be written to disk later. We take our best effort without guarantee
|
||||
// that the block will be written to disk. If the block data is divided into multiple
|
||||
|
@ -852,9 +846,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
|
|||
@Override
|
||||
public void onComplete(String streamId) throws IOException {
|
||||
synchronized (partitionInfo) {
|
||||
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onComplete invoked",
|
||||
partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
|
||||
partitionInfo.reduceId);
|
||||
logger.trace("{} onComplete invoked", partitionInfo);
|
||||
// Initially when this request got to the server, the shuffle merge finalize request
|
||||
// was not received yet or this was the latest stage attempt (or latest shuffleMergeId)
|
||||
// generating shuffle output for the shuffle ID. By the time we finish reading this
|
||||
|
@ -936,9 +928,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
|
|||
synchronized (partitionInfo) {
|
||||
if (!isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
|
||||
partitionInfo.shuffleMergeId, partitionInfo.reduceId)) {
|
||||
logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {}"
|
||||
+ " encountered failure", partitionInfo.appId, partitionInfo.shuffleId,
|
||||
partitionInfo.shuffleMergeId, partitionInfo.reduceId);
|
||||
logger.debug("{} encountered failure", partitionInfo);
|
||||
partitionInfo.setCurrentMapIndex(-1);
|
||||
}
|
||||
}
|
||||
|
@ -1032,9 +1022,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
|
|||
}
|
||||
|
||||
public void setDataFilePos(long dataFilePos) {
|
||||
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} current pos {}"
|
||||
+ " update pos {}", appId, shuffleId, shuffleMergeId, reduceId, this.dataFilePos,
|
||||
dataFilePos);
|
||||
logger.trace("{} current pos {} update pos {}", this, this.dataFilePos, dataFilePos);
|
||||
this.dataFilePos = dataFilePos;
|
||||
}
|
||||
|
||||
|
@ -1043,9 +1031,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
|
|||
}
|
||||
|
||||
void setCurrentMapIndex(int mapIndex) {
|
||||
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} updated mapIndex {}"
|
||||
+ " current mapIndex {}", appId, shuffleId, shuffleMergeId, reduceId,
|
||||
currentMapIndex, mapIndex);
|
||||
logger.trace("{} mapIndex {} current mapIndex {}", this, currentMapIndex, mapIndex);
|
||||
this.currentMapIndex = mapIndex;
|
||||
}
|
||||
|
||||
|
@ -1054,8 +1040,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
|
|||
}
|
||||
|
||||
void blockMerged(int mapIndex) {
|
||||
logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} updated merging mapIndex {}",
|
||||
appId, shuffleId, shuffleMergeId, reduceId, mapIndex);
|
||||
logger.debug("{} updated merging mapIndex {}", this, mapIndex);
|
||||
mapTracker.add(mapIndex);
|
||||
chunkTracker.add(mapIndex);
|
||||
lastMergedMapIndex = mapIndex;
|
||||
|
@ -1073,9 +1058,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
|
|||
*/
|
||||
void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
|
||||
try {
|
||||
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} index current {}"
|
||||
+ " updated {}", appId, shuffleId, shuffleMergeId, reduceId,
|
||||
this.lastChunkOffset, chunkOffset);
|
||||
logger.trace("{} index current {} updated {}", this, this.lastChunkOffset,
|
||||
chunkOffset);
|
||||
if (indexMetaUpdateFailed) {
|
||||
indexFile.getChannel().position(indexFile.getPos());
|
||||
}
|
||||
|
@ -1103,8 +1087,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
|
|||
return;
|
||||
}
|
||||
chunkTracker.add(mapIndex);
|
||||
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} mapIndex {}"
|
||||
+ " write chunk to meta file", appId, shuffleId, shuffleMergeId, reduceId, mapIndex);
|
||||
logger.trace("{} mapIndex {} write chunk to meta file", this, mapIndex);
|
||||
if (indexMetaUpdateFailed) {
|
||||
metaFile.getChannel().position(metaFile.getPos());
|
||||
}
|
||||
|
@ -1169,6 +1152,12 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("Application %s shuffleId %s shuffleMergeId %s reduceId %s",
|
||||
appId, shuffleId, shuffleMergeId, reduceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finalize() throws Throwable {
|
||||
closeAllFilesAndDeleteIfNeeded(false);
|
||||
|
|
Loading…
Reference in a new issue