|
| 1 | +## 1. 概览 |
| 2 | + |
| 3 | +上篇已经讲解了ChannelPipeline以及ChannelHandler的关系以及对应的类继承关系图,本节来详细分析一下inbound和outbound的原理。 |
| 4 | + |
| 5 | +## 2. DefaultChannelPipeline源码分析 |
| 6 | + |
| 7 | +在DefaultChannelPipeline中,定义了一个head“头结点”和一个tail“尾结点”,它们都是AbstractChannelhandlerContext类的节点,我们都知道在ChannelPipeline中AbstractChannelHandlerContext就是节点元素的抽象类实现,而这个handlerContext持有ChannelHandler。 |
| 8 | + |
| 9 | +在Netty中我们还需要知道inbound和outbound类型的ChannelHandler节点的执行顺序。 |
| 10 | + |
| 11 | +下面来先看下一个Netty的demo |
| 12 | + |
| 13 | +该Netty的demo中,分别定义了六个Handler,分为两组,一组是inboundHandler,另一组是outboundHandler。 |
| 14 | + |
| 15 | + |
| 16 | +InBoundHandlerA |
| 17 | +```java |
| 18 | +public class InBoundHandlerA extends ChannelInboundHandlerAdapter { |
| 19 | + |
| 20 | + @Override |
| 21 | + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
| 22 | + System.out.println("InBoundHandlerA: " + msg); |
| 23 | + ctx.fireChannelRead(msg); |
| 24 | + } |
| 25 | +} |
| 26 | +``` |
| 27 | + |
| 28 | +InBoundHandlerB |
| 29 | +```java |
| 30 | +public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter { |
| 31 | + @Override |
| 32 | + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { |
| 33 | + System.out.println("OutBoundHandlerB: " + msg); |
| 34 | + ctx.write(msg, promise); |
| 35 | + } |
| 36 | + |
| 37 | + |
| 38 | + @Override |
| 39 | + public void handlerAdded(final ChannelHandlerContext ctx) { |
| 40 | + ctx.executor().schedule(() -> { |
| 41 | + ctx.channel().write("ctx.channel().write -> hello world"); |
| 42 | + ctx.write("hello world"); |
| 43 | + }, 3, TimeUnit.SECONDS); |
| 44 | + } |
| 45 | +} |
| 46 | +``` |
| 47 | + |
| 48 | +InBoundHandlerC |
| 49 | +```java |
| 50 | +public class InBoundHandlerC extends ChannelInboundHandlerAdapter { |
| 51 | + @Override |
| 52 | + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
| 53 | + System.out.println("InBoundHandlerC: " + msg); |
| 54 | + ctx.fireChannelRead(msg); |
| 55 | + } |
| 56 | +} |
| 57 | +``` |
| 58 | + |
| 59 | +```java |
| 60 | +public final class Server { |
| 61 | + |
| 62 | + public static void main(String[] args) throws Exception { |
| 63 | + EventLoopGroup bossGroup = new NioEventLoopGroup(1); |
| 64 | + EventLoopGroup workerGroup = new NioEventLoopGroup(); |
| 65 | + |
| 66 | + try { |
| 67 | + ServerBootstrap b = new ServerBootstrap(); |
| 68 | + b.group(bossGroup, workerGroup) |
| 69 | + .channel(NioServerSocketChannel.class) |
| 70 | + .childOption(ChannelOption.TCP_NODELAY, true) |
| 71 | + .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue") |
| 72 | + .childHandler(new ChannelInitializer<SocketChannel>() { |
| 73 | + @Override |
| 74 | + public void initChannel(SocketChannel ch) { |
| 75 | + ch.pipeline().addLast(new InBoundHandlerA()); |
| 76 | + ch.pipeline().addLast(new InBoundHandlerB()); |
| 77 | + ch.pipeline().addLast(new InBoundHandlerC()); |
| 78 | + } |
| 79 | + }); |
| 80 | + |
| 81 | + ChannelFuture f = b.bind(8888).sync(); |
| 82 | + |
| 83 | + f.channel().closeFuture().sync(); |
| 84 | + } finally { |
| 85 | + bossGroup.shutdownGracefully(); |
| 86 | + workerGroup.shutdownGracefully(); |
| 87 | + } |
| 88 | + } |
| 89 | +} |
| 90 | +``` |
| 91 | + |
| 92 | +执行结果如下: |
| 93 | +``` |
| 94 | +InBoundHandlerA: hello world |
| 95 | +InBoundHandlerB: hello world |
| 96 | +InBoundHandlerC: hello world |
| 97 | +``` |
| 98 | + |
| 99 | +可以发现Netty中,对于inboundHandler来说是按照顺序执行操作的。 |
| 100 | + |
| 101 | +接着在看看outboundHandler定义如下 |
| 102 | + |
| 103 | +OutBoundHandlerA |
| 104 | +```java |
| 105 | +public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter { |
| 106 | + |
| 107 | + @Override |
| 108 | + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { |
| 109 | + System.out.println("OutBoundHandlerA: " + msg); |
| 110 | + ctx.write(msg, promise); |
| 111 | + } |
| 112 | +} |
| 113 | +``` |
| 114 | + |
| 115 | +OutBoundHandlerB |
| 116 | +```java |
| 117 | +public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter { |
| 118 | + @Override |
| 119 | + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { |
| 120 | + System.out.println("OutBoundHandlerB: " + msg); |
| 121 | + ctx.write(msg, promise); |
| 122 | + } |
| 123 | +} |
| 124 | +``` |
| 125 | + |
| 126 | +OutBoundHandlerC |
| 127 | +```java |
| 128 | +public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter { |
| 129 | + |
| 130 | + @Override |
| 131 | + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { |
| 132 | + System.out.println("OutBoundHandlerC: " + msg); |
| 133 | + ctx.write(msg, promise); |
| 134 | + } |
| 135 | +} |
| 136 | +``` |
| 137 | + |
| 138 | + |
| 139 | +然后修改Server类为如下, |
| 140 | + |
| 141 | +```java |
| 142 | +public final class Server { |
| 143 | + |
| 144 | + public static void main(String[] args) throws Exception { |
| 145 | + EventLoopGroup bossGroup = new NioEventLoopGroup(1); |
| 146 | + EventLoopGroup workerGroup = new NioEventLoopGroup(); |
| 147 | + |
| 148 | + try { |
| 149 | + ServerBootstrap b = new ServerBootstrap(); |
| 150 | + b.group(bossGroup, workerGroup) |
| 151 | + .channel(NioServerSocketChannel.class) |
| 152 | + .childOption(ChannelOption.TCP_NODELAY, true) |
| 153 | + .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue") |
| 154 | + .childHandler(new ChannelInitializer<SocketChannel>() { |
| 155 | + @Override |
| 156 | + public void initChannel(SocketChannel ch) { |
| 157 | + ch.pipeline().addLast(new OutBoundHandlerA()); |
| 158 | + ch.pipeline().addLast(new OutBoundHandlerB()); |
| 159 | + ch.pipeline().addLast(new OutBoundHandlerC()); |
| 160 | + } |
| 161 | + }); |
| 162 | + |
| 163 | + ChannelFuture f = b.bind(8888).sync(); |
| 164 | + |
| 165 | + f.channel().closeFuture().sync(); |
| 166 | + } finally { |
| 167 | + bossGroup.shutdownGracefully(); |
| 168 | + workerGroup.shutdownGracefully(); |
| 169 | + } |
| 170 | + } |
| 171 | +} |
| 172 | +``` |
| 173 | + |
| 174 | +执行结果如下: |
| 175 | +``` |
| 176 | +OutBoundHandlerC: ctx.channel().write -> hello world |
| 177 | +OutBoundHandlerB: ctx.channel().write -> hello world |
| 178 | +OutBoundHandlerA: ctx.channel().write -> hello world |
| 179 | +OutBoundHandlerA: hello world |
| 180 | +``` |
| 181 | + |
| 182 | +可以看到在Netty中对于ountboundHandler来说,是倒序执行的。 |
| 183 | + |
| 184 | +整个Netty执行ChannelHandler可以用下图来描述。 |
| 185 | + |
| 186 | + |
| 187 | + |
| 188 | + |
| 189 | +上图描述的Head节点顺序执行,Tail节点逆序执行的源码是在DefaultChannelPipeline中,在《Netty-ChannelPipeline-上》文章开头就已经说明了,对于inboundHandler类型的Handler,主要还是用于监听Channel的read、register、active、exceptionCaught等事件,而对于outboundHandler类型来说,主要是用于bind、connect、write、flush等事件,回顾了这一点后,我们在继续看DefaultChannelPipeline源码 |
| 190 | + |
| 191 | +```java |
| 192 | +public class DefaultChannelPipeline implements ChannelPipeline { |
| 193 | + ... 省略 |
| 194 | + |
| 195 | + @Override |
| 196 | + public final ChannelPipeline fireChannelRead(Object msg) { |
| 197 | + AbstractChannelHandlerContext.invokeChannelRead(head, msg); |
| 198 | + return this; |
| 199 | + } |
| 200 | + |
| 201 | + @Override |
| 202 | + public final ChannelFuture write(Object msg) { |
| 203 | + return tail.write(msg); |
| 204 | + } |
| 205 | + |
| 206 | + ... 省略 |
| 207 | +} |
| 208 | +``` |
| 209 | + |
| 210 | +分别以inbound类型的channelRead和outbound类型的write来分析。 |
| 211 | + |
| 212 | +DefaultChannelPipeline.java |
| 213 | +```java |
| 214 | + @Override |
| 215 | + public final ChannelPipeline fireChannelRead(Object msg) { |
| 216 | + AbstractChannelHandlerContext.invokeChannelRead(head, msg); |
| 217 | + return this; |
| 218 | + } |
| 219 | +``` |
| 220 | +在AbstractChannelHandlerContext#invokeChannelRead方法中,传入了一个重要的入参:head,这里就是传入的Head头结点,这一重要调用得以让inbound类型handler在ChannelPipeline中按顺序执行。 |
| 221 | + |
| 222 | +AbstractChannelHandlerContext.java |
| 223 | +```java |
| 224 | + static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { |
| 225 | + final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); |
| 226 | + EventExecutor executor = next.executor(); |
| 227 | + // 在NioEventLoop线程内,next这里传入的是head头结点 |
| 228 | + if (executor.inEventLoop()) { |
| 229 | + next.invokeChannelRead(m); |
| 230 | + } else { |
| 231 | + executor.execute(new Runnable() { |
| 232 | + @Override |
| 233 | + public void run() { |
| 234 | + next.invokeChannelRead(m); |
| 235 | + } |
| 236 | + }); |
| 237 | + } |
| 238 | + } |
| 239 | + |
| 240 | + private void invokeChannelRead(Object msg) { |
| 241 | + if (invokeHandler()) { |
| 242 | + try { |
| 243 | + ((ChannelInboundHandler) handler()).channelRead(this, msg); |
| 244 | + } catch (Throwable t) { |
| 245 | + invokeExceptionCaught(t); |
| 246 | + } |
| 247 | + } else { |
| 248 | + fireChannelRead(msg); |
| 249 | + } |
| 250 | + } |
| 251 | + |
| 252 | +``` |
| 253 | + |
| 254 | +ChannelInboundHandler#channelRead的调用,会最终来到InBoundHandlerA里的channelRead方法。 |
| 255 | +```java |
| 256 | +public class InBoundHandlerA extends ChannelInboundHandlerAdapter { |
| 257 | + |
| 258 | + @Override |
| 259 | + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
| 260 | + System.out.println("InBoundHandlerA: " + msg); |
| 261 | + ctx.fireChannelRead(msg); |
| 262 | + } |
| 263 | +} |
| 264 | +``` |
| 265 | + |
| 266 | +经过AbstractChannelHandlerContext#fireChannelRead,会在ChannelPipeline中寻找下一个inbound,然后继续执行channelRead。 |
| 267 | + |
| 268 | +```java |
| 269 | + @Override |
| 270 | + public ChannelHandlerContext fireChannelRead(final Object msg) { |
| 271 | + invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg); |
| 272 | + return this; |
| 273 | + } |
| 274 | +``` |
| 275 | + |
| 276 | + |
| 277 | +细看OutBoundHandlerB#handlerAdded方法由两个write,一个是ctx.channel.write,另一个是ctx.write,这两个有啥区别呢?为啥输出结果是三条:ctx.channel().write -> hello world,一条hello world呢? |
| 278 | + |
| 279 | +启动Server启动类之后,再cmd窗口输入连接socket的命令debug之后分析得 |
| 280 | + |
| 281 | +``` |
| 282 | +telnet 127.0.0.1 8888 |
| 283 | +``` |
| 284 | + |
| 285 | +在客户端socket连接进Netty之后,会先注册channel并init初始化,这时会调用Server类里ServerBootstrap注入的ChannelInitilizer的initChannel方法,最终得以向ChannelPipeline里添加进OutBoundHandlerA、OutBoundHandlerB、OutBoundHandlerC,随后调用 |
| 286 | + |
| 287 | +```java |
| 288 | +ch.pipeline().addLast(new xxxx) |
| 289 | +``` |
| 290 | +只有会触发DefaultChannelPipeline#callHandlerAdded0()方法,最终来到OutBoundHandler里的handlerAdded()方法,并向Netty的定时任务队列里添加了一个匿名内部任务,也就是: |
| 291 | + |
| 292 | +```java |
| 293 | + @Override |
| 294 | + public void handlerAdded(final ChannelHandlerContext ctx) { |
| 295 | + ctx.executor().schedule(() -> { |
| 296 | + ctx.channel().write("ctx.channel().write -> hello world"); |
| 297 | + ctx.write("hello world"); |
| 298 | + }, 3, TimeUnit.SECONDS); |
| 299 | + } |
| 300 | +``` |
| 301 | + |
| 302 | +随后完成客户端Socket的初始化工作。此时服务端的selector继续执行for死循环,执行到任务队列,此时发现任务队列中有一个定时任务需要执行,则拿出任务并执行任务,执行过程会跳转到上面的匿名内部类,并依次执行ctx.channel().write()和ctx.write()两个方法。 |
| 303 | + |
| 304 | +```java |
| 305 | +ctx.channel().write() |
| 306 | +``` |
| 307 | +方法会从ChannelPipeline的尾部tail开始执行(上文已经总结过,outboundHandler都是从tail节点开始执行handler) ,所以字符串“ctx.channel().write -> hello world”就会按outboundHandlerC、outboundHandlerB、outboundHandlerC这个顺序开始执行,执行完head节点之后会一路往上返回到Ctx.channel().write() |
| 308 | +方法,并最后去执行ctx.write()方法,而ctx.write()方法会从当前的handler节点开始向前执行,所以当前outboundHandlerB的前节点是outboundHandlerA,所以最终控制台打印出: |
| 309 | +``` |
| 310 | +OutBoundHandlerC: ctx.channel().write -> hello world |
| 311 | +OutBoundHandlerB: ctx.channel().write -> hello world |
| 312 | +OutBoundHandlerA: ctx.channel().write -> hello world |
| 313 | +OutBoundHandlerA: hello world |
| 314 | +``` |
| 315 | + |
| 316 | +整个过程比较复杂,也比较绕,下面用一张流程图来描述整个过程。 |
| 317 | + |
| 318 | + |
| 319 | + |
| 320 | + |
| 321 | +- TODO ChannelPipeline优化?MASK |
| 322 | +- TODO SimpleChannelInboundHandler源码分析 |
0 commit comments