b377ea26e2
### What changes were proposed in this pull request? When channel terminated will call `connectionTerminated` and remove corresponding StreamState, then all coming request on this StreamState will throw NPE like ``` 2021-07-31 22:00:24,810 ERROR server.ChunkFetchRequestHandler (ChunkFetchRequestHandler.java:lambda$respond$1(146)) - Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=1119950114515,chunkIndex=0],errorString=java.lang.NullPointerException at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:80) at org.apache.spark.network.server.ChunkFetchRequestHandler.processFetchRequest(ChunkFetchRequestHandler.java:101) at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:82) at org.apache.spark.network.server.ChunkFetchRequestHandler.channelRead0(ChunkFetchRequestHandler.java:51) at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370) at org.sparkproject.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) ] to /ip:53818; closing connection java.nio.channels.ClosedChannelException at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957) at org.sparkproject.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865) at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) at org.sparkproject.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:112) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104) at org.sparkproject.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497) at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) ``` Since JVM will not show stack of NPE exception if it happen many times. ``` 021-07-28 08:25:44,720 ERROR server.ChunkFetchRequestHandler (ChunkFetchRequestHandler.java:lambda$respond$1(146)) - Error sending result ChunkFetchFailure[streamChunkId=StreamChunkId[streamId=1187623335353,chunkIndex=11],errorString=java.lang.NullPoint erException ] to /10.130.10.5:42148; closing connection java.nio.channels.ClosedChannelException ``` Makes user confused. We should improved this error message? ### Why are the changes needed? Improve error message ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Closes #33622 from AngersZhuuuu/SPARK-36391. Lead-authored-by: Angerszhuuuu <angers.zhu@gmaihu@gmail.com> Co-authored-by: Angerszhuuuu <angers.zhu@gmail.com> Signed-off-by: yi.wu <yi.wu@databricks.com> |
||
---|---|---|
.. | ||
kvstore | ||
network-common | ||
network-shuffle | ||
network-yarn | ||
sketch | ||
tags | ||
unsafe |