从零手撸一个Rpc框架-4-Registry层、Config层

上篇我们讲了重构后的transport层和Cluster层,这篇博客讲一讲Registry层和Config层,Registry层使用了Zookeeper的实现。

GitHub项目地址:susu

话不多说,我们直接开始看吧~

一、Registry层

registry层有两个基本接口:Registry和NotifyListener。

1. Registry

1
2
3
4
5
6
7
public interface Registry extends LifeCircle {

void register(URL url);
void unregister(URL url);
void subscribe(URL url, NotifyListener listener);
void unsubscribe(URL url, NotifyListener listener);
}

2. NotifyListener

1
2
3
public interface NotifyListener {
void notify(URL registryUrl, List<URL> urls);
}

功能非常的明显。这里url作为统一的配置总线很让人疑惑,需要好好分析一下不同的方法传入的URL参数有什么不同:

调用register,一般来说是Server端调用的,Server端将所有的东西都准备好之后,将自己的URL注册到注册中心中,URL中至少需要包含一下内容:

  • 服务的名称,即暴露的接口名,这个接口名字是包含包名全路径的。
  • 服务所属的组,以后随着暴露出的接口越来越多,不同工程或者不同业务所暴露出来的接口需要分组管理。
  • 服务版本,表示当前服务所提供的版本,客户端需要根据版本信息判断该服务能否提供服务。
  • URL中需要标识该URL是属于服务端的URL还是客户端的URL。
  • 服务协议,客户端需要判断协议是否匹配
  • 最后,需要该服务的ip地址和端口号。

调用subscribe,一般来说subscribe是client端调用的,用来订阅一个目录,那这里传入的URL是什么呢?是服务端的URL吗?我要订阅一个服务,那我传入一个服务端的URL,听起来没错,但是细想是有问题的,第一,客户端调用这个方法怎么知道服务端的url呢?第二,这里订阅一个目录需要URL吗?其实不需要,只需要目录名就行。那么问题来了,client端怎么获得目录名呢?

这里需要先明确一下注册中心的目录结构,从上面Server端url的结构可以看出一些端倪,服务端目录结构至少由这几个层次:服务组,服务接口名(interface name),服务ip&port。

/ [server group] / [interface name] / [ip&port&settings]

那么client端需要订阅的其实只有服务组和服务名就可以了,上篇博客已经先简单的分析过URL的结构了,一组对应的Client和Server的URL都是有相同的服务接口名称的,这是必须的,服务接口名就是我们定义的服务接口的全路径名称。所以Client端是可以轻松拿到这两个字段的,至于组,直接在URL中设置就行了。

这么看来调用subscribe的时候,传入的url是客户端自己的url,然后程序根据client端的url订阅到具体的目录下。

这么看来,调用subscribe貌似没必要传入URL这种包含这么多信息的对象,只需要传入组名和服务名就可以了,这样的话程序逻辑就非常清晰了。但是dubbo除了单单subscribe以外,还干了一件事,就是同时将client端的url注册到了另一个目录下,这样做的目的我估计是两点:一,方便监控Client端的在线信息;二,Server端以至于其他任何终端,都可以发现Client端并主动与Client端通信。

好了,到这里就介绍完了Registry中两个主要的方法(register和subscribe)以及他们的参数URL需要怎么传。

在subscribe方法中还会传一个回调,这个回调就是NotifyListener,用来当订阅的节点目录发生变化时,通知这个NotifyListener,可以想象到,实现这个NotifyListener的类,需要能够知道服务端暴露信息的节点的变化,并拿到变化后的服务端的全量url,从而更新自己的服务列表,对了,有这个需求的对象就是Cluster,很明显Cluster需要知道服务端的变化,并动态更新自己的Invoker可用列表,加入新的可以用的服务并销毁不再可用的服务。

看了上篇博客的朋友们肯定知道,Cluster是继承了NotifyListener接口的。

这里有个小问题:这里发生了反向依赖,即cluster层依赖了更高层registry层的接口NotifyListener,但是NotifyListener接口其实可以不属于registry层,可以作为core层的基础接口,这样就解决了反向依赖的问题,但是还是有点不和谐,如果你们有更好的建议,可以给我留言或者提PR。

3. ZookeeperRegistry

这个类实现了Zookeeper版本的注册中心的实现。

先贴一下目录:

/ [server group] / [interface name] / [ip&port&settings]

常用的Zookeeper有两种:zkClient和Curator,客户端有一些好处,它封装了Zookeeper的原生api,提供了更加方便的接口,并在内部做了一些重连重试等功能的封装。

dubbo将zookeeper的实现也进行了抽象,分别支持了不同的Zookeeper客户端,并可可以配置。motan就更加直接一些,直接使用了zkClient。我使用的是Curator。

在看这段代码之前,大家可能需要先熟悉一下zookeeper的使用和Curator的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
public class ZookeeperRegistry implements Registry {

private static final String PROVIDERS = "providers";
private static final String CONSUMERS = "consumers";
private static final Set<CuratorEventType> interested = new HashSet<>();

static {
// 只关心创建节点和删除节点
interested.add(CuratorEventType.CREATE);
interested.add(CuratorEventType.DELETE);
}

private URL registryUrl;
private CuratorFramework client;

public ZookeeperRegistry(URL url) {
registryUrl = url;
init();
}

@Override
public void init() {
// todo: RetryPolicy 支持配置
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
// todo : 账号密码支持
client = CuratorFrameworkFactory.builder()
.connectString(registryUrl.getIpPortString())
.sessionTimeoutMs(10000)
.retryPolicy(retryPolicy)
.build();

client.start();
}

@Override
public void destroy() {
if (client != null) {
client.close();
}
}

@Override
public boolean isAvailable() {
if (client == null) {
return false;
}
CuratorFrameworkState state = client.getState();
return state == CuratorFrameworkState.STARTED;
}

@Override
public void register(URL url) {
checkClientStatus();
String path = buildPath(url);
if (!exist(path)) {
buildPath(path);
}
path += SusuConstants.PATH_SEP + encodeUrl(url.getUrlString());
ephemeralPath(path);
}

@Override
public void unregister(URL url) {
checkClientStatus();
String path = buildPath(url) + SusuConstants.PATH_SEP + url.getUrlString();
delete(path);
}

@Override
public void subscribe(URL url, NotifyListener listener) {
checkClientStatus();
String path = buildServerPath(url);

// 先同步的回调一次
try {
List<String> urls = client.getChildren().forPath(buildServerPath(url) + SusuConstants.PATH_SEP + PROVIDERS);
listener.notify(this.registryUrl, urls.stream()
.map(this::decodeUrl)
.map(URL::parse)
.collect(Collectors.toList()));
} catch (Exception e) {
throw new RegistryException("ZookeeperRegistry: getChildren error", e);
}

// 注册监听器
CuratorListener curatorListener = ((client0, event) -> {
if (interested.contains(event.getType())) {
if (event.getPath() != null && event.getPath().startsWith(path)) {
System.out.println(event);// todo: 调试删除
List<String> urls = event.getChildren();
listener.notify(this.registryUrl, urls.stream()
.map(this::decodeUrl)
.map(URL::parse)
.collect(Collectors.toList()));
}
}
});
client.getCuratorListenable().addListener(curatorListener);
}

private void checkClientStatus() {
if (!isAvailable()) {
throw new RegistryException(
"ZookeeperRegistry: registry unavailable, url: " + registryUrl.getIpPortString());
}
}

private String buildPath(URL url) {
String root = url.getString(URL_CONFIG.ROOT);
String group = url.getString(URL_CONFIG.GROUP);
String path = url.getPath();
String serverOrClient = url.getBoolean(URL_CONFIG.IS_SERVER) ? PROVIDERS : CONSUMERS;
return root
+ SusuConstants.PATH_SEP + group
+ SusuConstants.PATH_SEP + path
+ SusuConstants.PATH_SEP + serverOrClient;
}

... // 略,具体代码见github
}

首先我们实现了LifeCircle中的三个方法,实现了registry的初始化,状态检查以及销毁,后面展示了两个主要方法的实现。

register方法

首先做一下状态检查,然后检查服务路径是否存在,如果不存在这个服务的路径,则先创建一个永久服务路径,即:

/ [server group] / [interface name]

然后创建一个临时路径保存当前的url信息。这里要用临时节点,因为不是每次服务退出的时候都能保证去手动清理掉自己在注册中心注册的服务信息,比如服务突然断线,这样就没有时间让服务自己清理,这时候如果这个节点是永久节点,那客户端就会拿到一个已经下线的服务的url,从而造成服务失败。zookeeper可以在断线后自动清理掉临时节点。

本地测试,注册服务之前和注册之后:

注册服务之前和注册之后

服务root是/susu,组名是default,服务名是com.nowcoder.first.IService,ip&port&setting是下面经过编码后的url。

subscribe方法

subscribe方法比较简单,首先将目录下的所有url读出来先回调一次NotifyListener,保证在调用subscribe方法的时候可以同步获取数据。然后通过Curator注册了一个zookeeper的watcher,用来通知NotifyListener节点的变化。

这里我们只关心两个事件:节点create和delete信息。注意这里订阅的节点层级,不能多不能少,不然获得的回调信息会有问题。

回调事件触发的时候,我们还需要检查触发的节点路径是否是和当前监听的Url想匹配的服务,如果不是将会忽略。

后面的几个方法register和unsubscribe等就不赘述了。

二、Config层

config层太简单了,暂时还没啥好些的,嘻嘻~之后再补充吧。

三、总结

这波代码更新一共2200行,应该说还是加了不少东西。

问题也有很多,目前大多数优化和修改都是在Client这边,Server那边仍然是最蠢的代码实现,比如多个服务同时暴露在同一个端口时如何路由等功能都还没有。

URL这个东西也是很闹心,感觉这个东西相当于传了一个map,一个map在一个框架里面从头到脚的作为一个参数去传太难受了,这个问题我要想办法解决一下。

日志目前还是一片空白,也还没有做。

Filter也还没有支持,没有想到要支持哪些东西,因为东西都太杂了太小了,没有大的功能需要支持了。

还有一个挺重要的,对Object的各种方法需要处理一下。

我想把URL、Config以及服务端的路由功能做了之后先测试一波,看看有没有明显的问题再说吧~

如果你有好的建议欢迎提出来~

BTW,SPI这个东西暂时没有支持的打算了,感觉暂时不需要。

GitHub项目地址:susu