本篇文章为《跟着闪电侠学 netty》的学习记录。
Netty 是什么
传统 IO/阻塞 IO
先来看看传统的 IO 编程。
实现一下一个场景,客户端每个两秒发送一个带有时间戳的“hello,world”给服务端,服务端接收并打印。
服务端首先要创建一个 ServerSocket 来监听 8000 端口,然后创建一个新线程,线程不断调用
ServerSocket 中的 accept 方法获取新连接,获得新连接后再为每个连接创建一个线程,这个线程负责读数据。
服务端代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class IOServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8000);
new Thread(()->{ while (true) { try { Socket socket = serverSocket.accept();
new Thread(()->{ try { int len; byte[] data = new byte[1024]; InputStream inputStream = socket.getInputStream(); while((len = inputStream.read(data)) != -1) { System.out.println(new String(data, 0, len)); } } catch (IOException e) { throw new RuntimeException(e); } }).start(); } catch (IOException e) { throw new RuntimeException(e); } } }).start(); } }
|
客户端的实现相对简单,只要连接上服务端的端口重复发送数据即可。
客户端代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class IOClient { public static void main(String[] args) { new Thread(()->{ try { Socket socket = new Socket("127.0.0.1", 8000); while(true) { try { socket.getOutputStream().write((new Date()+":hello world").getBytes()); Thread.sleep(2000); } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } } }catch (IOException e) { throw new RuntimeException(e); } }).start(); } }
|
每一个连接创建成功后都需要一个线程来维护,一旦客户端数量多起来,就会导致线程资源受限,同一时刻有大量线程阻塞,造成严重资源浪费。其次,线程的频繁切换也会带来极大的开销,同时,数据读写也是以字节流为单位。
NIO
为了解决传统 IO 的问题,JDK 在 1.4 版本之后提出了 NIO。
在 NIO 模型中,新来的连接不再需要创建一个新线程,而是可以把这个连接直接绑定到某个固定线程,然后这个连接的所有读写都由这个线程来完成。NIO 模型中的 Selector 组件在这样过程起着重要作用。一个连接来了之后,不会创建一个 while 死循环去监听时候有数据可读,而是直接把这条连接注册到 Selector 上。然后,通过检查 Selector,就可以批量监测出有数据可读写的连接,进而读取数据。
这样,在 NIO 模型中的线程数量也大大降低,减少了线程切换的开销。同时,NIO 的读写面向的是 Buffer 而不是字节流,可以读取里面任何字节数据,不需要自己缓存数据,只需移动指针即可。
在 NIO 模型中通常会有两个线程,每个线程都绑定一个 Selector。一个负责轮询是否有新连接,另一个负责轮询连接是否有数据可读。服务端在监测到新连接之后,不在创建一个新线程,而是直接将连接绑定到负责轮询数据的 Selector 上。
原生 NIO 的代码编写太过复杂,jdk 也没实现比较适合 NIO 的一个线程模型,甚至拆包也要自己编写,还有空轮询导致 cpu 占有率飙升的 bug,故不太建议使用原生 NIO 进行网络编程。
Netty
Netty 就是封装了 JDK 的 NIO,让使用 NIO 编程更为方便。Netty 底层的 IO 模型可以随意切换,只需要小小的改动一下参数,Netty 就可以从 NIO 编程 IO。Netty 自带拆包粘包、异常检测等机制,使得开发者可以将更多精力放在业务逻辑上。Netty 也解决了 JDK 很多包括空轮询在内的 Bug。Netty 底层对线程、Selector 做了很多细小的优化、,精心设计的 Reactor 线程模型可以做到非常高效的并发处理,还自带各种协议栈,通用协议几乎不需要亲自动手。
使用 Netty 需要先导入 jar 包。这里使用 Maven 导入。
1 2 3 4 5
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.101.Final</version> </dependency>
|
Netty 实现上述 Server:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class NettyServer { public static void main(String[] args) { ServerBootstrap bootstrap = new ServerBootstrap();
NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup();
bootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println(s); } }); } }).bind(8000); } }
|
Netty 客户端实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class NettyClient { public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { channel.pipeline().addLast(new StringEncoder()); } }); Channel channel = bootstrap.connect("127.0.0.1", 8000).channel(); while(true) { channel.writeAndFlush(new Date()+":hello world"); Thread.sleep(2000); } } }
|
服务端启动流程
服务端启动最小化代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class NettyServer { public static void main(String[] args) { ServerBootstrap bootstrap = new ServerBootstrap();
NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup();
bootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println(s); } }); } }).bind(8000); } }
|
首先创建了两个 NioEventLoopGroup,这两个对象可以看作传统 IO 编程模型的两大线程组,boss 表示监听端口,接收新连接;worker 表示处理每一组连接的数据读写的线程组。其次创建了一个引导类 ServerBootstrap,引导服务端的启动工作。通过.group(boss, worker)给引导类配置两大线程组,这个引导类的模型就定型了。再通过.channel(NioServerSocketChannel.class)指定服务端的 IO 模型为 NIO。接着调用 childHandler()方法,给引导类创建一个 ChannelInitializer 里面有一个泛型参数 NioSocketChannel,这个就对应 BIO 中的 Socket。配置好了这三样就完成了一个最小的 Netty 服务端,最后通过.bind()绑定一个本地端口启动服务端。
自动绑定递增端口
若当前端口不可用,为了正常启动需要找到一个可用的端口,如端口 8000 不可用,就继续尝试 8001,8002,直到绑定成功。bootstrap.bind(8000)是一个异步方法,调用之后是立即返回的,它的返回值是一个 ChannelFuture。可以通过给这个 ChannelFuture 添加一个监听器 GenericFutureLitstener 去监听端口是否绑定成功。
1 2 3 4 5 6 7 8 9 10 11 12 13
| private static void bind(final ServerBootstrap bootstrap, final int port) { bootstrap.bind(port).addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { if(future.isSuccess()) { System.out.println("端口"+port+"绑定成功"); } else { System.out.println("端口"+port+"绑定失败"); bind(bootstrap, port + 1); } } }); }
|
服务端启动的其他方法
handler()
childHandler()方法用于指定处理新连接数据的读写逻辑;handler()方法用于指定在服务端启动过程中的一些逻辑,一般不使用这个方法。
attr()
1
| bootstrap.arrt(AttributeKey.newInstance("serverName"), "NettyServer");
|
attr()方法可以给服务端 Channel,也就是 NioServerSocketChannel 指定一些自定义属性,然后通过 channel.attr()取出这个属性,一般不使用这个方法。上述代码的意思就是指定 channel 的 serverName 属性,属性值为 NettyServer。实际上就是维护了一个 Map。
childAttr()
1
| bootstrap.childAttr(AttributeKey.newInstance("clientKey"), "NettyClient")
|
childAttr()方法可以给每一个连接自定义属性,可以通过 channel.attr()方法取出。
option()
option()方法可以给服务端 channel 设置一些 TCP 参数,最常用的就是 so_backlog。
1
| bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
|
这个设置表示用于临时存放已完成三次握手的请求队列的最大长度,如果连接建立频繁可以适当调大这个参数。
childOption()
可以给每一个连接设置一些 TCP 参数
1 2
| bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true)
|
- ChannelOption.SO_KEEPALIVE 表示是否开启 TCP 底层心跳机制。
- ChannelOption.TCP_NODELAY 表示是否开启 Nagle 算法。通常如果要求高实时性就设置为 true 表示关闭;如果要减少发送次数、减少网络交互就设置为 false,表示开启。
客户端启动流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class NettyClient { public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception {
} }); bootstrap.connect("juejin.com", 80).addListener(future -> { if(future.isSuccess()) { System.out.println("连接成功"); } else { System.out.println("连接失败"); } }); } }
|
失败重连
在网络情况较差的情况下,客户端第一次连接可能会失败,这时可能会需要尝试重连
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private void connect(Bootstrap bootstrap, String host, int port, int retry) { bootstrap.connect(host, port).addListener(future -> { if(future.isSuccess()) { System.out.println("连接成功"); } else if(retry == 0) { System.out.println("连接失败"); } else { int order = 16 - retry + 1; int delay = 1 << order, 10 ; System.out.println(new Date() + ":连接失败,第" + order + "次重连"); bootstrap.config().group().schedule(()->connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS); } }); }
|
实现定时任务调用的是 bootstrap.config().group().schedule(),其中 bootstrap.config()返回的是 BootstrapConfig,它是对 Bootstrap 配置参数的抽象,bootstrap.config().group()返回的就是前面给客户端配置的线程 group,通过调用 group 的 schedule 方法即可实现定时任务。
客户端启动的其他放啊
attr()
给客户端 channel 自定义绑定属性,然后通过 channel.attr() 取出。
option()
设置一些 TCP 参数。
客户端与服务端双向通信
客户端发送数据到服务端
客户端的数据读写逻辑是通过 bootstrap.handler()指定的。
添加一个逻辑处理器,负责向服务端发送数据
1 2 3 4 5 6
| .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new FirstClientHandler()); } });
|
- socketChannel.pipeline()返回的是和这条连接相关的逻辑处理链,采用了责任链模式。
- 调用 addLast()方法添加一个逻辑处理器,这个处理器负责在连接建立后向服务端发送数据。
处理器实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class FirstClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println(new Date() + ":客户端写出数据");
ByteBuf byteBuf = getByteBuf(ctx);
ctx.channel().writeAndFlush(byteBuf); }
private ByteBuf getByteBuf(ChannelHandlerContext ctx) { ByteBuf byteBuf = ctx.alloc().buffer();
byte[] data = "Hello,nya!".getBytes(StandardCharsets.UTF_8);
byteBuf.writeBytes(data); return byteBuf; } }
|
- 处理器通过继承 ChannelInboundHandlerAdapter,覆盖 channelActive()方法,这个方法会在客户端连接建立成功之后被调用。
- 连接建立成功之后需要通过 Bytebuf 发送数据。
- 通过 ctx.alloc 获得一个 Bytebuf 内存管理器,然后把数据的二进制字节填入 Bytebuf,再通过 ctx.channel().writeAndFlush(byteBuf)把数据写道服务端。
服务端读取客户端数据
服务端的数据读写逻辑是在 ServerBootstrap 的.childHandler()方法指定的。
1 2 3 4 5 6 7 8
| bootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new FirstServerHandler()) } });
|
处理器实现
1 2 3 4 5 6 7 8
| public class FirstServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + ":服务端收到的数据--->"+byteBuf.toString(StandardCharsets.UTF_8)); } }
|
与客户端不同的是,channelRead()方法再接收到客户端发来的数据之后被回调。
至于为什么要强转 msg,之后会进行详细分析。
服务端返回数据到客户端
一样是通过 ctx.channel().writeAndFlush(byteBuf)的方式来进行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class FirstServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(new Date() + ":服务端写出数据"); ByteBuf out = getBytebuf(ctx); ctx.channel().writeAndFlush(out); }
private ByteBuf getBytebuf(ChannelHandlerContext ctx) { byte[] data = "你好,欢迎来到Ave Mujica的世界".getBytes(StandardCharsets.UTF_8); ByteBuf byteBuf = ctx.alloc().buffer(); byteBuf.writeBytes(data); return byteBuf; } }
|
客户端也需要重写 channelRead()来接收服务端返回的数据
1 2 3 4 5 6 7 8 9 10
| public class FirstClientHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + ":客户端收到的响应结果--->"+byteBuf.toString(StandardCharsets.UTF_8)); } }
|
数据载体 Bytebuf
Bytebuf 的结构
Bytebuf 是一个字节容器,容器里面的数据分为三部分,第一部分是已经丢弃的字节,第二部分是可读字节,第三部分是可写字节。这三个部分通过读写指针划分出来,还有一个 capacity 变量表示 Bytebuf 的总容量。每从 Byetbuf 中读取一字节,读指针(readerIndex)自增 1,Bytebuf 里总共有 writerIndex - readerIndex 字节可读。写数据则是从 writerIndex 所指位置开始写,每写一字节,写指针(writerIndex)自增 1,知道 writerIndex = capacity。
容量 API
- capacity():表示 Bytebuf 底层占用了多少字节的内存。
- maxCapacity():表示 Bytebuf 底层最多能占用多少字节。向 Bytebuf 写数据是,若发现 capacity 不足可以进行扩容,只知道 capacity 达到 maxCapacity,超过这个数就抛出异常。
- readableBytes()与 isReadable():前者表示 Bytebuf 当前可读的字节数,即 writerIndex - readerIndex,若为 0 则不可读,isReadable()返回 false。
- writableBytes()、isWritable()、maxWritableByte():和读函数类似,writableBytes()返回的是 capacity - writerIndex,若为 0 则不可写,isWritable 返回 false,这是 Netty 会对 Bytebuf 进行扩容,直至内存大小为 maxCapacity,maxWritableByte 就是计算最多还能写多少数据。
读写指针 API
- readerIndex()和 readerIndex(int):前者获得读指针,后者设置读指针。
- writeIndex()和 writeIndex(int):前者获得写指针,后者设置写指针。
- markReaderIndex()和 resetReaderIndex():前者表示把当前的读指针保存起来,后者表示把读指针恢复到之前保存的值。
- markWriteIndex()和 resetWriteIndex():与第三条类似,是对写指针的操作。
如果可以,尽量使用后两种 API,这样不需要定义变量。
读写 API
writeBytes(byte[] src)和 buffer.readBytes(byte[] dst)
writeBytes()表示把字节数组 src 里的说有数据全部写入 Bytebuf,而 readBytes 表示把 Bytebuf 里的全部数据读到字节数组 dst 中。
writeByte(byte b)和 buffer.readByte();
writeByte()表示向 Bytebuf 里写入一字节,readByte()表示读一字节。对于不同的数据类型都有类似的 API,如 writeInt()、writeLong()、writeBoolean()等。
release()和 retain()
由于 Netty 使用了堆外内存,而堆外内存是不被 JVM 直接管理的。也就是说,申请到的内存无法被垃圾回收器直接回收,所以需要我们手动回收。这有点类似于 C 语言里,申请到的内存必须手工释放,否则会造成内存泄漏。
Netty 的 ByteBuf 是通过引用计数的方式管理的,如果一个 ByteBuf 没有地方被引用到,则需要回收底层内存。在默认情况下,当创建完一个 ByteBuf 时,它的引用为 1,然后每次调用 retain()方法,它的引用就加一,release()方法的原理是将引用计数减一,减完之后如果发现引用计数为 0,则直接回收 ByteBuf 底层的内存。
slice()、duplicate()、copy()
- slice()方法从原始 ByteBuf 中截取一段,这段数据是从 readerIndex 到 writeIndex 的,同时,返回的新的 ByteBuf 的最大容量 maxCapacity 为原始 ByteBuf 的 readableBytes()。
- duplicate()方法把整个 ByteBuf 都截取出来,包括所有的数据、指针信息。
- slice()方法与 duplicate()方法的相同点是:底层内存及引用计数与原始 ByteBuf 共享,也就是说,经过 slice()方法或者 duplicate()方法返回的 ByteBuf 调用 write 系列方法都会影响到原始 ByteBuf,但是它们都维持着与原始 ByteBuf 相同的内存引用计数和不同的读写指针。
- slice()方法与 duplicate()方法的不同点就是:slice()方法只截取从 readerIndex 到 writerIndex 之间的数据,它返回的 ByteBuf 的最大容量被限制到原始 ByteBuf 的 readableBytes(),而 duplicate()方法是把整个 ByteBuf 都与原始 ByteBuf 共享。
- slice()方法与 duplicate()方法不会复制数据,它们只是通过改变读写指针来改变读写的行为,而最后一个方法 copy()会直接从原始 ByteBuf 中复制所有的信息,包括读写指针及底层对应的数据,因此,往 copy()方法返回的 ByteBuf 中写数据不会影响原始 ByteBuf。
- slice()方法和 duplicate()方法不会改变 ByteBuf 的引用计数,所以原始 ByteBuf 调用 release()方法之后发现引用计数为零,就开始释放内存,调用这两个方法返回的 ByteBuf 也会被释放。这时候如果再对它们进行读写,就会报错。因此,我们可以通过调用一次 retain()方法来增加引用,表示它们对应的底层内存多了一次引用,引用计数为 2。在释放内存的时候,需要调用两次 release()方法,将引用计数降到零,才会释放内存。
- 这三个方法均维护着自己的读写指针,与原始 ByteBuf 的读写指针无关,相互之间不受影响。
在使用 slice()、duplicate()的时候要理清内存共享、引用计数共享、读写指针不共享。
展示两个常见错误
(1)多次释放
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Buffer buffer = xxx; doWith(buffer);
buffer.release();
public void doWith(Bytebuf buffer) {
Buffer slice = buffer.slice(); foo(slice); }
public foo(Bytebuf buffer) {
buffer.release(); }
|
(2)不释放造成内存泄漏
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Buffer buffer = xxx; doWith(buffer);
buffer.release();
public void doWith(Bytebuf buffer) {
Buffer slice = buffer.reatainedSlice(); foo(slice); }
public foo(Bytebuf buffer) { }
|
Demo 演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class BytebufTest { public static void main(String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100); print("allocate ByteBuf(9, 100)", buffer); buffer.writeBytes(new byte[] {1, 2, 3, 4}); print("writeBytes(1,2,3,4)", buffer); buffer.writeInt(12); print("writeInt(12)", buffer); buffer.writeBytes(new byte[]{5}); print("writeBytes(5)", buffer); buffer.writeBytes(new byte[]{6}); print("writeBytes(6)", buffer); System.out.println("getByte(3) return:" + buffer.getByte(3)); System.out.println("getShort(3) return:" + buffer.getShort(3)); System.out.println("getInt(3) return:" + buffer.getInt(3)); print("getByte()", buffer); buffer.setByte(buffer.readableBytes() + 1, 0); print("setByte()", buffer); byte[] dst = new byte [buffer.readableBytes()]; buffer.readBytes(dst); print("readBytes(" + dst.length + ")", buffer); } private static void print(String action, ByteBuf buffer) { System.out.println("after ===========" + action + "============"); System.out.println("capacity():" + buffer.capacity()); System.out.println("maxCapacity():" + buffer.maxCapacity()); System.out.println("readerIndex():" + buffer.readerIndex()); System.out.println("readableBytes():" + buffer.readableBytes()); System.out.println("isReadable():" + buffer.isReadable()); System.out.println("writerIndex():" + buffer.writerIndex()); System.out.println("writableBytes():" + buffer.writableBytes()); System.out.println("isWritable():" + buffer.isWritable()); System.out.println("maxWritableBytes():" + buffer.maxWritableBytes()); System.out.println(); } }
|
通信协议编解码
无论使用 Netty 还是原始的 Socket 编程,基于 TCP 通信的数据包格式都是二进制字节流,协议就是客户端与服务端实现商量好多每个数据包中,每一段二进制字节分别代表什么含意。
协议设计
- 魔数,4 字节。为了避免所有数据包都按照约定的协议进行处理导致其他协议失效。
- 版本号,1 字节。通常为预留字段,应对版本升级。
- 序列化算法,1 字节。
- 指令,1 字节。表示数据是做什么的。
- 数据长度,4 字节。
- 后续字节为数据部分。
协议实现
协议中任何一种报文都有版本号和获取指令的方法,提取抽象成一个公共类。
1 2 3 4 5 6 7 8 9 10 11 12
| @Data public abstract class Packet {
private Byte version = 1;
public abstract Byte getCommand(); }
|
以登录请求为例,定义登录请求的数据包。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public interface Command { Byte LOGIN_REQUEST = 1; }
@Data public class LoginRequest extends Packet{ private String userId; private String username; private String password; @Override public Byte getCommand() { return Command.LOGIN_REQUEST; } }
|
序列化接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public interface Serializer {
byte serializer = 1;
Serializer DEFAULT = new JSONSerializer();
byte getSerializerAlgorithm();
byte[] serialize(Object obj);
<T> T deserialize(Class<T> clazz, byte[] data); }
public interface SerializerAlgorithm {
Byte JSON = 1; }
|
JSON 序列化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class JSONSerializer implements Serializer{ @Override public byte getSerializerAlgorithm() { return SerializerAlgorithm.JSON; }
@Override public byte[] serialize(Object obj) { return JSON.toJSONBytes(obj); }
@Override public <T> T deserialize(Class<T> clazz, byte[] data) { return JSON.parseObject(data, clazz); } }
|
编解码在 PacketCodeC.java 中实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| public class PacketCodeC { private static final int MAGIC_NUMBER = 0x12345678;
public ByteBuf encode(Packet packet) { ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer(); byte[] data = Serializer.DEFAULT.serialize(packet);
byteBuf.writeInt(MAGIC_NUMBER); byteBuf.writeByte(packet.getVersion()); byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm()); byteBuf.writeByte(packet.getCommand()); byteBuf.writeInt(data.length); byteBuf.writeBytes(data);
return byteBuf; }
public Packet decode(ByteBuf byteBuf) { byteBuf.skipBytes(4);
byteBuf.skipBytes(1);
byte serializerAlgorithm = byteBuf.readByte();
byte command = byteBuf.readByte();
int length = byteBuf.readInt(); byte[] data = new byte[length]; byteBuf.readBytes(data);
Class<? extends Packet> requestType = getRequestType(command); Serializer serializer = getSerializer(serializerAlgorithm); if(requestType != null && serializer != null) { return serializer.deserialize(requestType, data); } return null; }
private Serializer getSerializer(byte serializerAlgorithm) { if(serializerAlgorithm == SerializerAlgorithm.JSON) return Serializer.DEFAULT; return null; }
private Class<? extends Packet> getRequestType(byte command) { if(command == 1) return LoginRequest.class; return null; }
}
|
客户端登录
客户端会先构建一个登录请求对象,然后通过编码把请求对象编码为 ByteBuf,写到服务端。服务端接收到 ByteBuf 后,先对收到的 ByteBuf 解码为登录响应请求,进行校验。通过校验之后,构造一个响应对象,编码返回客户端。客户端解码,根据响应对象判断是否登录成功。
将客户端和服务端的 Handler 更换成 ClientHandler 和 ServerHandler,后续处理逻辑在这个两个类中实现。
1 2 3 4 5
| public class ClientHandler extends ChannelInboundHandlerAdapter { }
public class ServerHandler extends ChannelInboundHandlerAdapter { }
|
首先客户端创建请求并发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println(new Date() + ":客户端开始登录"); LoginRequest loginRequest = new LoginRequest(); loginRequest.setUserId(UUID.randomUUID().toString()); loginRequest.setUsername("soyo"); loginRequest.setPassword("123456");
ByteBuf byteBuf = PacketCodeC.getINSTANCE().encode(ctx.alloc(), loginRequest);
ctx.writeAndFlush(byteBuf); }
|
服务端接收,解码并处理请求,生成响应返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg;
Packet packet = PacketCodeC.getINSTANCE().decode(byteBuf); LoginResponse response = new LoginResponse(); if(packet instanceof LoginRequest) { LoginRequest request = (LoginRequest) packet; request.setVersion(request.getVersion()); if(vaild(request)) { System.out.println(new Date() + "校验成功"); response.setSuccess(true); } else{ System.out.println("校验失败"); response.setSuccess(false); response.setMsg("账号密码校验失败"); } ByteBuf responseBuf = PacketCodeC.getINSTANCE().encode(ctx.alloc(), response); ctx.writeAndFlush(responseBuf); } }
public boolean vaild(LoginRequest loginRequest) { return loginRequest.getUsername().equals("saki") && loginRequest.getPassword().equals("123456"); }
|
客户端处理响应
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg; Packet packet = PacketCodeC.getINSTANCE().decode(byteBuf); if(packet instanceof LoginResponse) { LoginResponse response = (LoginResponse) packet; if(response.getSuccess()) { System.out.println(new Date() + ": 客户端登录成功"); } else { System.out.println(new Date() + ": 客户端登录失败-->" + response.getMsg()); } } }
|
客户端与服务端收发消息
定义消息对象
客户端发送给服务端的消息为定义为 MessageRequest。
1 2 3 4 5 6 7 8
| @Data public class MessageRequest extends Packet { private String msg; @Override public Byte getCommand() { return Command.MESSAGE_REQUEST; } }
|
设置指令 MESSAGE_REQUEST = 3。
服务端发送给客户端的消息为 MessageResponse。
1 2 3 4 5 6 7 8
| @Data public class MessageResponse extends Packet { private String msg; @Override public Byte getCommand() { return Command.MESSAGE_RESPONSE; } }
|
判断是否登录成功
客户端在发送消息之前需要登录,登录成功才能发消息。可以通过给 Channel 绑定属性来保存客户端的登录状态。
定义登录成功的标志位
1 2 3
| public interface Attributes { AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login"); }
|
修改 NettyClient 连接成功后启动应用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| private static void connect(Bootstrap bootstrap, String host, int port, int retry) { bootstrap.connect(host, port).addListener(future -> { if(future.isSuccess()) { System.out.println("连接成功"); Channel channel = ((ChannelFuture)future).channel(); starConsoleThread(channel); } }); }
private static void starConsoleThread(Channel channel) { new Thread(()->{ while(!Thread.interrupted()) { if(LoginUtil.hasLogin(channel)) { System.out.println("输入消息发送至服务端"); Scanner sc = new Scanner(System.in); String line = sc.nextLine(); MessageRequest request = new MessageRequest(); request.setMsg(line); ByteBuf byteBuf = PacketCodeC.getINSTANCE().encode(channel.alloc(), request); channel.writeAndFlush(byteBuf); } } }).start(); }
|
服务端收发消息
用 if 分支区分 packet 是 Login 相关还是 Message 相关
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg;
Packet packet = PacketCodeC.getINSTANCE().decode(byteBuf); if(packet instanceof LoginRequest) { } else if(packet instanceof MessageRequest) { MessageRequest request = (MessageRequest) packet; System.out.println(new Date() + ": 收到客户端消息-->"+ request.getMsg()); MessageResponse response = new MessageResponse(); response.setMsg("服务端回复:"+request.getMsg()); ByteBuf responseBuf = PacketCodeC.getINSTANCE().encode(ctx.alloc(), response); ctx.writeAndFlush(responseBuf); } } }
|
客户端接收消息
和服务端类似读处理方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg;
Packet packet = PacketCodeC.getINSTANCE().decode(byteBuf); if(packet instanceof LoginResponse) { } else if(packet instanceof MessageResponse) { MessageResponse response = (MessageResponse) packet; System.out.println(new Date() + ": 收到服务端消息-->" + response.getMsg()); } } }
|
Pipline 和 ChannelHandler
随着指令的增多,对指令处理的 if-else 不可避免的会泛滥,同时一端的所有逻辑都写在一个类里,最后也会变得十分臃肿。此外,每次发送协议报文时都需要手动调用编码器。Netty 中的 Pipeline 和 ChannelHandler 就是解决这个问题的。它通过责任链模式组织代码逻辑,并且支持逻辑的动态添加和删除,Netty 能够支持各类协议的扩展,如 HTTP、WebSocket 等,靠的就是 Pipline 和 ChannelHandler。
Pipline 和 ChannelHandler 的构成