[SPARK-18972][CORE] Fix the netty thread names for RPC

## What changes were proposed in this pull request?

Right now the name of threads created by Netty for Spark RPC are `shuffle-client-**` and `shuffle-server-**`. It's pretty confusing.

This PR just uses the module name in TransportConf to set the thread name. In addition, it also includes the following minor fixes:

- TransportChannelHandler.channelActive and channelInactive should call the corresponding super methods.
- Make ShuffleBlockFetcherIterator throw NoSuchElementException if it has no more elements. Otherwise,  if the caller calls `next` without `hasNext`, it will just hang.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16380 from zsxwing/SPARK-18972.
This commit is contained in:
Shixiong Zhu 2016-12-22 16:22:55 -08:00
parent 2246ce88ae
commit f252cb5d16
5 changed files with 19 additions and 9 deletions

View file

@ -100,8 +100,10 @@ public class TransportClientFactory implements Closeable {
IOMode ioMode = IOMode.valueOf(conf.ioMode()); IOMode ioMode = IOMode.valueOf(conf.ioMode());
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode); this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
// TODO: Make thread pool name configurable. this.workerGroup = NettyUtils.createEventLoop(
this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client"); ioMode,
conf.clientThreads(),
conf.getModuleName() + "-client");
this.pooledAllocator = NettyUtils.createPooledByteBufAllocator( this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()); conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
} }

View file

@ -88,14 +88,14 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
try { try {
requestHandler.channelActive(); requestHandler.channelActive();
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("Exception from request handler while registering channel", e); logger.error("Exception from request handler while channel is active", e);
} }
try { try {
responseHandler.channelActive(); responseHandler.channelActive();
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("Exception from response handler while registering channel", e); logger.error("Exception from response handler while channel is active", e);
} }
super.channelRegistered(ctx); super.channelActive(ctx);
} }
@Override @Override
@ -103,14 +103,14 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
try { try {
requestHandler.channelInactive(); requestHandler.channelInactive();
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("Exception from request handler while unregistering channel", e); logger.error("Exception from request handler while channel is inactive", e);
} }
try { try {
responseHandler.channelInactive(); responseHandler.channelInactive();
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.error("Exception from response handler while unregistering channel", e); logger.error("Exception from response handler while channel is inactive", e);
} }
super.channelUnregistered(ctx); super.channelInactive(ctx);
} }
@Override @Override

View file

@ -89,7 +89,7 @@ public class TransportServer implements Closeable {
IOMode ioMode = IOMode.valueOf(conf.ioMode()); IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup = EventLoopGroup bossGroup =
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server"); NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup; EventLoopGroup workerGroup = bossGroup;
PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator( PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(

View file

@ -70,6 +70,10 @@ public class TransportConf {
return "spark." + module + "." + suffix; return "spark." + module + "." + suffix;
} }
public String getModuleName() {
return module;
}
/** IO mode: nio or epoll */ /** IO mode: nio or epoll */
public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(); } public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(); }

View file

@ -315,6 +315,10 @@ final class ShuffleBlockFetcherIterator(
* Throws a FetchFailedException if the next block could not be fetched. * Throws a FetchFailedException if the next block could not be fetched.
*/ */
override def next(): (BlockId, InputStream) = { override def next(): (BlockId, InputStream) = {
if (!hasNext) {
throw new NoSuchElementException
}
numBlocksProcessed += 1 numBlocksProcessed += 1
var result: FetchResult = null var result: FetchResult = null