Skip to content

Commit f2f9a9a

Browse files
海阳罗海阳罗
authored andcommitted
基于Netty实现im通讯工具
1 parent 34ed31b commit f2f9a9a

24 files changed

+821
-98
lines changed

Spring-Netty/learnnetty.iml

Lines changed: 0 additions & 98 deletions
This file was deleted.
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package com.bruis.learnnetty.im.client;
2+
3+
import com.bruis.learnnetty.im.client.handler.FirstClientHandler;
4+
import com.bruis.learnnetty.im.client.handler.LoginResponseHandler;
5+
import com.bruis.learnnetty.im.client.handler.MessageResponseHandler;
6+
import com.bruis.learnnetty.im.codec.PacketDecoder;
7+
import com.bruis.learnnetty.im.codec.PacketEncoder;
8+
import com.bruis.learnnetty.im.codec.Spliter;
9+
import com.bruis.learnnetty.im.model.MessageRequestPacket;
10+
import com.bruis.learnnetty.im.util.LoginUtil;
11+
import io.netty.bootstrap.Bootstrap;
12+
import io.netty.channel.Channel;
13+
import io.netty.channel.ChannelFuture;
14+
import io.netty.channel.ChannelInitializer;
15+
import io.netty.channel.ChannelOption;
16+
import io.netty.channel.nio.NioEventLoopGroup;
17+
import io.netty.channel.socket.SocketChannel;
18+
import io.netty.channel.socket.nio.NioSocketChannel;
19+
20+
import java.util.Date;
21+
import java.util.Scanner;
22+
import java.util.concurrent.TimeUnit;
23+
24+
/**
25+
* @Description 客户端
26+
* @Author luohaiyang
27+
* @Date 2022/3/22
28+
*/
29+
public class NettyClient {
30+
private static final int MAX_RETRY = 5;
31+
private static final String HOST = "127.0.0.1";
32+
private static final int PORT = 8000;
33+
34+
35+
public static void main(String[] args) {
36+
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
37+
38+
Bootstrap bootstrap = new Bootstrap();
39+
bootstrap
40+
.group(workerGroup)
41+
.channel(NioSocketChannel.class)
42+
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
43+
.option(ChannelOption.SO_KEEPALIVE, true)
44+
.option(ChannelOption.TCP_NODELAY, true)
45+
.handler(new ChannelInitializer<SocketChannel>() {
46+
@Override
47+
public void initChannel(SocketChannel ch) {
48+
ch.pipeline().addLast(new Spliter());
49+
// ch.pipeline().addLast(new FirstClientHandler());
50+
// 解码
51+
ch.pipeline().addLast(new PacketDecoder());
52+
ch.pipeline().addLast(new LoginResponseHandler());
53+
ch.pipeline().addLast(new MessageResponseHandler());
54+
// 编码
55+
ch.pipeline().addLast(new PacketEncoder());
56+
}
57+
});
58+
59+
connect(bootstrap, HOST, PORT, MAX_RETRY);
60+
}
61+
62+
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
63+
bootstrap.connect(host, port).addListener(future -> {
64+
if (future.isSuccess()) {
65+
System.out.println(new Date() + ": 连接成功,启动控制台线程……");
66+
Channel channel = ((ChannelFuture) future).channel();
67+
startConsoleThread(channel);
68+
} else if (retry == 0) {
69+
System.err.println("重试次数已用完,放弃连接!");
70+
} else {
71+
// 第几次重连
72+
int order = (MAX_RETRY - retry) + 1;
73+
// 本次重连的间隔
74+
int delay = 1 << order;
75+
System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");
76+
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
77+
.SECONDS);
78+
}
79+
});
80+
}
81+
82+
private static void startConsoleThread(Channel channel) {
83+
new Thread(() -> {
84+
while (!Thread.interrupted()) {
85+
if (LoginUtil.hasLogin(channel)) {
86+
System.out.println("输入消息发送至服务端: ");
87+
Scanner sc = new Scanner(System.in);
88+
String line = sc.nextLine();
89+
90+
channel.writeAndFlush(new MessageRequestPacket(line));
91+
}
92+
}
93+
}).start();
94+
}
95+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.bruis.learnnetty.im.client.handler;
2+
3+
import com.bruis.learnnetty.im.model.MessageRequestPacket;
4+
import com.bruis.learnnetty.im.model.MessageResponsePacket;
5+
import io.netty.buffer.ByteBuf;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.channel.ChannelInboundHandlerAdapter;
8+
9+
import java.nio.charset.Charset;
10+
11+
/**
12+
* @Description
13+
* @Author luohaiyang
14+
* @Date 2022/3/23
15+
*/
16+
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
17+
@Override
18+
public void channelActive(ChannelHandlerContext ctx) {
19+
for (int i = 0; i < 5; i++) {
20+
// ByteBuf buffer = getByteBuf(ctx);
21+
MessageRequestPacket packet = new MessageRequestPacket();
22+
packet.setMessage("你好啊,测试一下Netty的通讯!");
23+
ctx.channel().writeAndFlush(packet);
24+
}
25+
}
26+
27+
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
28+
// byte[] bytes = "你好,欢迎关注我的微信公众号,《闪电侠的博客》!".getBytes(Charset.forName("utf-8"));
29+
byte[] bytes = "你好".getBytes(Charset.forName("utf-8"));
30+
ByteBuf buffer = ctx.alloc().buffer();
31+
buffer.writeBytes(bytes);
32+
33+
return buffer;
34+
}
35+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.bruis.learnnetty.im.client.handler;
2+
3+
import com.bruis.learnnetty.im.model.LoginRequestPacket;
4+
import com.bruis.learnnetty.im.model.LoginResponsePacket;
5+
import com.bruis.learnnetty.im.util.LoginUtil;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.channel.SimpleChannelInboundHandler;
8+
9+
import java.util.Date;
10+
import java.util.UUID;
11+
12+
/**
13+
* @Description 登录响应的reponse
14+
* @Author luohaiyang
15+
* @Date 2022/3/23
16+
*/
17+
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {
18+
19+
@Override
20+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
21+
// 创建登录对象
22+
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
23+
loginRequestPacket.setUserId(UUID.randomUUID().toString());
24+
loginRequestPacket.setUserName("flash");
25+
loginRequestPacket.setPassword("pwd");
26+
27+
// 写数据-发起登录
28+
ctx.channel().writeAndFlush(loginRequestPacket);
29+
}
30+
31+
@Override
32+
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LoginResponsePacket loginResponsePacket) throws Exception {
33+
if (loginResponsePacket.isSuccess()) {
34+
System.out.println(new Date() + ": 客户端登录成功");
35+
LoginUtil.markAsLogin(channelHandlerContext.channel());
36+
} else {
37+
System.out.println(new Date() + ": 客户端登录失败,原因:" + loginResponsePacket.getReason());
38+
}
39+
}
40+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.bruis.learnnetty.im.client.handler;
2+
3+
import com.bruis.learnnetty.im.model.MessageResponsePacket;
4+
import io.netty.channel.ChannelHandlerContext;
5+
import io.netty.channel.SimpleChannelInboundHandler;
6+
7+
import java.util.Date;
8+
9+
/**
10+
* @Description
11+
* @Author luohaiyang
12+
* @Date 2022/3/23
13+
*/
14+
public class MessageResponseHandler extends SimpleChannelInboundHandler<MessageResponsePacket> {
15+
@Override
16+
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageResponsePacket messageResponsePacket) throws Exception {
17+
System.out.println(new Date() + ": 收到服务端的消息: " + messageResponsePacket.getMessage());
18+
}
19+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.bruis.learnnetty.im.codec;
2+
3+
import com.bruis.learnnetty.im.model.PacketCodeC;
4+
import io.netty.buffer.ByteBuf;
5+
import io.netty.channel.ChannelHandlerContext;
6+
import io.netty.handler.codec.ByteToMessageDecoder;
7+
8+
import java.util.List;
9+
10+
/**
11+
* @Description
12+
* @Author luohaiyang
13+
* @Date 2022/3/23
14+
*/
15+
public class PacketDecoder extends ByteToMessageDecoder {
16+
@Override
17+
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> out) throws Exception {
18+
out.add(PacketCodeC.INSTANCE.decode(byteBuf));
19+
}
20+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.bruis.learnnetty.im.codec;
2+
3+
import com.bruis.learnnetty.im.model.Packet;
4+
import com.bruis.learnnetty.im.model.PacketCodeC;
5+
import io.netty.buffer.ByteBuf;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.handler.codec.MessageToByteEncoder;
8+
9+
/**
10+
* @Description
11+
* @Author luohaiyang
12+
* @Date 2022/3/23
13+
*/
14+
public class PacketEncoder extends MessageToByteEncoder<Packet> {
15+
@Override
16+
protected void encode(ChannelHandlerContext channelHandlerContext, Packet packet, ByteBuf byteBuf) throws Exception {
17+
PacketCodeC.INSTANCE.encode(byteBuf, packet);
18+
}
19+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.bruis.learnnetty.im.codec;
2+
3+
import com.bruis.learnnetty.im.model.PacketCodeC;
4+
import io.netty.buffer.ByteBuf;
5+
import io.netty.channel.ChannelHandlerContext;
6+
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
7+
8+
/**
9+
* @Description
10+
* @Author luohaiyang
11+
* @Date 2022/3/23
12+
*/
13+
public class Spliter extends LengthFieldBasedFrameDecoder {
14+
private static final int LENGTH_FIELD_OFFSET = 7;
15+
private static final int LENGTH_FIELD_LENGTH = 4;
16+
17+
public Spliter() {
18+
super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH);
19+
}
20+
21+
@Override
22+
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
23+
if (in.getInt(in.readerIndex()) != PacketCodeC.MAGIC_NUMBER) {
24+
ctx.channel().close();
25+
return null;
26+
}
27+
28+
return super.decode(ctx, in);
29+
}
30+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.bruis.learnnetty.im.model;
2+
3+
import io.netty.util.AttributeKey;
4+
5+
/**
6+
* @Description Netty 属性集
7+
* @Author haiyangluo
8+
* @Date 2022/3/22
9+
*/
10+
public interface Attributes {
11+
AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");
12+
}

0 commit comments

Comments
 (0)