从零手撸一个Rpc框架-3-Cluster层、代码重构

GitHub项目地址:susu

之前我们可以说是直接了当的实现了传输层(netty)、编码层(susu协议)、代理层(动态代理),完成了一个rpc框架所需的最少代码,上次代码一共800多行,可以说非常“mini”了。

先看看上次遗留的一个小问题:demo中的client需要sleep一下,我们只用把NettyClient#open方法稍微修改一下就行了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
new Thread(() -> {
try {
ChannelFuture future = bootstrap.connect("127.0.0.1", 20880).sync();
clientChannel = future.channel();
clientChannel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}).start();

// 改为:
ChannelFuture future = bootstrap.connect("127.0.0.1", 20880).sync();
clientChannel = future.channel();
new Thread(() -> {
try {
clientChannel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}).start();

我们只用把connect操作从线程里面拿出来即可,这样在open方法退出时,可以保证client已经连上server了。当然,我现在对这部分逻辑进行了更加细致的操作,不再是new Thread了,请看最新代码:

susu

这次更新,我加入了对zookeeper的支持,使用zookeeper做注册中心,做了服务注册和发现,并将项目做了分层和抽象,初步完成了配置层,以及最后同样写了一个例子,一起来看看吧~

问题

我们的mini版代码中,代码是严重耦合的,这里不再细致的探讨设计模式的思想,之后会专门写一篇博客探讨设计模式的思想。这里我们使用抽象和分层去解决耦合的问题,保证层与层之间单向依赖,并保证这种依赖是建立在接口上的,而不是实现上的。

之前的代码没有做任何可用性扩展,这里我定义了几个自定义异常去包装各种异常信息,并抽象出了一个LifeCircle接口,用来管理所有对象的生命周期。在dubbo和motan中,这个接口的名称是Node,也是用来管理生命周期的。

之前的代码客户端和服务端是直连的,没有配置,没有服务注册发现,也没有服务簇,这次通过加入Config层、Registry层、Cluster层开始着手去处理这些问题。

一、项目分模块

目前susu项目分为5层,每层之间单向依赖,从下到上依次是:

  • susu-core:核心模块,定义了一些基本接口、常量、异常、工具类以及一些基础类。
  • susu-transport:传输层,定义了传输层的接口、netty版本的实现以及编解码层的实现。这里将codec层和transport层合并了,因为这两层本质上都属于传输层的逻辑,之后如果需要独立出编解码层可以考虑拆开。
  • susu-cluster:服务簇,包装了传输层,管理传输层的生命周期,并完成了负载均衡功能。
  • susu-registry:注册层,定义了基本的注册接口,并实现zookeeper版本的注册中心。
  • susu-config:总设置层,定义了服务暴露和服务发现的接口和功能实现,是rpc框架的总入口。

对比与dubbo和motan的层次接口(系列博客第一篇有讲),我这里层次更少一些,原因目前还不需要这么多层。

接下来一层一层看吧,core模块不单独讲,因为各层都会或多或少的使用到core中的代码,所以我们直接从传输层开始看,看看在上篇博客的基础上,做了哪些改进。

在介绍具体代码之前我要先说一些我的设计思想:所有的设计均基于实际需求,而不是形式上的设计。

插播!】这里我分享一个我自己的小经验:多数的设计是基于已有的架构遇到问题后在解决的过程中设计并完善的,所有的设计都不是无的放矢的。我自己最开始想去写一些抽象的接口的时候就会犯难,因为总是想着先去抽象一个什么什么接口再去实现,而结果往往是无从下手,不妨换一个思路,先去实现功能而不管实现的过程有多蠢,然后再review代码考虑该怎么优化以及抽象,由于你自己实现了一遍功能,在实现的过程中就能感受到哪些地方是可能重用的,可以抽出来甚至抽象成一个接口,所以这时候抽象出来的接口是真正基于功能抽象出来的。所有的抽象都应该基于功能而不是想象,想象出来的抽象往往很多地方是和实际功能相冲突的。当然,除了那些一眼就知道要怎么抽象的接口可以直接写之外。这里是有实际依据的,我看了spring1.0版的代码,那时候还不叫spring,叫interface21,然后也看了spring5.0的代码,发现很多抽象层次和接口是后来才扩展出来的,这说明是在开发过程中,发现有些地方可以或者说需要抽象成一个接口,然后再进行抽象的,而不是在spring最初版本就把所有的接口定义好了的。但随着接口和抽象的越来越熟练,在设计的时候就就可以提前写好很多的接口了,这时候的设计可能才开始从上到下设计,但在没那么的熟练之前,还是可以考虑由下到上的设计方式。

二、susu-transport

我们需要思考一个问题,传输层其实是可能会需要支持不同的协议的,这里的传输层不等于七层网络中的传输层,这里的传输层单指网络传输,里面可能会包含一些应用层的协议,例如http等。所以我们需要抽象出一个传输层的api。传输层需要考虑到连接是否可用等可用性问题,所以需要进行生命周期管理。

1. Channel

传输层说白了是管理端到端的连接用的,所以传输层的最基础的api应该是一个信道(或者说连接也行,后文使用信道这个词,代表客户端到服务端的一个通信通道),我们需要知道该信道是否打开或者关闭,需要有打开信道和关闭信道的方法,需要知道该信道上使用的编码协议,以及对于该信道的各种设置。我们就有下面所示的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public interface Channel {

/**
* 使用在该信道上的编解码协议
*/
Codec getCodec();

/**
* 该传输层的设置
*/
URL getUrl();

boolean isOpened();
boolean isClosed();

/**
* 打开信道
*/
void open();

/**
* 关闭信道
*/
void close();
}

Codec接口在上篇博客介绍过了,里面有两个方法,编码和解码,这里再稍微回顾一下:

1
2
3
4
public interface Codec {
byte[] encode(Object message) throws CodecException;
Object decode(byte[] data) throws CodecException;
}

2. Client&Server

在Channel接口之后,我们需要分为客户端信道和服务端信道,客户端操作信道和服务端操作信道应该是有区别的,比如客户端需要有从信道发送信息给服务端并获得回复的功能:

1
2
3
4
5
6
public interface Client extends Channel {
Response invoke(Request request);
}

public interface Server extends Channel {
}

严格来说,Channel需要有读写信息的接口,这也是是一个信道的基础功能之一,实际上Dubbo就是这样抽象的,但是这里考虑到Server端不会主动推送信息给Client端,所以虽然我们使用的C/S模式,但实际是B/S模式,所以这里在Client中扩展了一个类似HttpServlet中的方法,Server接口中不具有任何读写的功能。再一个,我们需要考虑到读写功能是否需要提供给上层使用,这里的结论是上层不需要调用这么具体的读写功能,上层只需要调用Response invoke(Request request)对于我们的rpc框架来说已经足够了(目前足够)。

这里对比一下dubbo的设计(为了缩短篇幅我把注释都删了,dubbo命名没的说,不用注释也能看懂):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface Channel extends Endpoint {
InetSocketAddress getRemoteAddress();
boolean isConnected();
boolean hasAttribute(String key);
Object getAttribute(String key);
void setAttribute(String key, Object value);
void removeAttribute(String key);
}

public interface Endpoint {
URL getUrl();
ChannelHandler getChannelHandler();
InetSocketAddress getLocalAddress();
void send(Object message) throws RemotingException;
void send(Object message, boolean sent) throws RemotingException;
void close();
void close(int timeout);
void startClose();
boolean isClosed();
}

在分享一下motan的Channel设计:

1
2
3
4
5
6
7
8
9
10
11
public interface Channel {
InetSocketAddress getLocalAddress();
InetSocketAddress getRemoteAddress();
Response request(Request request) throws TransportException;
boolean open();
void close();
void close(int timeout);
boolean isClosed();
boolean isAvailable();
URL getUrl();
}

InetSocketAddress getLocalAddress(); InetSocketAddress getRemoteAddress();motan中的这两个方法全项目没有任何地方用到,虽然看起来是两个Channel需要有的基本功能,但是可能根本用不到,这就是我上面说的要面向功能抽象。所以我自己的Channel中,稍微精炼了一下,剔除了没有用的方法。

这里有一个设计小细节:当我们在接口中定义一个方法是否需要抛出异常时,所有RuntimeException只需要以注释的形式写明即可,如果显示的抛出一个RuntimeException我个人感觉是很奇怪的,显示抛出异常的用意是告诉调用方需要显示catch,而显示catch的异常通常是Exception异常而非RuntimeException,如果说既需要显示catch,又要定义为RuntimeException异常,那本身就是矛盾的。

dubbo中就有这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Invoker<T> extends Node {

Class<T> getInterface();

// 这里的RpcException就是一个RuntimeException,但是却显示的抛出了。
/**
* invoke.
*
* @param invocation
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation) throws RpcException;
}

我们看看jdk中的做法,拿Future接口的get方法举例:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;

这个方法会主动抛出三个异常,不出意外的,InterruptedException和ExecutionException都是Exception类型,而CancellationException是RuntimeException类型。

这是一个小细节,不知道是我自己的理解有问题还是dubbo有他自己的用意。否则的话,RuntimeException我建议写在方法注释里面。

3. AbstractEndpoint&AbstractClient&AbstractServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
abstract public class AbstractEndpoint implements Channel, Codec {
private static final int UNKNOWN = -1;
private static final int NEW = 0;
private static final int OPEN = 1;
private static final int CLOSE = 2;

private Codec codec;
private URL url;
private volatile int CHANNEL_STATUS = 0;

public AbstractEndpoint(Codec codec, URL url) {
this.codec = codec;
this.url = url;
}

@Override
public Codec getCodec() {
return codec;
}

@Override
public URL getUrl() {
return url;
}

public void setUrl(URL url) {
this.url = url;
}

@Override
public boolean isOpened() {
return CHANNEL_STATUS == OPEN;
}

@Override
public boolean isClosed() {
return CHANNEL_STATUS == CLOSE;
}

@Override
public void open() {
CHANNEL_STATUS = OPEN;
}

@Override
public void close() {
CHANNEL_STATUS = CLOSE;
}

@Override
public byte[] encode(Object message) throws CodecException {
return codec.encode(message);
}

@Override
public Object decode(byte[] data) throws CodecException {
return codec.decode(data);
}
}

封装了状态的迁移,并代理了Codec的功能,让其子类可以直接具有编解码的功能。

AbstractClient&AbstractServer这两个类没有做更多的事情,继承了AbstractEndpoint并分别实现了Client接口和Server接口,AbstractClient保存了当前的任务和ResponseFuture的map。

里面比较重要的是支持了URL设置,URL在susu中承担的是配置总线的角色,不难想到AbstractClient&AbstractServer的继承者中的各种配置信息都可以在URL中拿到。dubbo和motan也是用URL作为配置总线,后面我们会详细讨论URL这个类,这个类还是存在一些争议的,有一些坏味道。

4. NettyClient&NettyServer

分别继承于AbstractClient&AbstractServer,里面实现了Channel中定义的open方法和close方法,close方法中完成了netty的优雅关机,以及做了一些异常处理工作。

NettyChannelHandler不出意外的被我们拆开成了两个内部类:ServerChannelHandler和ClientChannelHandler,分别存在于NettyServer和NettyClient中,其逻辑没有改变,跟上篇博客中介绍的一样。

好了,transport中的抽象就写到这里啦,总结一下就是,首先进行了抽象,然后完善了信道生命周期的管理。

三、susu-cluster

从这层开始往上可以算作是服务治理的功能范畴了,首先要介绍的是配置总线URL以及对它设计的一些讨论,然后会简单介绍一些负载均衡算法,以后会开一篇博客写一写常用的静态负载均衡算法以及动态负载均衡算法,这里实现了最简单的一种:随机负载均衡,之后会逐渐完善各种负载均衡算法。

1. URL

url作为贯穿各层的配置总线的角色存在:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class URL {

/**
* 协议
*/
private String protocol;

/**
* host地址
*/
private String host;

/**
* 服务端口
*/
private int port;

/**
* 任务路径,等同于interfaceName
*/
private String path;

/**
* 通用设置
*/
private Map<String, String> parameters;
}

具体的方法没有展示出来,但是从他的属性就能看出一些。对于rpc框架来说,其处理的对象本身就是各种各样的终端,客户端、服务端、注册中心、配置中心、监控中心等等,这些不同的终端无一不是依靠网络通信,既然是网络通信,就一定可以使用URL去描述一个网络资源。所以在RPC框架中使用URL作为一个终端的配置总线是水到渠成的事情,URL中封装了终端的协议类型,ip地址,服务端口,资源路径,以及各种自定义参数。

但这里面存在着坏味道—— URL太泛了,理解门槛较高,并有误用的风险,后面再讲config层的时候我们可以看到一些使用URL作为配置总线的问题,这里先看cluster中的内容。

2. LifeCircle

服务治理层有自己的控制生命周期的接口:LifeCircle。

1
2
3
4
5
6
public interface LifeCircle {

void init();
void destroy();
boolean isAvailable();
}

所有与资源有关的接口都应该继承这个接口。

3. Invoker

1
2
3
4
5
6
7
8
public interface Invoker extends LifeCircle {

default URL getURL() {
return null;
}

Response invoke(Request request);
}

Invoker是上层对于底层一次RPC调用的抽象,其实现类用于管理底层(传输层)的实例。

一个Invoker代表一个可以调用的Channel。dubbo和motan都有自己的Invoker:

dubbo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface Invoker<T> extends Node {

/**
* get service interface.
*
* @return service interface.
*/
Class<T> getInterface();

/**
* invoke.
*
* @param invocation
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation) throws RpcException;

}

前面说过,Node的作用等同于LifeCircle,这里面的Invocation等同于我们的Request,Result等同于Response。

motan中没有Invoker接口,但有一个类似功能的Caller接口:

1
2
3
4
5
public interface Caller<T> extends Node {

Class<T> getInterface();
Response call(Request request);
}

可以看到除了名称不一样以外,其他的设计基本一样。

4. LoadBalance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface LoadBalance {
Invoker select(List<Invoker> invokers);
}

public class RandomLoadBalance implements LoadBalance {

private Random random;

public RandomLoadBalance() {
this.random = new Random();
}

@Override
public Invoker select(List<Invoker> invokers) {
if(invokers.size() == 1) {
return invokers.get(0);
}
return invokers.get(random.nextInt(invokers.size()));
}
}

非常简单的lb以及它的实现类,作用就是从一组可选的Invoker中,根据一定的算法挑选出这一次调用需要使用的Invoker。刚刚说了,Invoker是对一个Channel的封装,更准确的说是对Client的封装,所以一个Invoker在这里可以等同于一个Client来理解。一个Client对应的是一个Server,所以不同的Invoker实际上对应的是不同的Server,所以使用不同的Invoker其实就是将这次请求打到不同的Server,这也是负载均衡这个概念的核心。

5. DefaultInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class DefaultInvoker implements Invoker {

private Client client;
private URL url;

public DefaultInvoker(URL url) {
this.url = url;
Codec codec;
// 检测传输层和编码层各用什么样的实现,如果没有使用默认实现代替
if("netty".equals(url.getString(URL_CONFIG.TRANSPORT))) {
if("susu".equals(url.getString(URL_CONFIG.CODEC))) {
codec = new SusuCodec();
} else {
codec = new SusuCodec();
}
client = new NettyClient(codec, url);
} else {
client = new NettyClient(new SusuCodec(), url);
}
}

@Override
public Response invoke(Request request) {
// 先进行最简单的实现
return client.invoke(request);
}

@Override
public URL getURL() {
return url;
}

@Override
public void init() {
client.open();
}

@Override
public void destroy() {
client.close();
}

@Override
public boolean isAvailable() {
return client.isOpened();
}
}

DefaultInvoker是invoker的默认实现,里面根据URL中的配置决定使用什么样的传输层实现,我们默认使用SusuCodec和Netty的实现。

6. DefaultCluster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class DefaultCluster implements Cluster {

private LoadBalance loadBalance;
private List<Invoker> invokers;
private List<URL> urls;
private URL url;

private volatile boolean init = false;

/**
* 没有传入urls, 会去registry中拿地址
*/
public DefaultCluster(URL url) {
this.url = url;
}

public DefaultCluster(URL url, List<URL> urls) {
this.urls = urls;
this.url = url;
}

@Override
public Response invoke(Request request) {
if(!init) {
throw new SusuException("DefaultCluster: status error when invoke(), init: " + init);
}
if(invokers == null || invokers.size() == 0) {
throw new SusuException("DefaultCluster: invokers is empty");
}
Invoker invoker = loadBalance.select(invokers);
if(!invoker.isAvailable()) {
invoker.init();
}
return invoker.invoke(request);
}

@Override
public void init() {
// 初始化负载均衡策略
if("random".equals(url.getString(URL_CONFIG.LOAD_BALANCE))) {
loadBalance = new RandomLoadBalance();
} else {
loadBalance = new RandomLoadBalance();
}

// 根据urls构建invoker
if(urls != null && urls.size() > 0) {
invokers = urls.stream().map(this::getInvokerFromUrl).collect(Collectors.toList());
}
init = true;
}

@Override
public void destroy() {
for(Invoker invoker : invokers) {
if(invoker.isAvailable()) {
invoker.destroy();
}
}
init = false;
}

@Override
public boolean isAvailable() {
return init;
}

@Override
public void notify(URL registryUrl, List<URL> urls) {
this.urls = urls;
}

/**
* 创建Invoker的逻辑先独立出来再说,之后可能会有很复杂的创建逻辑。
*/
private Invoker getInvokerFromUrl(URL url) {
return new DefaultInvoker(url);
}
}

持有lb的实现,以及Invoker的列表,Cluster本身也继承了Invoker接口,因为Cluster本身是一种特殊的Invoker,Cluster对于上层来说也是一个Invoker,只不过里面封装了一些请求过滤和负载均衡的工作,对于上层来说,Cluster和Invoker干的活是一样的,都是发起一次请求,获得一个返回。

Cluster管理了不同的Invoker的生命周期,并且使用了懒连接的方式:只有当一个Invoker真正被lb选中时才建立真正的连接。这样对于一致性哈希这样的负载均衡算法来说可以节约大量的连接资源—— 因为一致性哈希在原链接不出问题的情况下,永远会使用同一个连接,也就是同一个Invoker。

当上层像调用Invoker一样调用Cluster的invoke方法时,Cluster会先使用持有的lb从备选的Invoker组中选出一个,然后再进行真正的请求。

notify方法之后将Registry层的时候再细讲。

好了,Cluster层到这里也讲完啦~我实现了一个非常简单的Cluster层,我们来回顾一下Cluster层干了什么:首先使用Invoker去抽象底层的Channel,然后向上提供一个特殊的Invoker—— 一个Cluster去调用,Cluster会持有一个Invoker列表,一个Invoker映射一个Server,然后Cluster会根据负载均衡算法去调用真正的Invoker,将请求打到对应的Server上。

四、Registry层、Config层

由于篇幅的关系,Registry层、Config层下篇博客再讲啦~

总结一下,这篇博客讲了如何在上篇博客基本实现功能了的基础上,进行抽象、分层、扩展。可以看到,基于URL配置的方式可以在不同层非常方便是使用不同的策略去初始不同的实现,比如lb、codec、client、server。虽然每层接口我们都只提供了一种实现,但是扩展起来可以想象到会非常的方便,你甚至可以使用不同的client和server的传输层实现rpc,但是codec必须一样哦!

本篇博客还讲了Channel和Cluster的接口是如何实现以及如何设计的,这两个接口分别对应transport层和cluster层的最高抽象接口。

LifeCircle的接口做了简单介绍,但是实现没有细讲,因为这部分看代码就好了。

下篇博客我会详细介绍Registry层、Config层,当然重点是Registry层。

最新项目地址:susu

我不喝咖啡,但是我相信知识有价。