upgrade Netty from 4.0.0.Beta2 to 4.0.13.Final
This commit is contained in:
parent
3bf7c708d3
commit
040dd3ecd5
|
@ -23,6 +23,7 @@ import io.netty.channel.ChannelOption;
|
|||
import io.netty.channel.oio.OioEventLoopGroup;
|
||||
import io.netty.channel.socket.oio.OioSocketChannel;
|
||||
|
||||
import io.netty.util.concurrent.EventExecutorGroup;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -32,6 +33,7 @@ class FileClient {
|
|||
private FileClientHandler handler = null;
|
||||
private Channel channel = null;
|
||||
private Bootstrap bootstrap = null;
|
||||
private EventLoopGroup group = null;
|
||||
private int connectTimeout = 60*1000; // 1 min
|
||||
|
||||
public FileClient(FileClientHandler handler, int connectTimeout) {
|
||||
|
@ -40,8 +42,9 @@ class FileClient {
|
|||
}
|
||||
|
||||
public void init() {
|
||||
bootstrap = new Bootstrap();
|
||||
bootstrap.group(new OioEventLoopGroup())
|
||||
group = new OioEventLoopGroup();
|
||||
Bootstrap bootstrap = new Bootstrap();
|
||||
bootstrap.group(group)
|
||||
.channel(OioSocketChannel.class)
|
||||
.option(ChannelOption.SO_KEEPALIVE, true)
|
||||
.option(ChannelOption.TCP_NODELAY, true)
|
||||
|
@ -76,11 +79,13 @@ class FileClient {
|
|||
|
||||
public void close() {
|
||||
if(channel != null) {
|
||||
channel.close();
|
||||
channel.close().awaitUninterruptibly();
|
||||
channel = null;
|
||||
}
|
||||
if ( bootstrap!=null) {
|
||||
bootstrap.shutdown();
|
||||
|
||||
if (group!=null) {
|
||||
group.shutdownGracefully();
|
||||
group = null;
|
||||
bootstrap = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.spark.network.netty;
|
||||
|
||||
import io.netty.buffer.BufType;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.codec.string.StringEncoder;
|
||||
|
@ -25,7 +24,7 @@ import io.netty.handler.codec.string.StringEncoder;
|
|||
|
||||
class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||
|
||||
private FileClientHandler fhandler;
|
||||
private final FileClientHandler fhandler;
|
||||
|
||||
public FileClientChannelInitializer(FileClientHandler handler) {
|
||||
fhandler = handler;
|
||||
|
@ -35,7 +34,7 @@ class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
|
|||
public void initChannel(SocketChannel channel) {
|
||||
// file no more than 2G
|
||||
channel.pipeline()
|
||||
.addLast("encoder", new StringEncoder(BufType.BYTE))
|
||||
.addLast("encoder", new StringEncoder())
|
||||
.addLast("handler", fhandler);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,11 +19,11 @@ package org.apache.spark.network.netty;
|
|||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundByteHandlerAdapter;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
|
||||
import org.apache.spark.storage.BlockId;
|
||||
|
||||
abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
|
||||
abstract class FileClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
|
||||
private FileHeader currentHeader = null;
|
||||
|
||||
|
@ -37,13 +37,7 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
|
|||
public abstract void handleError(BlockId blockId);
|
||||
|
||||
@Override
|
||||
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
|
||||
// Use direct buffer if possible.
|
||||
return ctx.alloc().ioBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
|
||||
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
|
||||
// get header
|
||||
if (currentHeader == null && in.readableBytes() >= FileHeader.HEADER_SIZE()) {
|
||||
currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE()));
|
||||
|
|
|
@ -22,6 +22,9 @@ import java.net.InetSocketAddress;
|
|||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoop;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.oio.OioEventLoopGroup;
|
||||
import io.netty.channel.socket.oio.OioServerSocketChannel;
|
||||
|
||||
|
@ -36,7 +39,8 @@ class FileServer {
|
|||
|
||||
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
|
||||
|
||||
private ServerBootstrap bootstrap = null;
|
||||
private EventLoopGroup bossGroup = null;
|
||||
private EventLoopGroup workerGroup = null;
|
||||
private ChannelFuture channelFuture = null;
|
||||
private int port = 0;
|
||||
private Thread blockingThread = null;
|
||||
|
@ -45,8 +49,11 @@ class FileServer {
|
|||
InetSocketAddress addr = new InetSocketAddress(port);
|
||||
|
||||
// Configure the server.
|
||||
bootstrap = new ServerBootstrap();
|
||||
bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
|
||||
bossGroup = new OioEventLoopGroup();
|
||||
workerGroup = new OioEventLoopGroup();
|
||||
|
||||
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||
bootstrap.group(bossGroup, workerGroup)
|
||||
.channel(OioServerSocketChannel.class)
|
||||
.option(ChannelOption.SO_BACKLOG, 100)
|
||||
.option(ChannelOption.SO_RCVBUF, 1500)
|
||||
|
@ -89,13 +96,19 @@ class FileServer {
|
|||
public void stop() {
|
||||
// Close the bound channel.
|
||||
if (channelFuture != null) {
|
||||
channelFuture.channel().close();
|
||||
channelFuture.channel().close().awaitUninterruptibly();
|
||||
channelFuture = null;
|
||||
}
|
||||
// Shutdown bootstrap.
|
||||
if (bootstrap != null) {
|
||||
bootstrap.shutdown();
|
||||
bootstrap = null;
|
||||
|
||||
// Shutdown event groups
|
||||
if (bossGroup != null) {
|
||||
bossGroup.shutdownGracefully();
|
||||
bossGroup = null;
|
||||
}
|
||||
|
||||
if (workerGroup != null) {
|
||||
workerGroup.shutdownGracefully();
|
||||
workerGroup = null;
|
||||
}
|
||||
// TODO: Shutdown all accepted channels as well ?
|
||||
}
|
||||
|
|
|
@ -21,13 +21,13 @@ import java.io.File;
|
|||
import java.io.FileInputStream;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.DefaultFileRegion;
|
||||
|
||||
import org.apache.spark.storage.BlockId;
|
||||
import org.apache.spark.storage.FileSegment;
|
||||
|
||||
class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
|
||||
class FileServerHandler extends SimpleChannelInboundHandler<String> {
|
||||
|
||||
PathResolver pResolver;
|
||||
|
||||
|
@ -36,7 +36,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
|
||||
public void channelRead0(ChannelHandlerContext ctx, String blockIdString) {
|
||||
BlockId blockId = BlockId.apply(blockIdString);
|
||||
FileSegment fileSegment = pResolver.getBlockLocation(blockId);
|
||||
// if getBlockLocation returns null, close the channel
|
||||
|
@ -60,7 +60,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
|
|||
int len = new Long(length).intValue();
|
||||
ctx.write((new FileHeader(len, blockId)).buffer());
|
||||
try {
|
||||
ctx.sendFile(new DefaultFileRegion(new FileInputStream(file)
|
||||
ctx.write(new DefaultFileRegion(new FileInputStream(file)
|
||||
.getChannel(), fileSegment.offset(), fileSegment.length()));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -282,7 +282,7 @@
|
|||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-all</artifactId>
|
||||
<version>4.0.0.CR1</version>
|
||||
<version>4.0.13.Final</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.derby</groupId>
|
||||
|
|
|
@ -178,7 +178,7 @@ object SparkBuild extends Build {
|
|||
|
||||
|
||||
libraryDependencies ++= Seq(
|
||||
"io.netty" % "netty-all" % "4.0.0.CR1",
|
||||
"io.netty" % "netty-all" % "4.0.13.Final",
|
||||
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
|
||||
/** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
|
||||
"org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
|
||||
|
|
Loading…
Reference in a new issue