基于netty的群聊

基于netty的群聊

学了一段时间的netty知识,现在通过这个基于console的程序来对netty的相关接口做个简单的应用。

准备

依赖

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.44.Final</version>
</dependency>

代码实现

我们都知道,一个典型的netty程序绝大部分使基于以下三部曲来走的;

  1. server/client 启动类
  2. xxxInitializer (implements ChannelInitializer<?> )
  3. xxxChannelHandler (implememts SimpleChannelInboundHandler<?>)

按照以上的三部曲思路,就可以实现自己的网络程序了。

Server端实现

server启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MyChatServer {

public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MyChatServerInitializeor());

ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();

} catch (Exception e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
serverChannelInitializer
1
2
3
4
5
6
7
8
9
10
11
12
public class MyChatServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 粘包、粘包处理器
pipeline.addLast("DelimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
pipeline.addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("StringEncoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyServerChannelHandler());

}
}
serverChannelHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class MyServerChannelHandler extends SimpleChannelInboundHandler<String> {

/**
* 保存channel对象
*/
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static final String DATE_PARTTEN = "yyyy-MM-dd HH:mm:ss:SSS";
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel channel = ctx.channel();
channelGroup.forEach(ch -> {
// 当前遍历的channel不是发送msg的channel对象。则向其他客户端广播
if (channel != ch) {
ch.writeAndFlush(channel.remoteAddress() + ", 发送的消息" + msg + "\n");
} else {
ch.writeAndFlush("[自己] " + msg + " \n");
}
});
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + " 上线了!");
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + " 离开了!");

}

/**
* 客户端链接建立的时候调用
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//super.handlerAdded(ctx);
// 服务端与客户端建立
Channel channel = ctx.channel();
// 向其他链接的客户端发送广播信息
SocketAddress socketAddress = channel.remoteAddress();
String date = DateTimeFormatter.ofPattern(DATE_PARTTEN).format(LocalDateTime.now());
// 向channelGroup中的每一个channel对象发送一个消息
channelGroup.writeAndFlush(date + " [服务器] - " + socketAddress + " 加入 \n");
// 保存该客户端链接
channelGroup.add(channel);
}

/**
* 链接断开
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
String date = DateTimeFormatter.ofPattern(DATE_PARTTEN).format(LocalDateTime.now());

channelGroup.writeAndFlush(date + " [服务器] - " + channel.remoteAddress() + " 离开 \n");
}

/**
* 客户端注册进来
* @param ctx
* @throws Exception
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
}
}

client 端实现

client启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MyChatClient {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();


try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new MyClientChannelInitializor());

ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
Channel channel = channelFuture.channel();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
for (; ; ) {
channel.writeAndFlush(bufferedReader.readLine() + "\r\n");
}
} catch (InterruptedException | IOException e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}

}
}
clientChannelInitializer
1
2
3
4
5
6
7
8
9
10
11
public class MyClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("DelimiterBasedFrameDecoder", new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
pipeline.addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("StringEncoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new MyChatClientChannelHandler());

}
}
clientChannelHandler
1
2
3
4
5
6
7
8
9
public class MyChatClientChannelHandler extends SimpleChannelInboundHandler<String> {

// 直接打印服务端返回的消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);

}
}