从零手撸一个Rpc框架-2-传输层、编码层、代理层

这里我完成了最初始的Config层,Proxy层,Codec层,以及Transport层。

具体来说,我使用Netty作为Transport层,并进行了半包和粘包的处理;我自己定义了自己的通信协议:susu协议,协议头待会会介绍;使用Java原生的动态代理作为Proxy;Config层只写了最基础的代码。

GitHub项目地址:susu

这篇博客干货比较多,基本都是代码,前方高能:

一、测试程序

Rpc服务接口:

1
2
3
public interface IService {
String say(String name);
}

Client代码(也叫consumer):

1
2
3
4
5
6
7
8
9
10
11
public class ClientTest {

public static void main(String[] args) throws Exception {
Reference<IService> reference = new Reference<>();
reference.setInterfaceClass(IService.class);
IService service = reference.getRefer();
// Thread.sleep(1000);
String result = service.say("zrj");
System.out.println(result);
}
}

Server代码(也叫Provider):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ServerTest {
public static void main(String[] args) {
Exporter<IService> exporter = new Exporter<>();
exporter.setInterfaceClazz(IService.class);
exporter.setRef(new IServiceImpl());
exporter.export();
}

private static class IServiceImpl implements IService {
@Override
public String say(String name) {
return "from rpc " + name;
}
}
}

这部分代码上篇博客最后已经介绍过了,这篇博客将底层的实现全部完成了,运行结果如下:

二、代码细节

我先介绍两个类:Request和Response,这两个类封装了通信payload中的所有信息,我们的Rpc框架使用这两个类进行网络的信息交换,之后也会基于这两个类做序列化等工作。

Request:

各个字段的说明写在注释里面了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Request {

// 每次请求唯一的id
private long requestId;
// 服务的接口名称,带包名
private String interfaceName;
// 方法名
private String methodName;
// 参数列表的类型,用英文逗号分隔,名称带包名
private String argsType;
// 参数列表对应的参数值,会被序列化
private Object[] argsValue;

/* getter & setter */
}

Response:

1
2
3
4
5
6
7
8
9
10
11
public class Response {

// 对应的是那个请求的id
private long requestId;
// 返回值
private Object returnValue;
// 抛出了异常
private Exception exception;

/* getter & setter */
}

1、服务端:

Exporter:

Exporter代码超简单,调用export时直接打开了一个NettyServer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 */
public class Exporter<T> {

private T ref;
private Class<T> interfaceClazz;

public void setInterfaceClazz(Class<T> interfaceClazz) {
this.interfaceClazz = interfaceClazz;
}

public void setRef(T ref) {
this.ref = ref;
}

public void export() {
NettyServer nettyServer = new NettyServer(new Provider<>(ref, interfaceClazz));
nettyServer.open();
}
}

这里面引入了两个个新的类:NettyServer、Provider,我们先看NettyServer。

NettyServer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class NettyServer extends ChannelDuplexHandler {

private Handler handler;

public NettyServer(Handler handler) {
this.handler = handler;
}

public void open() {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new NettyDecoder());
pipeline.addLast("encoder", new NettyEncoder());
NettyChannelHandler channelHandler = new NettyChannelHandler(handler);
pipeline.addLast("handler", channelHandler);
}
});
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
ChannelFuture f = serverBootstrap.bind(20880);
f.syncUninterruptibly();
} catch (Exception e) {
e.printStackTrace();
}
}
}

一个最基本的Netty服务,里面接受一个参数:handler,这是一个通用接口,用来处理具体逻辑,当NettyServer接受到请求后,最终的处理逻辑会委托给handler。

Handler:

1
2
3
4
5
6
7
8
/**
* 传输层获得数据后,交给具体的处理方法
*
* @author zrj
*/
public interface Handler {
Object handle(Object message);
}

NettyServer代码中出现了几个新类:NettyDecoder、NettyEncoder、NettyChannelHandler,这里先简单介绍一下:NettyDecoder用来解决半包粘包问题,NettyEncoder没什么用,NettyChannelHandler是一个标准的Netty的ChannelHandler,用来处理各种网络事件,待会会仔细的看这几个类,现在先简单介绍一下有个印象。

当我们创建完NettyServer后,会启动20880接口去监听服务并阻塞等待。

NettyChannelHandler:

这个类是一个标准的ChannelHandler类,用来处理网络事件,Client端和Server端共用这个类,之后如果Client端和Server端的代码差异增加,这个类可以拆开成两个ChannelHandler,分别用于Client端和Server端,这里我偷懒就直接写成一个了,继承了ChannelDuplexHandler,这里我们先看怎么处理Request的部分逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class NettyChannelHandler extends ChannelDuplexHandler {

private Codec codec;
private Handler handler;

public NettyChannelHandler(Handler handler) {
this.codec = new SusuCodec();
this.handler = handler;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Object object = codec.decode((byte[]) msg);
if (!(object instanceof Request) && !(object instanceof Response)) {
throw new SusuException("NettyChannelHandler: unsupported message type when encode: " + object.getClass());
}
if (object instanceof Request) {
processRequest(ctx, (Request) object);
} else {
processResponse(ctx, (Response) object);
}
}

private void processRequest(ChannelHandlerContext ctx, Request msg) {
Object result = handler.handle(msg);
Response response = new Response();
response.setRequestId(msg.getRequestId());

if(result instanceof Exception) {
response.setException((Exception) result);
} else {
response.setReturnValue(result);
}
sendResponse(ctx, response);
}

private void processResponse(ChannelHandlerContext ctx, Response msg) {
handler.handle(msg);
}

private ChannelFuture sendResponse(ChannelHandlerContext ctx, Response response) {
byte[] msg = codec.encode(response);
if (ctx.channel().isActive()) {
return ctx.channel().writeAndFlush(msg);
}
return null;
}
}

可以看到,当我们收到msg后,直接调用了Codec进行解码,这里msg一定是byte[],这一点是由NettyDecoder保证的,之后在讲半包粘包问题时会介绍到这部分代码。

解码之后,如果是Request,则委托给handler去执行具体的代码,这里的Handle是我们创建NettyServer的时候传进来的,是一个Provider实例,下面会介绍Provider。Provider返回给我们一个结果,我们先判断结果是否是一个异常,并创建一个Response填充具体的字段,最后我们使用sendResponse方法将这个Response返回给客户端,当然返回之前还是要先编码为byte[]。

到这里,一个完整的过程已经很清晰了,我们剩下需要关心的是两件事情,一个是如何编解码,一个是Provider具体干了什么,编解码对于这部分逻辑是透明的,你只用知道byte[]被转换成了Request或者Response就可以了,所以编解码部分最后再讲。

下面我们看看Provider。

Provider:

可以看到,在我们创建NettyServer的时候传入了一个Provider实例,这个实例实现了Handler接口,看看里面干了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* @author zrj CreateDate: 2019/9/5
*/
public class Provider<T> implements Handler {

protected Map<String, Method> methodMap = new ConcurrentHashMap<>();
private T ref;
private Class<T> interfaceClazz;

/**
* 找到所有interfaceClazz可以调用的方法,并缓存下来,缓存名字要保留参数类型的完整名称,防止函数重载
*/
public Provider(T ref, Class<T> interfaceClazz) {
if(!interfaceClazz.isInterface()) {
throw new SusuException("Provider: interfaceClazz is not a interface!");
}
this.ref = ref;
this.interfaceClazz = interfaceClazz;
List<Method> methods = ReflectUtils.parseMethod(interfaceClazz);
for(Method method : methods) {
String methodDesc = ReflectUtils.getMethodDesc(method);
methodMap.putIfAbsent(methodDesc, method);
}
}

@Override
public Object handle(Object message) {
if(!(message instanceof Request)) {
throw new SusuException("Provider: handle unsupported message type: " + message.getClass());
}
Request request = (Request) message;
String methodName = ReflectUtils.getMethodDesc(request.getMethodName(), request.getArgsType());
Method method = methodMap.get(methodName);
if(method == null) {
return new SusuException("Provider: can't find method: " + methodName);
}
try {
return method.invoke(ref, request.getArgsValue());
} catch (Exception e) {
return new SusuException("Provider: exception when invoke method: " + methodName, e);
} catch (Error e) {
return new SusuException("Provider: error when invoke method: " + methodName, e);
}
}
}

可以看到,构造函数中,首先找到了interfaceClazz的所有可以被调用的Method对象,这里只保留了限定符为public的方法,并将这些方法全部缓存到本地,Key用的是方法签名,方法签名包括参数列表,防止有重载的方法。

当调用handler的时候,将message转成Request类型(这个在调用方保证),然后拿到Request中的调用的方法和参数信息组装成Key,通过Key拿到对应的Methed,然后通过反射调用具体的方法,最后返回调用的结果,如果调用出错则抛出异常。

好了,到这里看看我们完成了什么功能,首先我们使用Netty开了一个端口监听请求,请求到来之后经过最后扔进了Provider中调用具体的方法,并返回调用结果。

当然,Netty传过来的数据是二进制数据,需要反序列化。

其实服务端的代码到这里就介绍完了,是不是很简单!

回顾一下主要是三个类,

  • NettyServer,用来启动一个Netty服务监听网络。
  • NettyChannelHandler,用来接收网络请求,并将二进制请求反序列化为Request对象,然后调用Provider获取结果,包装成Response返回给客户端。
  • Provider,持有服务接口的实现的引用,并使用反射解析服务接口的各种方法,当Request对象到来时,根据Request中的信息得到具体需要调用的方法,使用反射调用后获取结果,返回给NettyChannelHandler。

好了服务端的代码先到这,接下来看看客户端的代码,客户端也就是Rpc的调用方。

2、客户端

Reference:

Reference也很简单,使用默认的ProxyFactory创建一个代理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Reference<T> {

private ProxyFactory<T> proxyFactory;
private Class<T> interfaceClass;

public Reference() {
this.proxyFactory = new ProxyFactory<>();
}

public T getRefer() {
return proxyFactory.getProxy(interfaceClass);
}

public Class<T> getInterfaceClass() {
return interfaceClass;
}

public void setInterfaceClass(Class<T> interfaceClass) {
this.interfaceClass = interfaceClass;
}
}

ProxyFactory:

1
2
3
4
5
6
public class ProxyFactory<T> {
@SuppressWarnings("unchecked")
public T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new ProxyHandler());
}
}

创建了一个代理,重点在于ProxyHandler,看看ProxyHandler干了什么。

ProxyHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ProxyHandler implements InvocationHandler {

private NettyClient client;

public ProxyHandler() {
this.client = new NettyClient();
client.open();
}

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

Request request = new Request();
request.setInterfaceName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setArgsType(getArgsTypeString(args));
request.setArgsValue(args);
Response response = client.invoke(request);

if(response.getException() != null) {
throw response.getException();
}

return response.getReturnValue();
}

private String getArgsTypeString(Object[] args) {
if(args.length <= 0) {
return "";
}
StringBuilder sb = new StringBuilder();
for(Object object : args) {
sb.append(object.getClass().getName()).append(",");
}
if(sb.length() > 0) {
sb.setLength(sb.length() - ",".length());
}
return sb.toString();
}
}

可见这个类是整个客户端的重点,首先,ProxyHandler构造时,首先创建了一个NettyClient并持有。当代理类的方法被调用的时候,首先根据调用的运行时信息创建Request,然后调用Response response = client.invoke(request)获取Response,然后返回具体的结果,抛出异常或者正常返回。

具体的调用工作交给了NettyClient。

NettyClient:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class NettyClient {

private io.netty.channel.Channel clientChannel;
private Codec codec = new SusuCodec();
private Map<Long, ResponseFuture> currentTask = new ConcurrentHashMap<>();

public Response invoke(Request request) {
byte[] msg = codec.encode(request);
ResponseFuture response = new DefaultResponseFuture();
currentTask.put(request.getRequestId(), response);
clientChannel.writeAndFlush(msg);
try {
return (Response) response.getValue();
} catch (Exception e) {
Response response1 = new Response();
response1.setRequestId(request.getRequestId());
response1.setException(new TransportException("NettyClient: response.getValue interrupted!"));
return response1;
}
}

public void open() {

Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new NettyDecoder());
pipeline.addLast("encoder", new NettyEncoder());
pipeline.addLast("handler", new NettyChannelHandler(
message -> {
Response response = (Response) message;
ResponseFuture future = currentTask.remove(response.getRequestId());
future.onSuccess(response);
return null;
}
));
}
}
);

new Thread(() -> {
try {
ChannelFuture future = bootstrap.connect("127.0.0.1", 20880).sync();
clientChannel = future.channel();
clientChannel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}

当调用open之后,同样很简单的创建了一个Netty客户端,不过这里bootstrap.connect("127.0.0.1", 20880).sync();方法在另外一个线程中调用,不然会阻塞我们的Main方法,之后的代码无法运行。

在里面创建客户端时,同样加入了NettyDecoder,NettyEncoder,NettyChannelHandler以及一个匿名的Handler,这个Handler用来通知ResponseFuture,服务端已经返回结果了。

ResponseFuture是一个Future,用来异步获取Netty服务返回的结果。

ResponseFuture:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class DefaultResponseFuture implements ResponseFuture {

private static final int NEW = 0;
private static final int SUCCESS = 1;
private static final int CANCEL = 2;
private static final int FAILED = 3;

private final Object lock = new Object();
private volatile int status;
private Response value;

public DefaultResponseFuture() {
this.status = NEW;
}

@Override
public void onSuccess(Response response) {
synchronized (lock) {
value = response;
status = SUCCESS;
lock.notifyAll();
}
}

@Override
public void onFailure(Response response) {
synchronized (lock) {
value = response;
status = FAILED;
lock.notifyAll();
}
}

@Override
public Object getValue() throws InterruptedException {
if (status > 0) {
return value;
}
synchronized (lock) {
if (status > 0) {
return value;
}
lock.wait();
}
return value;
}
}

可以看到,当我们调用getValue()方法时,如果结果还没有准备好,会挂起当前调用的线程,直到onSuccess方法被调用,设置进来结果后,才会唤醒所有等待的线程。onSuccess方法刚刚已经看到了,这个方法被注册进了NettyChannelHandler,当Netty服务端返回时,onSuccess就会被回调。

最后看看NettyClient中的invoke方法,这个方法超级简单,首先序列化Request请求,然后创建一个DefaultResponseFuture用来异步获取结果, 并将当前的请求放入本地的缓存中,方便异步返回时配对,然后调用clientChannel.writeAndFlush(msg);,将请求发给服务端,最后调用DefaultResponseFuture.getValue阻塞等待结果。

3、编解码,susu协议

编解码器的接口:

1
2
3
4
5
6
7
8
9
/**
* 编解码器
*
* @author zrj CreateDate: 2019/9/5
*/
public interface Codec {
byte[] encode(Object message) throws CodecException;
Object decode(byte[] data) throws CodecException;
}

susu协议编解码器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 编解码核心类:
*
* 协议头:
*
* | magic 16bit | version 8bit | type flag 8bit |
* | content length 32 bit |
* | request id 64 bit |
* | request id 64 bit |
* | content ... |
*
*
* @author zrj CreateDate: 2019/9/5
*/
public class SusuCodec implements Codec {

不用多解释了,协议头写在注释里面啦!

至于具体的编解码过程没什么好说的,就是硬编码,感兴趣的朋友们看Github上的源码,GitHub项目地址:susu

除了协议头之外,还有对Request和Response对象的序列化,我使用的是FastJson序列化框架:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class FastJsonSerialization implements Serialization {

@Override
public byte[] serialize(Object object) throws IOException {
SerializeWriter out = new SerializeWriter();
JSONSerializer serializer = new JSONSerializer(out);
serializer.config(SerializerFeature.WriteEnumUsingToString, true);
serializer.config(SerializerFeature.WriteClassName, true);
serializer.write(object);
return out.toBytes("UTF-8");
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) throws IOException {
return JSON.parseObject(new String(bytes), clazz);
}
}

4、半包与粘包

半包与粘包发生在TCP传输中,由于TCP是面向流的协议,TCP本身不知道如何去截断有完整语义的数据包,所以在客户端看来是分离的单独语义的数据包经过TCP传输后可能有的数据包被截断了,有的数据包被连在一起了,这是需要我们自己设计协议去解析数据流,将我们自己定义的数据包从TCP流中识别出来。

最常用的方法就是在我们自定义的协议头中加入content length字段,标识这段数据包有多少数据。

在netty中,netty提供了方便的用于处理半包粘包问题的入口。

我们可以继承ByteToMessageDecoder,每次轮询到TCP中有未读数据后,会调用decode方法,decode方法会让你有机会先”检视“一次数据,如果数据不完整(发生了半包)的话,就直接return,等待TCP的下次轮询,当有足够的数据之后,我们可以根据自己的规则,将数据写入到List\<Object> out参数中,告诉netty我们有足够的数据了,可以继续进行下面的步骤了。

NettyDecoder:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class NettyDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 数据比协议头小,直接返回
if (in.readableBytes() <= CodecConstants.HEADER_SIZE) {
return;
}

// 标记初始位置
in.markReaderIndex();
short magic = in.readShort();
if(magic != CodecConstants.MAGIC_HEAD) {
in.resetReaderIndex();
throw new TransportException("NettyDecoder: magic number error: " + magic);
}

in.skipBytes(2);
int contentLength = in.readInt();
if(in.readableBytes() < contentLength + 8/* requestId 8 byte */) {
in.resetReaderIndex();
return;
}

// 全部读取
in.resetReaderIndex();
byte[] data = new byte[CodecConstants.HEADER_SIZE + contentLength];
in.readBytes(data);
out.add(data);
}
}

首先检查数据大小是否大于协议头,大于协议头我们才继续。

由于大于协议头,所以后面的16个字节的数据我们可以放心的读出,而不用担心越界异常。我们先检查了一下Magic头,看看是不是我们协议的数据,然后去读contentLength字段,我们就知道了这个数据头所携带的数据包有多大,然后看看ByteBuf中是否有足够的数据,如果没有直接返回,等待下次轮询,如果有了足够数据,则直接取出这个数据包中的所有数据,加入到out中。

千万要注意,这里只能读出本数据包中的数据,由于可能发生粘包,如果将ByteBuf中的数据全部读出来,可能会读到下个数据包的部分数据,导致真正要处理下个数据包的时候,读不出完整的数据,从而导致报错或者一些意想不到的错误,甚至死循环。

三、总结

我们进行了最最简单的编码,直奔最终的结果,去完成一次最简单的RPC调用,这些代码写完的时间非常短,所以里面可能有大量大量的隐藏问题。这段代码暂时仅用于学习和理解RPC框架流程,随着之后的完善,不排除用于生产环境的可能。

代码在这。

GitHub项目地址:susu

之后会引入Zookeeper作为注册中心,支持Cluster和负载均衡部分,并完善代码逻辑,加入各种设置(本文中都没有设置入口,端口和ip全都写死的)。

如果要运行,请将Client中的sleep代码放出来,因为有可能调用的时候,Client和Server的TCP还没链接,之后也会修复这个问题。

我不喝咖啡,但是我相信知识有价。