[SPARK-5444][Network] Add a retry to deal with port conflicts in the Netty server.

If `spark.blockManager.port` conflicts with a port that is already in use, Spark throws an exception and exits.
This change adds a retry to avoid that situation.

Author: huangzhaowei <carlmartinmax@gmail.com>

Closes #4240 from SaintBacchus/NettyPortConflict and squashes the following commits:

cc926d2 [huangzhaowei] Add a retry to deal with the conflict port in netty server.
This commit is contained in:
huangzhaowei 2015-02-06 14:35:29 -08:00 committed by Andrew Or
parent dcd1e42d6b
commit 2bda1c1d37
2 changed files with 41 additions and 2 deletions

View file

@ -100,8 +100,7 @@ public class TransportServer implements Closeable {
}
});
channelFuture = bootstrap.bind(new InetSocketAddress(portToBind));
channelFuture.syncUninterruptibly();
bindRightPort(portToBind);
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port :" + port);
@ -123,4 +122,37 @@ public class TransportServer implements Closeable {
bootstrap = null;
}
/**
 * Binds the server bootstrap to {@code portToBind}, retrying on failure.
 *
 * <p>At most {@code conf.portMaxRetries() + 1} bind attempts are made (always at least one).
 * A non-zero port is advanced by one on each retry; when the candidate passes 65535 it wraps
 * back to 1024 so that a privileged port is never picked by the wrap-around. Port 0 is treated
 * as a special value and is retried unchanged. On success {@code channelFuture} refers to the
 * completed bind. If every attempt fails, the failure is logged together with its cause and
 * the JVM is terminated with exit status -1.
 *
 * @param portToBind the first port to try; 0 is passed to the bind call as-is on every attempt
 */
private void bindRightPort(int portToBind) {
  // A negative setting would otherwise skip the loop entirely and return without binding.
  int maxPortRetries = Math.max(0, conf.portMaxRetries());
  for (int i = 0; i <= maxPortRetries; i++) {
    // Do not increment port 0; for any other port, offset by the attempt index and keep the
    // wrapped-around value out of the privileged range below 1024.
    int tryPort = (0 == portToBind)
      ? 0
      : ((portToBind + i - 1024) % (65536 - 1024)) + 1024;
    try {
      channelFuture = bootstrap.bind(new InetSocketAddress(tryPort));
      channelFuture.syncUninterruptibly();
      return;
    } catch (Exception e) {
      if (i >= maxPortRetries) {
        // Last attempt: there is no next port to try, so report the failure with its cause.
        logger.error("Netty service could not bind on port " + tryPort
          + ": Netty server failed after " + maxPortRetries + " retries.", e);
        // If it can't find a right port, it should exit directly.
        System.exit(-1);
      } else {
        logger.warn("Netty service could not bind on port " + tryPort +
          ". Attempting the next port.");
      }
    }
  }
}
}

View file

@ -98,4 +98,11 @@ public class TransportConf {
public boolean lazyFileDescriptor() {
  // Looks up the "spark.shuffle.io.lazyFD" flag, passing true as the fallback value.
  final boolean lazyFd = conf.getBoolean("spark.shuffle.io.lazyFD", true);
  return lazyFd;
}
/**
 * Returns the number of retries allowed when binding to a port, read from
 * {@code spark.port.maxRetries} with 16 passed as the fallback value.
 */
public int portMaxRetries() {
  final int retries = conf.getInt("spark.port.maxRetries", 16);
  return retries;
}
}