本篇文章为《跟着闪电侠学 netty》的学习记录。

Netty 是什么

传统 IO/阻塞 IO

先来看看传统的 IO 编程。

实现一下一个场景,客户端每个两秒发送一个带有时间戳的“hello,world”给服务端,服务端接收并打印。

服务端首先要创建一个 ServerSocket 来监听 8000 端口,然后创建一个新线程,线程不断调用
ServerSocket 中的 accept 方法获取新连接,获得新连接后再为每个连接创建一个线程,这个线程负责读数据。

服务端代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class IOServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8000);

// 接收新连接线程
new Thread(()->{
while (true) {
try {
// 1 阻塞方法获取新连接
Socket socket = serverSocket.accept();

// 2 为每个新连接创建一个新线程,负责读取数据
new Thread(()->{
try {
int len;
byte[] data = new byte[1024];
InputStream inputStream = socket.getInputStream();
while((len = inputStream.read(data)) != -1) {
System.out.println(new String(data, 0, len));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}).start();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}).start();
}
}

客户端的实现相对简单,只要连接上服务端的端口重复发送数据即可。

客户端代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class IOClient {
public static void main(String[] args) {
new Thread(()->{
try {
Socket socket = new Socket("127.0.0.1", 8000);
while(true) {
try {
socket.getOutputStream().write((new Date()+":hello world").getBytes());
Thread.sleep(2000);
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}catch (IOException e) {
throw new RuntimeException(e);
}
}).start();
}
}

每一个连接创建成功后都需要一个线程来维护,一旦客户端数量多起来,就会导致线程资源受限,同一时刻有大量线程阻塞,造成严重资源浪费。其次,线程的频繁切换也会带来极大的开销,同时,数据读写也是以字节流为单位。

NIO

为了解决传统 IO 的问题,JDK 在 1.4 版本之后提出了 NIO。

在 NIO 模型中,新来的连接不再需要创建一个新线程,而是可以把这个连接直接绑定到某个固定线程,然后这个连接的所有读写都由这个线程来完成。NIO 模型中的 Selector 组件在这样过程起着重要作用。一个连接来了之后,不会创建一个 while 死循环去监听时候有数据可读,而是直接把这条连接注册到 Selector 上。然后,通过检查 Selector,就可以批量监测出有数据可读写的连接,进而读取数据。

这样,在 NIO 模型中的线程数量也大大降低,减少了线程切换的开销。同时,NIO 的读写面向的是 Buffer 而不是字节流,可以读取里面任何字节数据,不需要自己缓存数据,只需移动指针即可。

在 NIO 模型中通常会有两个线程,每个线程都绑定一个 Selector。一个负责轮询是否有新连接,另一个负责轮询连接是否有数据可读。服务端在监测到新连接之后,不在创建一个新线程,而是直接将连接绑定到负责轮询数据的 Selector 上。

原生 NIO 的代码编写太过复杂,jdk 也没实现比较适合 NIO 的一个线程模型,甚至拆包也要自己编写,还有空轮询导致 cpu 占有率飙升的 bug,故不太建议使用原生 NIO 进行网络编程。

Netty

Netty 就是封装了 JDK 的 NIO,让使用 NIO 编程更为方便。Netty 底层的 IO 模型可以随意切换,只需要小小的改动一下参数,Netty 就可以从 NIO 编程 IO。Netty 自带拆包粘包、异常检测等机制,使得开发者可以将更多精力放在业务逻辑上。Netty 也解决了 JDK 很多包括空轮询在内的 Bug。Netty 底层对线程、Selector 做了很多细小的优化、,精心设计的 Reactor 线程模型可以做到非常高效的并发处理,还自带各种协议栈,通用协议几乎不需要亲自动手。

使用 Netty 需要先导入 jar 包。这里使用 Maven 导入。

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.101.Final</version>
</dependency>

Netty 实现上述 Server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class NettyServer {
public static void main(String[] args) {
ServerBootstrap bootstrap = new ServerBootstrap();

NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();

bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {

@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s);
}
});
}
}).bind(8000);
}
}

Netty 客户端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new StringEncoder());
}
});
Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();
while(true) {
channel.writeAndFlush(new Date()+":hello world");
Thread.sleep(2000);
}
}
}

服务端启动流程

服务端启动最小化代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class NettyServer {
public static void main(String[] args) {
ServerBootstrap bootstrap = new ServerBootstrap();

NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();

bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {

@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s);
}
});
}
}).bind(8000);
}
}

首先创建了两个 NioEventLoopGroup,这两个对象可以看作传统 IO 编程模型的两大线程组,boss 表示监听端口,接收新连接;worker 表示处理每一组连接的数据读写的线程组。其次创建了一个引导类 ServerBootstrap,引导服务端的启动工作。通过.group(boss, worker)给引导类配置两大线程组,这个引导类的模型就定型了。再通过.channel(NioServerSocketChannel.class)指定服务端的 IO 模型为 NIO。接着调用 childHandler()方法,给引导类创建一个 ChannelInitializer 里面有一个泛型参数 NioSocketChannel,这个就对应 BIO 中的 Socket。配置好了这三样就完成了一个最小的 Netty 服务端,最后通过.bind()绑定一个本地端口启动服务端。

自动绑定递增端口

若当前端口不可用,为了正常启动需要找到一个可用的端口,如端口 8000 不可用,就继续尝试 8001,8002,直到绑定成功。bootstrap.bind(8000)是一个异步方法,调用之后是立即返回的,它的返回值是一个 ChannelFuture。可以通过给这个 ChannelFuture 添加一个监听器 GenericFutureLitstener 去监听端口是否绑定成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
private static void bind(final ServerBootstrap bootstrap, final int port) {
bootstrap.bind(port).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if(future.isSuccess()) {
System.out.println("端口"+port+"绑定成功");
} else {
System.out.println("端口"+port+"绑定失败");
bind(bootstrap, port + 1);
}
}
});
}

服务端启动的其他方法

handler()

childHandler()方法用于指定处理新连接数据的读写逻辑;handler()方法用于指定在服务端启动过程中的一些逻辑,一般不使用这个方法。

attr()

1
bootstrap.arrt(AttributeKey.newInstance("serverName"), "NettyServer");

attr()方法可以给服务端 Channel,也就是 NioServerSocketChannel 指定一些自定义属性,然后通过 channel.attr()取出这个属性,一般不使用这个方法。上述代码的意思就是指定 channel 的 serverName 属性,属性值为 NettyServer。实际上就是维护了一个 Map。

childAttr()

1
bootstrap.childAttr(AttributeKey.newInstance("clientKey"), "NettyClient")

childAttr()方法可以给每一个连接自定义属性,可以通过 channel.attr()方法取出。

option()

option()方法可以给服务端 channel 设置一些 TCP 参数,最常用的就是 so_backlog。

1
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);

这个设置表示用于临时存放已完成三次握手的请求队列的最大长度,如果连接建立频繁可以适当调大这个参数。

childOption()

可以给每一个连接设置一些 TCP 参数

1
2
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
  • ChannelOption.SO_KEEPALIVE 表示是否开启 TCP 底层心跳机制。
  • ChannelOption.TCP_NODELAY 表示是否开启 Nagle 算法。通常如果要求高实时性就设置为 true 表示关闭;如果要减少发送次数、减少网络交互就设置为 false,表示开启。

客户端启动流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
// 指定线程模型
bootstrap.group(group)
// 指定IO类型为NIO
.channel(NioSocketChannel.class)
// IO逻辑处理
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {

}
});
bootstrap.connect("juejin.com", 80).addListener(future -> {
if(future.isSuccess()) {
System.out.println("连接成功");
} else {
System.out.println("连接失败");
}
});
}
}

失败重连

在网络情况较差的情况下,客户端第一次连接可能会失败,这时可能会需要尝试重连

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if(future.isSuccess()) {
System.out.println("连接成功");
} else if(retry == 0) {
System.out.println("连接失败");
} else {
// 假设最多重连10次
int order = 16 - retry + 1;
// 重试的时间间隔
int delay = 1 << order, 10 ;
System.out.println(new Date() + ":连接失败,第" + order + "次重连");
bootstrap.config().group().schedule(()->connect(bootstrap, host, port, retry - 1),
delay, TimeUnit.SECONDS);
}
});
}

实现定时任务调用的是 bootstrap.config().group().schedule(),其中 bootstrap.config()返回的是 BootstrapConfig,它是对 Bootstrap 配置参数的抽象,bootstrap.config().group()返回的就是前面给客户端配置的线程 group,通过调用 group 的 schedule 方法即可实现定时任务。

客户端启动的其他放啊

attr()

给客户端 channel 自定义绑定属性,然后通过 channel.attr() 取出。

option()

设置一些 TCP 参数。

客户端与服务端双向通信

客户端发送数据到服务端

客户端的数据读写逻辑是通过 bootstrap.handler()指定的。

添加一个逻辑处理器,负责向服务端发送数据

1
2
3
4
5
6
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new FirstClientHandler());
}
});
  • socketChannel.pipeline()返回的是和这条连接相关的逻辑处理链,采用了责任链模式。
  • 调用 addLast()方法添加一个逻辑处理器,这个处理器负责在连接建立后向服务端发送数据。

处理器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ":客户端写出数据");

// 获取数据
ByteBuf byteBuf = getByteBuf(ctx);

// 写数据
ctx.channel().writeAndFlush(byteBuf);
}

private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 获取二进制抽象bytebuf
ByteBuf byteBuf = ctx.alloc().buffer();

// 准备数据
byte[] data = "Hello,nya!".getBytes(StandardCharsets.UTF_8);

// 将数据填充进 bytebuf
byteBuf.writeBytes(data);
return byteBuf;
}
}
  • 处理器通过继承 ChannelInboundHandlerAdapter,覆盖 channelActive()方法,这个方法会在客户端连接建立成功之后被调用。
  • 连接建立成功之后需要通过 Bytebuf 发送数据。
  • 通过 ctx.alloc 获得一个 Bytebuf 内存管理器,然后把数据的二进制字节填入 Bytebuf,再通过 ctx.channel().writeAndFlush(byteBuf)把数据写道服务端。

服务端读取客户端数据

服务端的数据读写逻辑是在 ServerBootstrap 的.childHandler()方法指定的。

1
2
3
4
5
6
7
8
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new FirstServerHandler())
}
});

处理器实现

1
2
3
4
5
6
7
8
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;

System.out.println(new Date() + ":服务端收到的数据--->"+byteBuf.toString(StandardCharsets.UTF_8));
}
}

与客户端不同的是,channelRead()方法再接收到客户端发来的数据之后被回调。

至于为什么要强转 msg,之后会进行详细分析。

服务端返回数据到客户端

一样是通过 ctx.channel().writeAndFlush(byteBuf)的方式来进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 省略接收逻辑

// 返回数据到客户端
System.out.println(new Date() + ":服务端写出数据");
ByteBuf out = getBytebuf(ctx);
ctx.channel().writeAndFlush(out);
}

private ByteBuf getBytebuf(ChannelHandlerContext ctx) {
byte[] data = "你好,欢迎来到Ave Mujica的世界".getBytes(StandardCharsets.UTF_8);
ByteBuf byteBuf = ctx.alloc().buffer();
byteBuf.writeBytes(data);
return byteBuf;
}
}

客户端也需要重写 channelRead()来接收服务端返回的数据

1
2
3
4
5
6
7
8
9
10
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
// 发送逻辑省略

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;

System.out.println(new Date() + ":客户端收到的响应结果--->"+byteBuf.toString(StandardCharsets.UTF_8));
}
}

数据载体 Bytebuf

Bytebuf 的结构

Bytebuf 是一个字节容器,容器里面的数据分为三部分,第一部分是已经丢弃的字节,第二部分是可读字节,第三部分是可写字节。这三个部分通过读写指针划分出来,还有一个 capacity 变量表示 Bytebuf 的总容量。每从 Byetbuf 中读取一字节,读指针(readerIndex)自增 1,Bytebuf 里总共有 writerIndex - readerIndex 字节可读。写数据则是从 writerIndex 所指位置开始写,每写一字节,写指针(writerIndex)自增 1,知道 writerIndex = capacity。

容量 API

  • capacity():表示 Bytebuf 底层占用了多少字节的内存。
  • maxCapacity():表示 Bytebuf 底层最多能占用多少字节。向 Bytebuf 写数据是,若发现 capacity 不足可以进行扩容,只知道 capacity 达到 maxCapacity,超过这个数就抛出异常。
  • readableBytes()与 isReadable():前者表示 Bytebuf 当前可读的字节数,即 writerIndex - readerIndex,若为 0 则不可读,isReadable()返回 false。
  • writableBytes()、isWritable()、maxWritableByte():和读函数类似,writableBytes()返回的是 capacity - writerIndex,若为 0 则不可写,isWritable 返回 false,这是 Netty 会对 Bytebuf 进行扩容,直至内存大小为 maxCapacity,maxWritableByte 就是计算最多还能写多少数据。

读写指针 API

  • readerIndex()和 readerIndex(int):前者获得读指针,后者设置读指针。
  • writeIndex()和 writeIndex(int):前者获得写指针,后者设置写指针。
  • markReaderIndex()和 resetReaderIndex():前者表示把当前的读指针保存起来,后者表示把读指针恢复到之前保存的值。
  • markWriteIndex()和 resetWriteIndex():与第三条类似,是对写指针的操作。

如果可以,尽量使用后两种 API,这样不需要定义变量。

读写 API

writeBytes(byte[] src)和 buffer.readBytes(byte[] dst)

writeBytes()表示把字节数组 src 里的说有数据全部写入 Bytebuf,而 readBytes 表示把 Bytebuf 里的全部数据读到字节数组 dst 中。

writeByte(byte b)和 buffer.readByte();

writeByte()表示向 Bytebuf 里写入一字节,readByte()表示读一字节。对于不同的数据类型都有类似的 API,如 writeInt()、writeLong()、writeBoolean()等。

release()和 retain()

由于 Netty 使用了堆外内存,而堆外内存是不被 JVM 直接管理的。也就是说,申请到的内存无法被垃圾回收器直接回收,所以需要我们手动回收。这有点类似于 C 语言里,申请到的内存必须手工释放,否则会造成内存泄漏。

Netty 的 ByteBuf 是通过引用计数的方式管理的,如果一个 ByteBuf 没有地方被引用到,则需要回收底层内存。在默认情况下,当创建完一个 ByteBuf 时,它的引用为 1,然后每次调用 retain()方法,它的引用就加一,release()方法的原理是将引用计数减一,减完之后如果发现引用计数为 0,则直接回收 ByteBuf 底层的内存。

slice()、duplicate()、copy()

  1. slice()方法从原始 ByteBuf 中截取一段,这段数据是从 readerIndex 到 writeIndex 的,同时,返回的新的 ByteBuf 的最大容量 maxCapacity 为原始 ByteBuf 的 readableBytes()。
  2. duplicate()方法把整个 ByteBuf 都截取出来,包括所有的数据、指针信息。
  3. slice()方法与 duplicate()方法的相同点是:底层内存及引用计数与原始 ByteBuf 共享,也就是说,经过 slice()方法或者 duplicate()方法返回的 ByteBuf 调用 write 系列方法都会影响到原始 ByteBuf,但是它们都维持着与原始 ByteBuf 相同的内存引用计数和不同的读写指针。
  4. slice()方法与 duplicate()方法的不同点就是:slice()方法只截取从 readerIndex 到 writerIndex 之间的数据,它返回的 ByteBuf 的最大容量被限制到原始 ByteBuf 的 readableBytes(),而 duplicate()方法是把整个 ByteBuf 都与原始 ByteBuf 共享。
  5. slice()方法与 duplicate()方法不会复制数据,它们只是通过改变读写指针来改变读写的行为,而最后一个方法 copy()会直接从原始 ByteBuf 中复制所有的信息,包括读写指针及底层对应的数据,因此,往 copy()方法返回的 ByteBuf 中写数据不会影响原始 ByteBuf。
  6. slice()方法和 duplicate()方法不会改变 ByteBuf 的引用计数,所以原始 ByteBuf 调用 release()方法之后发现引用计数为零,就开始释放内存,调用这两个方法返回的 ByteBuf 也会被释放。这时候如果再对它们进行读写,就会报错。因此,我们可以通过调用一次 retain()方法来增加引用,表示它们对应的底层内存多了一次引用,引用计数为 2。在释放内存的时候,需要调用两次 release()方法,将引用计数降到零,才会释放内存。
  7. 这三个方法均维护着自己的读写指针,与原始 ByteBuf 的读写指针无关,相互之间不受影响。

在使用 slice()、duplicate()的时候要理清内存共享、引用计数共享、读写指针不共享。

展示两个常见错误

(1)多次释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Buffer buffer = xxx;
doWith(buffer); // 也可以是Netty的回调方法,这里只是作为例子展示
// 一次释放
buffer.release();

public void doWith(Bytebuf buffer) {
//...

// 没有增加引用计数
Buffer slice = buffer.slice();
foo(slice);
}

public foo(Bytebuf buffer) {
// 处理数据...

// 重复释放
buffer.release();
}

(2)不释放造成内存泄漏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Buffer buffer = xxx;
doWith(buffer); // 也可以是Netty的回调方法,这里只是作为例子展示
// 引用计数为2却只释放了一次
buffer.release();

public void doWith(Bytebuf buffer) {
//...

// 引用计数+1
Buffer slice = buffer.reatainedSlice();
foo(slice);
}

public foo(Bytebuf buffer) {
// 处理数据...
}

Demo 演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class BytebufTest  {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
print("allocate ByteBuf(9, 100)", buffer);
// write方法改变写指针,写完之后写指针未到capacity的时候,buffer仍然可写
buffer.writeBytes(new byte[] {1, 2, 3, 4});
print("writeBytes(1,2,3,4)", buffer);
// write方法改变写指针,写完之后写指针未到capacity的时候,buffer仍然可写,写完int类型之后,写指针增加4
buffer.writeInt(12);
print("writeInt(12)", buffer);
// write方法改变写指针,写完之后写指针等于capacity的时候,buffer不可写
buffer.writeBytes(new byte[]{5});
print("writeBytes(5)", buffer);
// write方法改变写指针,写的时候发现buffer不可写则开始扩容,扩容之后capacity随即改变
buffer.writeBytes(new byte[]{6});
print("writeBytes(6)", buffer);
// get方法不改变读写指针
System.out.println("getByte(3) return:" + buffer.getByte(3));
System.out.println("getShort(3) return:" + buffer.getShort(3));
System.out.println("getInt(3) return:" + buffer.getInt(3));
print("getByte()", buffer);
// set方法不改变读写指针
buffer.setByte(buffer.readableBytes() + 1, 0);
print("setByte()", buffer);
// read方法改变读指针
byte[] dst = new byte [buffer.readableBytes()];
buffer.readBytes(dst);
print("readBytes(" + dst.length + ")", buffer);
}
private static void print(String action, ByteBuf buffer) {
System.out.println("after ===========" + action + "============");
System.out.println("capacity():" + buffer.capacity());
System.out.println("maxCapacity():" + buffer.maxCapacity());
System.out.println("readerIndex():" + buffer.readerIndex());
System.out.println("readableBytes():" + buffer.readableBytes());
System.out.println("isReadable():" + buffer.isReadable());
System.out.println("writerIndex():" + buffer.writerIndex());
System.out.println("writableBytes():" + buffer.writableBytes());
System.out.println("isWritable():" + buffer.isWritable());
System.out.println("maxWritableBytes():" + buffer.maxWritableBytes());
System.out.println();
}
}

通信协议编解码

无论使用 Netty 还是原始的 Socket 编程,基于 TCP 通信的数据包格式都是二进制字节流,协议就是客户端与服务端实现商量好多每个数据包中,每一段二进制字节分别代表什么含意。

协议设计

  1. 魔数,4 字节。为了避免所有数据包都按照约定的协议进行处理导致其他协议失效。
  2. 版本号,1 字节。通常为预留字段,应对版本升级。
  3. 序列化算法,1 字节。
  4. 指令,1 字节。表示数据是做什么的。
  5. 数据长度,4 字节。
  6. 后续字节为数据部分。

协议实现

协议中任何一种报文都有版本号和获取指令的方法,提取抽象成一个公共类。

1
2
3
4
5
6
7
8
9
10
11
12
@Data
public abstract class Packet {
/**
* 协议版本
*/
private Byte version = 1;

/**
* 指令
*/
public abstract Byte getCommand();
}

以登录请求为例,定义登录请求的数据包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Command {
Byte LOGIN_REQUEST = 1;
}

@Data
public class LoginRequest extends Packet{
private String userId;
private String username;
private String password;
@Override
public Byte getCommand() {
return Command.LOGIN_REQUEST;
}
}

序列化接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public interface Serializer {
/**
* JSON 序列化
*/
byte serializer = 1;

Serializer DEFAULT = new JSONSerializer();

/**
* 序列化算法
*/
byte getSerializerAlgorithm();

/**
* Java 对象转二进制
*/
byte[] serialize(Object obj);

/**
* 二进制转 java 对象
*/
<T> T deserialize(Class<T> clazz, byte[] data);
}

public interface SerializerAlgorithm {
/**
* JSON 序列化
*/
Byte JSON = 1;
}

JSON 序列化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class JSONSerializer implements Serializer{
@Override
public byte getSerializerAlgorithm() {
return SerializerAlgorithm.JSON;
}

@Override
public byte[] serialize(Object obj) {
return JSON.toJSONBytes(obj);
}

@Override
public <T> T deserialize(Class<T> clazz, byte[] data) {
return JSON.parseObject(data, clazz);
}
}

编解码在 PacketCodeC.java 中实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class PacketCodeC {
private static final int MAGIC_NUMBER = 0x12345678;

// 编码
public ByteBuf encode(Packet packet) {
// 创建 Bytebuf
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();
// 序列化对象
byte[] data = Serializer.DEFAULT.serialize(packet);

// header
byteBuf.writeInt(MAGIC_NUMBER);
byteBuf.writeByte(packet.getVersion());
byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm());
byteBuf.writeByte(packet.getCommand());
byteBuf.writeInt(data.length);
// body
byteBuf.writeBytes(data);

return byteBuf;
}

// 解码
public Packet decode(ByteBuf byteBuf) {
// 按理应该检查魔数,这里业务单一就做简单的跳过处理
byteBuf.skipBytes(4);

// 跳过版本号
byteBuf.skipBytes(1);

// 序列化方式
byte serializerAlgorithm = byteBuf.readByte();

// 指令
byte command = byteBuf.readByte();

// 数据包长度
int length = byteBuf.readInt();
byte[] data = new byte[length];
byteBuf.readBytes(data);

// 反序列化
Class<? extends Packet> requestType = getRequestType(command);
Serializer serializer = getSerializer(serializerAlgorithm);
if(requestType != null && serializer != null) {
return serializer.deserialize(requestType, data);
}
return null;
}

private Serializer getSerializer(byte serializerAlgorithm) {
if(serializerAlgorithm == SerializerAlgorithm.JSON) return Serializer.DEFAULT;
return null;
}

private Class<? extends Packet> getRequestType(byte command) {
if(command == 1) return LoginRequest.class;
return null;
}

}

客户端登录

客户端会先构建一个登录请求对象,然后通过编码把请求对象编码为 ByteBuf,写到服务端。服务端接收到 ByteBuf 后,先对收到的 ByteBuf 解码为登录响应请求,进行校验。通过校验之后,构造一个响应对象,编码返回客户端。客户端解码,根据响应对象判断是否登录成功。

将客户端和服务端的 Handler 更换成 ClientHandler 和 ServerHandler,后续处理逻辑在这个两个类中实现。

1
2
3
4
5
public class ClientHandler extends ChannelInboundHandlerAdapter {
}

public class ServerHandler extends ChannelInboundHandlerAdapter {
}

首先客户端创建请求并发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ":客户端开始登录");
LoginRequest loginRequest = new LoginRequest();
loginRequest.setUserId(UUID.randomUUID().toString());
loginRequest.setUsername("soyo");
loginRequest.setPassword("123456");

// 编码
ByteBuf byteBuf = PacketCodeC.getINSTANCE().encode(ctx.alloc(), loginRequest);

// 发送
ctx.writeAndFlush(byteBuf);
}

服务端接收,解码并处理请求,生成响应返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;

// 解码
Packet packet = PacketCodeC.getINSTANCE().decode(byteBuf);
LoginResponse response = new LoginResponse();
if(packet instanceof LoginRequest) {
LoginRequest request = (LoginRequest) packet;
request.setVersion(request.getVersion());
if(vaild(request)) {
// 校验成功
System.out.println(new Date() + "校验成功");
response.setSuccess(true);
} else{
// 校验失败
System.out.println("校验失败");
response.setSuccess(false);
response.setMsg("账号密码校验失败");
}
// 发送响应
ByteBuf responseBuf = PacketCodeC.getINSTANCE().encode(ctx.alloc(), response);
ctx.writeAndFlush(responseBuf);
}
}

// 假设用户是 saki
public boolean vaild(LoginRequest loginRequest) {
return loginRequest.getUsername().equals("saki") && loginRequest.getPassword().equals("123456");
}

客户端处理响应

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
Packet packet = PacketCodeC.getINSTANCE().decode(byteBuf);
if(packet instanceof LoginResponse) {
LoginResponse response = (LoginResponse) packet;
if(response.getSuccess()) {
System.out.println(new Date() + ": 客户端登录成功");
} else {
System.out.println(new Date() + ": 客户端登录失败-->" + response.getMsg());
}
}
}

客户端与服务端收发消息

定义消息对象

客户端发送给服务端的消息为定义为 MessageRequest。

1
2
3
4
5
6
7
8
@Data
public class MessageRequest extends Packet {
private String msg;
@Override
public Byte getCommand() {
return Command.MESSAGE_REQUEST;
}
}

设置指令 MESSAGE_REQUEST = 3。

服务端发送给客户端的消息为 MessageResponse。

1
2
3
4
5
6
7
8
@Data
public class MessageResponse extends Packet {
private String msg;
@Override
public Byte getCommand() {
return Command.MESSAGE_RESPONSE;
}
}

判断是否登录成功

客户端在发送消息之前需要登录,登录成功才能发消息。可以通过给 Channel 绑定属性来保存客户端的登录状态。

定义登录成功的标志位

1
2
3
public interface Attributes {
AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");
}

修改 NettyClient 连接成功后启动应用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if(future.isSuccess()) {
System.out.println("连接成功");
// 连接成功之后启动控制台线程
Channel channel = ((ChannelFuture)future).channel();
starConsoleThread(channel);
} // ...
});
}

private static void starConsoleThread(Channel channel) {
new Thread(()->{
while(!Thread.interrupted()) {
if(LoginUtil.hasLogin(channel)) {
System.out.println("输入消息发送至服务端");
Scanner sc = new Scanner(System.in);
String line = sc.nextLine();
// 构造消息请求
MessageRequest request = new MessageRequest();
request.setMsg(line);
// 发送
ByteBuf byteBuf = PacketCodeC.getINSTANCE().encode(channel.alloc(), request);
channel.writeAndFlush(byteBuf);
}
}
}).start();
}

服务端收发消息

用 if 分支区分 packet 是 Login 相关还是 Message 相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;

// 解码
Packet packet = PacketCodeC.getINSTANCE().decode(byteBuf);
if(packet instanceof LoginRequest) {
// 登录处理
} else if(packet instanceof MessageRequest) {
// 消息处理
MessageRequest request = (MessageRequest) packet;
System.out.println(new Date() + ": 收到客户端消息-->"+ request.getMsg());
MessageResponse response = new MessageResponse();
response.setMsg("服务端回复:"+request.getMsg());
ByteBuf responseBuf = PacketCodeC.getINSTANCE().encode(ctx.alloc(), response);
ctx.writeAndFlush(responseBuf);
}
}
}

客户端接收消息

和服务端类似读处理方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;

Packet packet = PacketCodeC.getINSTANCE().decode(byteBuf);
if(packet instanceof LoginResponse) {
// 处理登录响应
} else if(packet instanceof MessageResponse) {
// 处理消息响应
MessageResponse response = (MessageResponse) packet;
System.out.println(new Date() + ": 收到服务端消息-->" + response.getMsg());
}
}
}

Pipline 和 ChannelHandler

随着指令的增多,对指令处理的 if-else 不可避免的会泛滥,同时一端的所有逻辑都写在一个类里,最后也会变得十分臃肿。此外,每次发送协议报文时都需要手动调用编码器。Netty 中的 Pipeline 和 ChannelHandler 就是解决这个问题的。它通过责任链模式组织代码逻辑,并且支持逻辑的动态添加和删除,Netty 能够支持各类协议的扩展,如 HTTP、WebSocket 等,靠的就是 Pipline 和 ChannelHandler。

Pipline 和 ChannelHandler 的构成