diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 64a83171e9..a67683b892 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -43,7 +43,7 @@ import org.apache.spark.network.protocol.OneWayMessage;
 import org.apache.spark.network.protocol.RpcRequest;
 import org.apache.spark.network.protocol.StreamChunkId;
 import org.apache.spark.network.protocol.StreamRequest;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 
 /**
  * Client for fetching consecutive chunks of a pre-negotiated stream. This API is intended to allow
@@ -135,9 +135,10 @@ public class TransportClient implements Closeable {
       long streamId,
       final int chunkIndex,
       final ChunkReceivedCallback callback) {
-    final String serverAddr = NettyUtils.getRemoteAddress(channel);
     final long startTime = System.currentTimeMillis();
-    logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
+    }
 
     final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
     handler.addFetchRequest(streamChunkId, callback);
@@ -148,11 +149,13 @@ public class TransportClient implements Closeable {
         public void operationComplete(ChannelFuture future) throws Exception {
           if (future.isSuccess()) {
             long timeTaken = System.currentTimeMillis() - startTime;
-            logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
-              timeTaken);
+            if (logger.isTraceEnabled()) {
+              logger.trace("Sending request {} to {} took {} ms", streamChunkId, getRemoteAddress(channel),
+                timeTaken);
+            }
           } else {
             String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
-              serverAddr, future.cause());
+              getRemoteAddress(channel), future.cause());
             logger.error(errorMsg, future.cause());
             handler.removeFetchRequest(streamChunkId);
             channel.close();
@@ -173,9 +176,10 @@ public class TransportClient implements Closeable {
    * @param callback Object to call with the stream data.
    */
   public void stream(final String streamId, final StreamCallback callback) {
-    final String serverAddr = NettyUtils.getRemoteAddress(channel);
     final long startTime = System.currentTimeMillis();
-    logger.debug("Sending stream request for {} to {}", streamId, serverAddr);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
+    }
 
     // Need to synchronize here so that the callback is added to the queue and the RPC is
     // written to the socket atomically, so that callbacks are called in the right order
@@ -188,11 +192,13 @@ public class TransportClient implements Closeable {
         public void operationComplete(ChannelFuture future) throws Exception {
           if (future.isSuccess()) {
             long timeTaken = System.currentTimeMillis() - startTime;
-            logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr,
-              timeTaken);
+            if (logger.isTraceEnabled()) {
+              logger.trace("Sending request for {} to {} took {} ms", streamId, getRemoteAddress(channel),
+                timeTaken);
+            }
           } else {
             String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
-              serverAddr, future.cause());
+              getRemoteAddress(channel), future.cause());
             logger.error(errorMsg, future.cause());
             channel.close();
             try {
@@ -215,9 +221,10 @@ public class TransportClient implements Closeable {
    * @return The RPC's id.
    */
   public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
-    final String serverAddr = NettyUtils.getRemoteAddress(channel);
     final long startTime = System.currentTimeMillis();
-    logger.trace("Sending RPC to {}", serverAddr);
+    if (logger.isTraceEnabled()) {
+      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
+    }
 
     final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
     handler.addRpcRequest(requestId, callback);
@@ -228,10 +235,12 @@ public class TransportClient implements Closeable {
         public void operationComplete(ChannelFuture future) throws Exception {
           if (future.isSuccess()) {
             long timeTaken = System.currentTimeMillis() - startTime;
-            logger.trace("Sending request {} to {} took {} ms", requestId, serverAddr, timeTaken);
+            if (logger.isTraceEnabled()) {
+              logger.trace("Sending request {} to {} took {} ms", requestId, getRemoteAddress(channel), timeTaken);
+            }
           } else {
             String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
-              serverAddr, future.cause());
+              getRemoteAddress(channel), future.cause());
             logger.error(errorMsg, future.cause());
             handler.removeRpcRequest(requestId);
             channel.close();
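Two things change throughout `TransportClient`: `NettyUtils.getRemoteAddress` becomes a static import, and the eagerly computed `serverAddr` local is dropped in favor of calling `getRemoteAddress(channel)` inside a level-guarded log statement. The reason for the guard: SLF4J's `{}` placeholders defer message formatting, but every argument is still evaluated at the call site, so an unguarded trace call on the hot path pays for the address lookup even when TRACE is off. A minimal, self-contained sketch of the difference; `expensiveAddressLookup` is a hypothetical stand-in for `getRemoteAddress(channel)`, not Spark code:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedLoggingExample {
  private static final Logger logger = LoggerFactory.getLogger(GuardedLoggingExample.class);

  // Stand-in for work we do not want to pay for when TRACE is disabled.
  private static String expensiveAddressLookup() {
    return "/10.0.0.1:7077";
  }

  public void send(long requestId) {
    // Unguarded: expensiveAddressLookup() runs on every call, even when
    // TRACE is off; only the string formatting is deferred by SLF4J.
    logger.trace("Sending request {} to {}", requestId, expensiveAddressLookup());

    // Guarded, as in this patch: the argument is evaluated only when the
    // level is enabled, at the cost of one cheap check otherwise.
    if (logger.isTraceEnabled()) {
      logger.trace("Sending request {} to {}", requestId, expensiveAddressLookup());
    }
  }
}
```

Note that the guard is only added where an argument does real work; log calls whose arguments are plain references (such as the `errorMsg` paths) stay unguarded.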
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index a27aaf2b27..1c9916baee 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -195,7 +195,7 @@ public class TransportClientFactory implements Closeable {
 
   /** Create a completely new {@link TransportClient} to the remote address. */
   private TransportClient createClient(InetSocketAddress address) throws IOException {
-    logger.debug("Creating new connection to " + address);
+    logger.debug("Creating new connection to {}", address);
 
     Bootstrap bootstrap = new Bootstrap();
     bootstrap.group(workerGroup)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index 8a69223c88..179667296e 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -38,7 +38,7 @@ import org.apache.spark.network.protocol.StreamChunkId;
 import org.apache.spark.network.protocol.StreamFailure;
 import org.apache.spark.network.protocol.StreamResponse;
 import org.apache.spark.network.server.MessageHandler;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 import org.apache.spark.network.util.TransportFrameDecoder;
 
 /**
@@ -122,7 +122,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   @Override
   public void channelInactive() {
     if (numOutstandingRequests() > 0) {
-      String remoteAddress = NettyUtils.getRemoteAddress(channel);
+      String remoteAddress = getRemoteAddress(channel);
       logger.error("Still have {} requests outstanding when connection from {} is closed",
         numOutstandingRequests(), remoteAddress);
       failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
@@ -132,7 +132,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
   @Override
   public void exceptionCaught(Throwable cause) {
     if (numOutstandingRequests() > 0) {
-      String remoteAddress = NettyUtils.getRemoteAddress(channel);
+      String remoteAddress = getRemoteAddress(channel);
       logger.error("Still have {} requests outstanding when connection from {} is closed",
         numOutstandingRequests(), remoteAddress);
       failOutstandingRequests(cause);
@@ -141,13 +141,12 @@
 
   @Override
   public void handle(ResponseMessage message) throws Exception {
-    String remoteAddress = NettyUtils.getRemoteAddress(channel);
     if (message instanceof ChunkFetchSuccess) {
       ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
       ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
       if (listener == null) {
         logger.warn("Ignoring response for block {} from {} since it is not outstanding",
-          resp.streamChunkId, remoteAddress);
+          resp.streamChunkId, getRemoteAddress(channel));
         resp.body().release();
       } else {
         outstandingFetches.remove(resp.streamChunkId);
@@ -159,7 +158,7 @@
       ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
       if (listener == null) {
         logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
-          resp.streamChunkId, remoteAddress, resp.errorString);
+          resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
       } else {
         outstandingFetches.remove(resp.streamChunkId);
         listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
@@ -170,7 +169,7 @@
       RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
       if (listener == null) {
         logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
outstanding", - resp.requestId, remoteAddress, resp.body().size()); + resp.requestId, getRemoteAddress(channel), resp.body().size()); } else { outstandingRpcs.remove(resp.requestId); try { @@ -184,7 +183,7 @@ public class TransportResponseHandler extends MessageHandler { RpcResponseCallback listener = outstandingRpcs.get(resp.requestId); if (listener == null) { logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding", - resp.requestId, remoteAddress, resp.errorString); + resp.requestId, getRemoteAddress(channel), resp.errorString); } else { outstandingRpcs.remove(resp.requestId); listener.onFailure(new RuntimeException(resp.errorString)); diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java index 074780f2b9..f045318618 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java @@ -39,7 +39,7 @@ public final class MessageDecoder extends MessageToMessageDecoder { Message.Type msgType = Message.Type.decode(in); Message decoded = decode(msgType, in); assert decoded.type() == msgType; - logger.trace("Received message " + msgType + ": " + decoded); + logger.trace("Received message {}: {}", msgType, decoded); out.add(decoded); } diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index f2223379a9..884ea7d115 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -29,7 +29,7 @@ import org.apache.spark.network.client.TransportResponseHandler; import org.apache.spark.network.protocol.Message; import org.apache.spark.network.protocol.RequestMessage; import org.apache.spark.network.protocol.ResponseMessage; -import org.apache.spark.network.util.NettyUtils; +import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; /** * The single Transport-level Channel handler which is used for delegating requests to the @@ -76,7 +76,7 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler requestTimeoutNs; if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) { if (responseHandler.numOutstandingRequests() > 0) { - String address = NettyUtils.getRemoteAddress(ctx.channel()); + String address = getRemoteAddress(ctx.channel()); logger.error("Connection to {} has been quiet for {} ms while there are outstanding " + "requests. 
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index f2223379a9..884ea7d115 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -29,7 +29,7 @@ import org.apache.spark.network.client.TransportResponseHandler;
 import org.apache.spark.network.protocol.Message;
 import org.apache.spark.network.protocol.RequestMessage;
 import org.apache.spark.network.protocol.ResponseMessage;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 
 /**
  * The single Transport-level Channel handler which is used for delegating requests to the
@@ -76,7 +76,7 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
         System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
       if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
         if (responseHandler.numOutstandingRequests() > 0) {
-          String address = NettyUtils.getRemoteAddress(ctx.channel());
+          String address = getRemoteAddress(ctx.channel());
           logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
             "requests. Assuming connection is dead; please adjust spark.network.timeout if " +
             "this is wrong.", address, requestTimeoutNs / 1000 / 1000);
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index bebe88ec5d..e67a034cb8 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.server;
 
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 
 import com.google.common.base.Throwables;
@@ -42,7 +43,7 @@ import org.apache.spark.network.protocol.RpcResponse;
 import org.apache.spark.network.protocol.StreamFailure;
 import org.apache.spark.network.protocol.StreamRequest;
 import org.apache.spark.network.protocol.StreamResponse;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 
 /**
  * A handler that processes requests from clients and writes chunk data back. Each handler is
@@ -114,9 +115,9 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
   }
 
   private void processFetchRequest(final ChunkFetchRequest req) {
-    final String client = NettyUtils.getRemoteAddress(channel);
-
-    logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId);
+    if (logger.isTraceEnabled()) {
+      logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), req.streamChunkId);
+    }
 
     ManagedBuffer buf;
     try {
@@ -125,7 +126,7 @@
       buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
     } catch (Exception e) {
       logger.error(String.format(
-        "Error opening block %s for request from %s", req.streamChunkId, client), e);
+        "Error opening block %s for request from %s", req.streamChunkId, getRemoteAddress(channel)), e);
       respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
       return;
     }
@@ -134,13 +135,12 @@
   }
 
   private void processStreamRequest(final StreamRequest req) {
-    final String client = NettyUtils.getRemoteAddress(channel);
     ManagedBuffer buf;
     try {
       buf = streamManager.openStream(req.streamId);
     } catch (Exception e) {
       logger.error(String.format(
-        "Error opening stream %s for request from %s", req.streamId, client), e);
+        "Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
       respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
       return;
     }
@@ -189,13 +189,13 @@
    * it will be logged and the channel closed.
    */
   private void respond(final Encodable result) {
-    final String remoteAddress = channel.remoteAddress().toString();
+    final SocketAddress remoteAddress = channel.remoteAddress();
     channel.writeAndFlush(result).addListener(
       new ChannelFutureListener() {
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
           if (future.isSuccess()) {
-            logger.trace(String.format("Sent result %s to client %s", result, remoteAddress));
+            logger.trace("Sent result {} to client {}", result, remoteAddress);
           } else {
             logger.error(String.format("Error sending result %s to %s; closing connection",
               result, remoteAddress), future.cause());
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
index baae235e02..a67db4f69f 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -130,7 +130,7 @@ public class TransportServer implements Closeable {
     channelFuture.syncUninterruptibly();
 
     port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
-    logger.debug("Shuffle server started on port :" + port);
+    logger.debug("Shuffle server started on port: {}", port);
   }
 
   @Override
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 1270cef621..d05d0ac4d2 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -42,7 +42,7 @@ import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.StreamManager;
 import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
 import org.apache.spark.network.shuffle.protocol.*;
-import org.apache.spark.network.util.NettyUtils;
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 import org.apache.spark.network.util.TransportConf;
 
 
@@ -101,11 +101,13 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
         blocks.add(block);
       }
       long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
-      logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
-        streamId,
-        msg.blockIds.length,
-        client.getClientId(),
-        NettyUtils.getRemoteAddress(client.getChannel()));
+      if (logger.isTraceEnabled()) {
+        logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
+          streamId,
+          msg.blockIds.length,
+          client.getClientId(),
+          getRemoteAddress(client.getChannel()));
+      }
       callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
       metrics.blockTransferRateBytes.mark(totalBlockSize);
     } finally {
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 56cf1e2e3e..d436711692 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -267,7 +267,7 @@ public class ExternalShuffleBlockResolver {
     for (String localDir : dirs) {
       try {
         JavaUtils.deleteRecursively(new File(localDir));
-        logger.debug("Successfully cleaned up directory: " + localDir);
+        logger.debug("Successfully cleaned up directory: {}", localDir);
       } catch (Exception e) {
         logger.error("Failed to delete directory: " + localDir, e);
       }
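The `respond()` change in `TransportRequestHandler` takes the same idea one step further: instead of eagerly materializing `channel.remoteAddress().toString()`, it captures the `SocketAddress` itself and hands the object to the logger, so `toString()` runs only if a statement actually fires. A minimal sketch of the pattern; the class, the addresses, and the `success` flag here are illustrative, not Spark code:

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeferredToStringExample {
  private static final Logger logger = LoggerFactory.getLogger(DeferredToStringExample.class);

  public void respond(SocketAddress remoteAddress, Object result, boolean success) {
    if (success) {
      // The SocketAddress is passed as-is; SLF4J calls toString() on it
      // only when TRACE is enabled, so the common path builds no string.
      logger.trace("Sent result {} to client {}", result, remoteAddress);
    } else {
      // The failure path keeps String.format: it is rare, and the message
      // is needed unconditionally once this branch is taken.
      logger.error(String.format("Error sending result %s to %s; closing connection",
        result, remoteAddress));
    }
  }

  public static void main(String[] args) {
    new DeferredToStringExample().respond(
      new InetSocketAddress("10.0.0.1", 7077), "chunk-0", true);
  }
}
```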