RocketMQ 关于网络方面核心类图如下所示:
接下来先一一介绍各个类的主要职责。
RPC 远程服务基础类。主要定义所有的远程服务类的基础方法:
void start()
:启动远程服务。void shutdown()
:关闭。void registerRPCHook(RPCHook rpcHook)
:注册 RPC 钩子函数,有利于在执行网络操作的前后执行定制化逻辑。远程服务器/客户端基础接口,两者中的方法基本类似,故这里重点介绍一下 RemotingServer,定位 RPC 远程操作的相关“业务方法”。
1 | void registerProcessor(int requestCode, NettyRequestProcessor processor,ExecutorService executor) |
注册命令处理器,这里是 RocketMQ Netty 网络设计的核心亮点,RocketMQ 会按照业务逻辑进行拆分,例如消息发送、消息拉取等每一个网络操作会定义一个请求编码(requestCode),然后每一个类型对应一个业务处理器 NettyRequestProcessor,并可以按照不同的 requestCode 定义不同的线程池,实现不同请求的线程池隔离。其参数说明如下。
1 | int requestCode |
命令编码,rocketmq 中所有的请求命令在 RequestCode 中定义。
1 | NettyRequestProcessor processor |
RocketMQ 请求业务处理器,例如消息发送的处理器为 SendMessageProcessor,PullMessageProcessor 为消息拉取的业务处理器。
1 | ExecutorService executor |
线程池,NettyRequestProcessor 具体业务逻辑在该线程池中执行。
1 | Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(int requestCode) |
根据请求编码获取对应的请求业务处理器与线程池。
1 | RemotingCommand invokeSync(Channel channel, RemotingCommand request,long timeoutMillis) |
同步请求调用,参数说如下:
1 | void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) |
异步请求调用。
1 | void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) |
Oneway 请求调用。
Netty 远程服务抽象实现类,定义网络远程调用、请求,响应等处理逻辑,其核心方法与核心方法的设计理念如下。
NettyRemotingAbstract 核心属性:
Semaphore semaphoreOneway:控制 oneway 发送方式的并发度的信号量,默认为 65535 个许可。
Semaphore semaphoreAsync:控制异步发送方式的并发度的信号量,默认为 65535 个许可。
ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable
:当前正在等待对端返回的请求处理表,其中 opaque 表示请求的编号,全局唯一,通常采用原子递增,通常套路是客户端向对端发送网络请求时,通常会采取单一长连接,故发送请求后会向调用端立即返回 ResponseFuture,同时会将请求放入到该映射表中,然后收到客户端响应时(客户端响应会包含请求 code),然后从该映射表中获取对应的 ResponseFutre,然后通知调用端的返回结果,这里是Future 模式在网络编程中的经典运用。
```
HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable
1 |
|
public boolean isOK() {
return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
}
**3\. 请求发送示例**
以同步消息发送为例我们来看一下消息发送的使用示例,其示例代码如下:
![7](https://posts-cdn.lilu.org.cn/gitchat/rocketmqszyjj/20200920112520589.png)
使用关键点如下:
* 首先会为每一个请求进行编号,即所谓的 requestId,在这里使用便利 opaque 来表示,在单机内唯一即可。
* 然后基于 Future 模式,创建 ResponseFuture,并将其放入到`ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable`,当客户端收到服务端的响应后,需要根据 opaque 查找到对应的 ResponseFuture,从而唤醒客户端。
* 通过使用 CHannel 的 writeAndFlush 方法,将请求 Request 通过网络发送到服务端,内部会使用编码器 NettyEncoder 将 RemotingCommand request 编码二级制流,并使用 addListener 添加回调函数,在回调函数中进行处理,唤醒处理结果。
* 同步调用的实现方式,通过调用 Future 的 waitResponse 方法,收到响应结果该方法被唤醒。
#### **Netty 服务端编程实践**
**1\. Netty 服务端创建示例**
**Step1**:创建 Boss、Work 事件线程组。Netty 的服务端线程模型采用的是主从多 Reactor 模型,会创建两个线程组,分别为 Boss Group 与 Work Group,其创建示例如下图所示:
![8](https://posts-cdn.lilu.org.cn/gitchat/rocketmqszyjj/20200920112528931.png)
通常 Boos Group 默认使用一个线程,而 Work 线程组通常为 CPU 的合数,Work 线程组通常为 IO 线程池,处理读写事件。
**Step2**:创建默认事件执行线程组。
![9](https://posts-cdn.lilu.org.cn/gitchat/rocketmqszyjj/20200920112618407.png)
关于该线程池的作用与客户端类似,故不重复介绍。
**Step3**:使用 Netty ServerBootstrap 服务端启动类构建服务端。(模板)
![10](https://posts-cdn.lilu.org.cn/gitchat/rocketmqszyjj/20200920112639194.png)
通过 ServerBootstrap 构建的关键点如下:
* 通过 ServerBootstrap 的 group 的指定 boss、work 两个线程组。
* 通过 ServerBootstrap 的 chanel 方法指定通道的类型,通常有 NioServerSocketChannel、EpollServerSocketChannel。
* 通过 option 方法设置 EpollServerSocketChannel 相关的网络参数,即监听客户端请求的网络通道相关的参数。
* 通过 childOption 方法设置 NioSocketChannel 的相关网络参数,即读写 Socket 相关的网络参数。
* 通过 localAddress 方法绑定到服务端指定的 IP、端口。
* 通过 childHanlder 方法设置实际处理监听器,是应用程序通过 Netty 编程主要的业务切入点,与客户端类似,其中 ServerHandler 为服务端的业务处理 Handler,编码解码与客户端无异。
**Step4**:调用 ServerBootstrap 的 bind 方法绑定到指定端口。
![11](https://posts-cdn.lilu.org.cn/gitchat/rocketmqszyjj/20200920112647675.png)
ServerBootstrap 的 bind 的方法是一个非阻塞方法,调用 sync\(\) 方法会变成阻塞方法,即等待服务端启动完成。
**2\. Netty ServerHandler 编写示例**
服务端在网络通信方面无非就是接受请求并处理,然后将响应发送到客户端,处理请求的入口通常通过定义 ChannelHandler,我们来看一下 RocketMQ 中编写的 Handler。
![12](https://posts-cdn.lilu.org.cn/gitchat/rocketmqszyjj/20200920112656230.png)
服务端的业务处理 Handler 主要是接受客户端的请求,故通常关注的是读事件,可以通常继承 SimpleChannelInboundHandler,并实现 channelRead0,由于已经经过了解码器(NettyDecoder),已经将请求解码成具体的请求对象了,在 RocketMQ 中使用 RemotingCommand 对象,只需面向该对象进行编程,processMessageReceived 该方法是 NettyRemotingClient、NettyRemotingServer 的父类,故对于服务端来会调用 processReqeustCommand 方法。
**在基于 Netty4 的编程,在 ChannelHandler 加上\@ChannelHandler.Sharable 可实现线程安全。**
> 温馨提示:在 ChannelHandler 中通常不会执行具体的业务逻辑,通常是只负责请求的分发,其背后会引入线程池进行异步解耦,在 RocketMQ 的实现中更加如此,在 RocketMQ 提供了基于“业务”的线程池隔离,例如会为消息发送、消息拉取分别创建不同的线程池。这部分内容将在下文详细介绍。
#### **协议编码解码器**
基于网络编程,通信协议的制定是最最重要的工作,通常关于通信协议的设计套路如下:
![13](https://posts-cdn.lilu.org.cn/gitchat/rocketmqszyjj/20200920112704844.png)
通常采用的是 Header + Body 这种结构,通常 Header 部分是固定长度,并且在 Header 部分会有一个字段来标识整条消息的长度,至于头结点中是否会放置其他字段。这种结构非常经典,实现简单,特别适合在接收端从二进制流中解码请求,其关键点如下:
* 接收端首先会尝试从二级制流中读取 Header 长度个字节,如果当前可读取字节不足 Header 长度个字节,先累计,等待更多数据到达。
* 如果能读取到 Header 长度个字段,按照 Header 的格式读取该消息的总长度,然后尝试读取总长度的消息,如果不足,说明还未收到条完整的消息,等待更多数据的到达;如果缓存区中能读取到一条完整的消息,就按照消息格式进行解码,按照特定的格式,将二级制转换为请求对象,例如 RocketMQ 的 RemotingCommand 对象。
由于这种模式非常通用,故 Netty 提供了该解码的通用实现类:LengthFieldBasedFrameDecoder,即能够从二级制流中读取一个完整的消息自己缓存区,应用程序自己实现将 ByteBuf 转换为特定的请求对象即可,NettyDecoder 的示例如下:
![14](https://posts-cdn.lilu.org.cn/gitchat/rocketmqszyjj/20200920112713468.png)
而 NettyEncoder 的职责就是将请求对象转换成 ByteBuf,即转换成二级制流,这个对象转换为上图中协议格式(Header + Body)这种格式即可。
### 线程隔离机制
通常服务端接收请求,经过解码器解码后转换成请求对象,服务端需要根据请求对象进行对应的业务处理,避免业务处理阻塞 IO 读取线程,通常业务的处理会采用额外的线程池,即**业务线程池**,RocketMQ 在这块采用的方式值得我们借鉴,提供了不同业务采用不同的线程池,实现线程隔离机制。
RocketMQ 为每一个请求进行编码,然后每一类请求会对应一个 Proccess(业务处理逻辑),并且将 Process 注册到指定线程池,实现线程隔离机制。
**Step1**:首先在服务端启动时会先进行静态注册,将请求处理器与执行的线程池进行对应,其代码示例如下:
![15](https://posts-cdn.lilu.org.cn/gitchat/rocketmqszyjj/20200920112721817.png)
**Step2**:服务端接受到请求对象后,根据请求命令获取对应的 Processor 与线程池,然后将任务提交到线程池中执行,其代码示例如下所示(NettyRemotingAbstract#processRequestCommand)。
![16](https://posts-cdn.lilu.org.cn/gitchat/rocketmqszyjj/20200920112729518.png)
本篇就介绍到这里了,以 RocketMQ 中使用 Netty 编程为切入点,梳理出基于 Netty 进行网络编程的套路。