diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java index 625c3257d7..ef20999180 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -100,8 +100,7 @@ public class TransportServer implements Closeable { } }); - channelFuture = bootstrap.bind(new InetSocketAddress(portToBind)); - channelFuture.syncUninterruptibly(); + bindRightPort(portToBind); port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); logger.debug("Shuffle server started on port :" + port); @@ -123,4 +122,37 @@ public class TransportServer implements Closeable { bootstrap = null; } + /** + * Attempt to bind to the specified port up to a fixed number of retries. + * If all attempts fail after the max number of retries, exit. + */ + private void bindRightPort(int portToBind) { + int maxPortRetries = conf.portMaxRetries(); + + for (int i = 0; i <= maxPortRetries; i++) { + int tryPort = -1; + if (0 == portToBind) { + // Do not increment port if tryPort is 0, which is treated as a special port + tryPort = 0; + } else { + // If the new port wraps around, do not try a privilege port + tryPort = ((portToBind + i - 1024) % (65536 - 1024)) + 1024; + } + try { + channelFuture = bootstrap.bind(new InetSocketAddress(tryPort)); + channelFuture.syncUninterruptibly(); + return; + } catch (Exception e) { + logger.warn("Netty service could not bind on port " + tryPort + + ". Attempting the next port."); + if (i >= maxPortRetries) { + logger.error(e.getMessage() + ": Netty server failed after " + + maxPortRetries + " retries."); + + // If it can't find a right port, it should exit directly. + System.exit(-1); + } + } + } + } } diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index 6c91786886..2eaf3b71d9 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -98,4 +98,11 @@ public class TransportConf { public boolean lazyFileDescriptor() { return conf.getBoolean("spark.shuffle.io.lazyFD", true); } + + /** + * Maximum number of retries when binding to a port before giving up. + */ + public int portMaxRetries() { + return conf.getInt("spark.port.maxRetries", 16); + } }