Skip to content

Commit 051fa57

Browse files
committed
add Netty源码分析-ChannelPipeline下
1 parent 2061519 commit 051fa57

File tree

1 file changed

+322
-0
lines changed

1 file changed

+322
-0
lines changed
Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
## 1. 概览
2+
3+
上篇已经讲解了ChannelPipeline以及ChannelHandler的关系以及对应的类继承关系图,本节来详细分析一下inbound和outbound的原理。
4+
5+
## 2. DefaultChannelPipeline源码分析
6+
7+
在DefaultChannelPipeline中,定义了一个head“头结点”和一个tail“尾结点”,它们都是AbstractChannelhandlerContext类的节点,我们都知道在ChannelPipeline中AbstractChannelHandlerContext就是节点元素的抽象类实现,而这个handlerContext持有ChannelHandler。
8+
9+
在Netty中我们还需要知道inbound和outbound类型的ChannelHandler节点的执行顺序。
10+
11+
下面来先看下一个Netty的demo
12+
13+
该Netty的demo中,分别定义了六个Handler,分为两组,一组是inboundHandler,另一组是outboundHandler。
14+
15+
16+
InBoundHandlerA
17+
```java
18+
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
19+
20+
@Override
21+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
22+
System.out.println("InBoundHandlerA: " + msg);
23+
ctx.fireChannelRead(msg);
24+
}
25+
}
26+
```
27+
28+
InBoundHandlerB
29+
```java
30+
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
31+
@Override
32+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
33+
System.out.println("OutBoundHandlerB: " + msg);
34+
ctx.write(msg, promise);
35+
}
36+
37+
38+
@Override
39+
public void handlerAdded(final ChannelHandlerContext ctx) {
40+
ctx.executor().schedule(() -> {
41+
ctx.channel().write("ctx.channel().write -> hello world");
42+
ctx.write("hello world");
43+
}, 3, TimeUnit.SECONDS);
44+
}
45+
}
46+
```
47+
48+
InBoundHandlerC
49+
```java
50+
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
51+
@Override
52+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
53+
System.out.println("InBoundHandlerC: " + msg);
54+
ctx.fireChannelRead(msg);
55+
}
56+
}
57+
```
58+
59+
```java
60+
public final class Server {
61+
62+
public static void main(String[] args) throws Exception {
63+
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
64+
EventLoopGroup workerGroup = new NioEventLoopGroup();
65+
66+
try {
67+
ServerBootstrap b = new ServerBootstrap();
68+
b.group(bossGroup, workerGroup)
69+
.channel(NioServerSocketChannel.class)
70+
.childOption(ChannelOption.TCP_NODELAY, true)
71+
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
72+
.childHandler(new ChannelInitializer<SocketChannel>() {
73+
@Override
74+
public void initChannel(SocketChannel ch) {
75+
ch.pipeline().addLast(new InBoundHandlerA());
76+
ch.pipeline().addLast(new InBoundHandlerB());
77+
ch.pipeline().addLast(new InBoundHandlerC());
78+
}
79+
});
80+
81+
ChannelFuture f = b.bind(8888).sync();
82+
83+
f.channel().closeFuture().sync();
84+
} finally {
85+
bossGroup.shutdownGracefully();
86+
workerGroup.shutdownGracefully();
87+
}
88+
}
89+
}
90+
```
91+
92+
执行结果如下:
93+
```
94+
InBoundHandlerA: hello world
95+
InBoundHandlerB: hello world
96+
InBoundHandlerC: hello world
97+
```
98+
99+
可以发现Netty中,对于inboundHandler来说是按照顺序执行操作的。
100+
101+
接着在看看outboundHandler定义如下
102+
103+
OutBoundHandlerA
104+
```java
105+
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
106+
107+
@Override
108+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
109+
System.out.println("OutBoundHandlerA: " + msg);
110+
ctx.write(msg, promise);
111+
}
112+
}
113+
```
114+
115+
OutBoundHandlerB
116+
```java
117+
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
118+
@Override
119+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
120+
System.out.println("OutBoundHandlerB: " + msg);
121+
ctx.write(msg, promise);
122+
}
123+
}
124+
```
125+
126+
OutBoundHandlerC
127+
```java
128+
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
129+
130+
@Override
131+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
132+
System.out.println("OutBoundHandlerC: " + msg);
133+
ctx.write(msg, promise);
134+
}
135+
}
136+
```
137+
138+
139+
然后修改Server类为如下,
140+
141+
```java
142+
public final class Server {
143+
144+
public static void main(String[] args) throws Exception {
145+
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
146+
EventLoopGroup workerGroup = new NioEventLoopGroup();
147+
148+
try {
149+
ServerBootstrap b = new ServerBootstrap();
150+
b.group(bossGroup, workerGroup)
151+
.channel(NioServerSocketChannel.class)
152+
.childOption(ChannelOption.TCP_NODELAY, true)
153+
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
154+
.childHandler(new ChannelInitializer<SocketChannel>() {
155+
@Override
156+
public void initChannel(SocketChannel ch) {
157+
ch.pipeline().addLast(new OutBoundHandlerA());
158+
ch.pipeline().addLast(new OutBoundHandlerB());
159+
ch.pipeline().addLast(new OutBoundHandlerC());
160+
}
161+
});
162+
163+
ChannelFuture f = b.bind(8888).sync();
164+
165+
f.channel().closeFuture().sync();
166+
} finally {
167+
bossGroup.shutdownGracefully();
168+
workerGroup.shutdownGracefully();
169+
}
170+
}
171+
}
172+
```
173+
174+
执行结果如下:
175+
```
176+
OutBoundHandlerC: ctx.channel().write -> hello world
177+
OutBoundHandlerB: ctx.channel().write -> hello world
178+
OutBoundHandlerA: ctx.channel().write -> hello world
179+
OutBoundHandlerA: hello world
180+
```
181+
182+
可以看到在Netty中对于ountboundHandler来说,是倒序执行的。
183+
184+
整个Netty执行ChannelHandler可以用下图来描述。
185+
186+
![channelPipeline事件传播图](https://coderbruis.github.io/javaDocs/img/netty/source/ChannelPipeline事件传播图.png)
187+
188+
189+
上图描述的Head节点顺序执行,Tail节点逆序执行的源码是在DefaultChannelPipeline中,在《Netty-ChannelPipeline-上》文章开头就已经说明了,对于inboundHandler类型的Handler,主要还是用于监听Channel的read、register、active、exceptionCaught等事件,而对于outboundHandler类型来说,主要是用于bind、connect、write、flush等事件,回顾了这一点后,我们在继续看DefaultChannelPipeline源码
190+
191+
```java
192+
public class DefaultChannelPipeline implements ChannelPipeline {
193+
... 省略
194+
195+
@Override
196+
public final ChannelPipeline fireChannelRead(Object msg) {
197+
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
198+
return this;
199+
}
200+
201+
@Override
202+
public final ChannelFuture write(Object msg) {
203+
return tail.write(msg);
204+
}
205+
206+
... 省略
207+
}
208+
```
209+
210+
分别以inbound类型的channelRead和outbound类型的write来分析。
211+
212+
DefaultChannelPipeline.java
213+
```java
214+
@Override
215+
public final ChannelPipeline fireChannelRead(Object msg) {
216+
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
217+
return this;
218+
}
219+
```
220+
在AbstractChannelHandlerContext#invokeChannelRead方法中,传入了一个重要的入参:head,这里就是传入的Head头结点,这一重要调用得以让inbound类型handler在ChannelPipeline中按顺序执行。
221+
222+
AbstractChannelHandlerContext.java
223+
```java
224+
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
225+
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
226+
EventExecutor executor = next.executor();
227+
// 在NioEventLoop线程内,next这里传入的是head头结点
228+
if (executor.inEventLoop()) {
229+
next.invokeChannelRead(m);
230+
} else {
231+
executor.execute(new Runnable() {
232+
@Override
233+
public void run() {
234+
next.invokeChannelRead(m);
235+
}
236+
});
237+
}
238+
}
239+
240+
private void invokeChannelRead(Object msg) {
241+
if (invokeHandler()) {
242+
try {
243+
((ChannelInboundHandler) handler()).channelRead(this, msg);
244+
} catch (Throwable t) {
245+
invokeExceptionCaught(t);
246+
}
247+
} else {
248+
fireChannelRead(msg);
249+
}
250+
}
251+
252+
```
253+
254+
ChannelInboundHandler#channelRead的调用,会最终来到InBoundHandlerA里的channelRead方法。
255+
```java
256+
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
257+
258+
@Override
259+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
260+
System.out.println("InBoundHandlerA: " + msg);
261+
ctx.fireChannelRead(msg);
262+
}
263+
}
264+
```
265+
266+
经过AbstractChannelHandlerContext#fireChannelRead,会在ChannelPipeline中寻找下一个inbound,然后继续执行channelRead。
267+
268+
```java
269+
@Override
270+
public ChannelHandlerContext fireChannelRead(final Object msg) {
271+
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
272+
return this;
273+
}
274+
```
275+
276+
277+
细看OutBoundHandlerB#handlerAdded方法由两个write,一个是ctx.channel.write,另一个是ctx.write,这两个有啥区别呢?为啥输出结果是三条:ctx.channel().write -> hello world,一条hello world呢?
278+
279+
启动Server启动类之后,再cmd窗口输入连接socket的命令debug之后分析得
280+
281+
```
282+
telnet 127.0.0.1 8888
283+
```
284+
285+
在客户端socket连接进Netty之后,会先注册channel并init初始化,这时会调用Server类里ServerBootstrap注入的ChannelInitilizer的initChannel方法,最终得以向ChannelPipeline里添加进OutBoundHandlerA、OutBoundHandlerB、OutBoundHandlerC,随后调用
286+
287+
```java
288+
ch.pipeline().addLast(new xxxx)
289+
```
290+
只有会触发DefaultChannelPipeline#callHandlerAdded0()方法,最终来到OutBoundHandler里的handlerAdded()方法,并向Netty的定时任务队列里添加了一个匿名内部任务,也就是:
291+
292+
```java
293+
@Override
294+
public void handlerAdded(final ChannelHandlerContext ctx) {
295+
ctx.executor().schedule(() -> {
296+
ctx.channel().write("ctx.channel().write -> hello world");
297+
ctx.write("hello world");
298+
}, 3, TimeUnit.SECONDS);
299+
}
300+
```
301+
302+
随后完成客户端Socket的初始化工作。此时服务端的selector继续执行for死循环,执行到任务队列,此时发现任务队列中有一个定时任务需要执行,则拿出任务并执行任务,执行过程会跳转到上面的匿名内部类,并依次执行ctx.channel().write()和ctx.write()两个方法。
303+
304+
```java
305+
ctx.channel().write()
306+
```
307+
方法会从ChannelPipeline的尾部tail开始执行(上文已经总结过,outboundHandler都是从tail节点开始执行handler) ,所以字符串“ctx.channel().write -> hello world”就会按outboundHandlerC、outboundHandlerB、outboundHandlerC这个顺序开始执行,执行完head节点之后会一路往上返回到Ctx.channel().write()
308+
方法,并最后去执行ctx.write()方法,而ctx.write()方法会从当前的handler节点开始向前执行,所以当前outboundHandlerB的前节点是outboundHandlerA,所以最终控制台打印出:
309+
```
310+
OutBoundHandlerC: ctx.channel().write -> hello world
311+
OutBoundHandlerB: ctx.channel().write -> hello world
312+
OutBoundHandlerA: ctx.channel().write -> hello world
313+
OutBoundHandlerA: hello world
314+
```
315+
316+
整个过程比较复杂,也比较绕,下面用一张流程图来描述整个过程。
317+
318+
![NettyChannelPipeline流程图1](https://coderbruis.github.io/javaDocs/img/netty/source/NettyChannelPipeline流程图1.png)
319+
320+
321+
- TODO ChannelPipeline优化?MASK
322+
- TODO SimpleChannelInboundHandler源码分析

0 commit comments

Comments
 (0)