注册中心
# 服务定义
@Builder
public class ServiceDefinition implements Serializable {
private static final long serialVersionUID = -8263365765897285189L;
private String uniqueId;
private String serviceId;
private String version;
private String protocol;
private String patternPath;
private String envType;
private boolean enable = true;
/*服务列表信息 k:invokerPath v:ServiceInvoker */
private Map<String,ServiceInvoker> invokerMap;
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 服务实例
public class ServiceInstance implements Serializable {
private static final long serialVersionUID = -7559569289189228478L;
/*ip+port*/
protected String serviceInstanceId;
protected String uniqueId;
protected String ip;
protected int port;
protected String tags;
protected Integer weight;
/*后序负载均衡,预热用到*/
protected long registerTime;
protected boolean enable = true;
protected String version;
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 整合Nacos实现服务注册与服务订阅
引入Nacos客户端依赖:
<dependencies>
<!-- 添加Nacos客户端依赖 -->
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>2.0.4</version> <!-- 请替换为实际的最新版本号 -->
</dependency>
<!-- 添加日志依赖,因为nacos-client使用SLF4J记录日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version> <!-- 请替换为实际的最新版本号 -->
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version> <!-- 请替换为实际的最新版本号 -->
</dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
将与注册中心有关的服务抽取到一个接口中:
public interface RegisterCenter {
/**
* 初始化
* @param registerAddress 注册中心地址
* @param env 要注册到的环境
*/
void init(String registerAddress, String env);
/**
* 注册
* @param serviceDefinition 服务定义信息
* @param serviceInstance 服务实例信息
*/
void register(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance);
/**
* 注销
* @param serviceDefinition
* @param serviceInstance
*/
void deregister(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance);
/**
* 订阅所有服务变更
* @param registerCenterListener
*/
void subscribeAllServices(RegisterCenterListener registerCenterListener);
}
public interface RegisterCenterListener {
void onChange(ServiceDefinition serviceDefinition, Set<ServiceInstance> serviceInstanceSet);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 服务注册订阅
服务注册的伪代码:基于NamingService,调用registerInstance方法将服务实例注册上去;
import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.NamingService; import com.alibaba.nacos.api.naming.pojo.Instance; public class NacosRegisterDemo { public static void main(String[] args) { try { // 设置Nacos地址 String serverAddr = "127.0.0.1:8848"; // 创建命名服务实例,用于服务注册 NamingService namingService = NacosFactory.createNamingService(serverAddr); // 创建服务实例 Instance instance = new Instance(); instance.setIp("你的服务IP"); // 服务实例IP instance.setPort(你的服务端口); // 服务实例端口 instance.setServiceName("你的服务名称"); // 服务名称 instance.setClusterName("你的服务集群名"); // 服务所在集群 // 添加其他元数据 instance.addMetadata("version", "1.0"); instance.addMetadata("env", "production"); // 注册服务 namingService.registerInstance("你的服务名称", instance); System.out.println("服务注册成功"); } catch (NacosException e) { // 异常处理 e.printStackTrace(); } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class NacosRegisterCenter implements RegisterCenter {
private String registerAddress;/*localhost:8848*/
private String env;
private NamingService namingService;/*实例信息*/
private NamingMaintainService namingMaintainService;/*定义信息*/
private List<RegisterCenterListener> registerCenterListenerList = new CopyOnWriteArrayList<>();
@Override
public void init(String registerAddress, String env) {
this.registerAddress = registerAddress;
this.env = env;
try {
this.namingMaintainService = NamingMaintainFactory.createMaintainService(registerAddress);
this.namingService = NamingFactory.createNamingService(registerAddress);
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
@Override
public void register(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance) {
try {
Instance nacosInstance = new Instance();
nacosInstance.setInstanceId(serviceInstance.getServiceInstanceId());
nacosInstance.setPort(serviceInstance.getPort());
nacosInstance.setIp(serviceInstance.getIp());
nacosInstance.setMetadata(Map.of(GatewayConst.META_DATA_KEY, JSON.toJSONString(serviceInstance)));
/*注册*/
namingService.registerInstance(serviceDefinition.getServiceId(), env, nacosInstance);
/*更新服务定义*/
namingMaintainService.updateService(serviceDefinition.getServiceId(), env, 0, Map.of(GatewayConst.META_DATA_KEY, JSON.toJSONString(serviceDefinition)));
log.info("register {} {}", serviceDefinition, serviceInstance);
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
@Override
public void deregister(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance) {
try {
namingService.registerInstance(serviceDefinition.getServiceId(), env, serviceInstance.getIp(), serviceInstance.getPort());
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
@Override
public void subscribeAllServices(RegisterCenterListener registerCenterListener) {
registerCenterListenerList.add(registerCenterListener);
//检查Nacos服务列表与当前已订阅服务的差异,并订阅任何新的服务
doSubscribeAllServices();
//可能有新服务加入,所以需要有一个定时任务来检查
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1, new NameThreadFactory(
"doSubscribeAllServices"));
//循环执行服务发现与订阅操作
scheduledThreadPool.scheduleWithFixedDelay(() -> doSubscribeAllServices(), 10, 10, TimeUnit.SECONDS);
}
private void doSubscribeAllServices() {
try {
/*已经订阅的服务*/
Set<String> subscribeService = namingService.getSubscribeServices().stream().map(ServiceInfo::getName).collect(Collectors.toSet());
/*分页从nacos中获取*/
int pageNo = 1;
int pageSize = 100;
List<String> serviceList = namingService.getServicesOfServer(pageNo, pageSize, env).getData();
while(CollectionUtils.isNotEmpty(serviceList)){
log.info("service list size {}", serviceList.size());
for(String service:serviceList){
if(subscribeService.contains(service)){
continue;
}
/*nacos事件监听器*/
EventListener eventListener = new NacosRegisterListener();
eventListener.onEvent(new NamingEvent(service,null));
namingService.subscribe(service,env,eventListener);
log.info("subscribe {} {}", service, env);
}
serviceList = namingService.getServicesOfServer(++pageNo,pageSize,env).getData();
}
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
/**
* 实现对nacos事件的监听器 这个事件监听器会在Nacos发生事件变化的时候进行回调
* NamingEvent 是一个事件对象,用于表示与服务命名空间(Naming)相关的事件。
* NamingEvent 的作用是用于监听和处理命名空间中的服务实例(Service Instance)的变化,
* 以便应用程序可以根据这些变化来动态地更新服务实例列表,以保持与注册中心的同步。
*/
public class NacosRegisterListener implements EventListener {
@Override
public void onEvent(Event event) {
if(event instanceof NamingEvent){
NamingEvent namingEvent = (NamingEvent)event;
String serviceName = namingEvent.getServiceName();
try {
/*获取服务定义信息*/
Service service = namingMaintainService.queryService(serviceName, env);
ServiceDefinition serviceDefinition = JSON.parseObject(service.getMetadata().get(GatewayConst.META_DATA_KEY), ServiceDefinition.class);
/*获取服务实例信息*/
List<Instance> allInstances = namingService.getAllInstances(service.getName(), env);
HashSet<ServiceInstance> set = new HashSet<>();
for(Instance instance : allInstances){
ServiceInstance serviceInstance = JSON.parseObject(instance.getMetadata().get(GatewayConst.META_DATA_KEY), ServiceInstance.class);
set.add(serviceInstance);
}
registerCenterListenerList.stream().forEach(l->l.onChange(serviceDefinition,set));
} catch (NacosException e) {
throw new RuntimeException(e);
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
接口说明:
NamingMaintainService:维护服务信息包括更新,查询,删除,NamingService接口用于服务的注册和发现;
- 更新服务信息:可以更新一个服务的元数据,这包括服务的保护阈值、元数据等。
- 查询服务信息:可以查询服务的当前配置状态,以便进行审查或者其他操作。
- 删除服务信息:如果一个服务不再需要在注册中心注册,可以使用 NamingMaintainService 将其删除。
subscribeAllServices方法执行流程:
将监听事件添加到本地列表;
将参数中的registerCenterListener添加到registerCenterListenerList中,这个列表维护了所有的监听器,这些监听器将对Nacos服务中心的变化做出响应;
执行服务订阅逻辑;
doSubscribeAllServices:检查Nacos服务列表与当前已订阅服务的差异,并订阅任何新的服务;
定时任务检查服务变更;
定时执行的线程池scheduledThreadPool,周期性调用doSubscribeAllServices;
处理服务订阅更新;
如果发现新服务,创建一个新的EventListener,并用它订阅这个服务的变化。一旦服务状态有变化,就会触发事件,然后通过NamingEvent事件传递给所有监听器;
事件监听与变更通知;
NacosRegisterListener内部类中定义的onEvent方法会在每个服务变化时被调用,当onEvent方法被触发时,它会从Nacos服务中心查询服务的当前定义和实例信息,并通知所有注册的RegisterCenterListener监听器,这样客户端就可以采取相应的动作,如更新其内部服务列表、重新负载均衡等;
Bootstrap中容器启动后加载注册中心:
private static RegisterCenter registerAndSubscribe(Config config) {
ServiceLoader<RegisterCenter> serviceLoader = ServiceLoader.load(RegisterCenter.class);
final RegisterCenter registerCenter = serviceLoader.findFirst().orElseThrow(() -> {
log.error("not found RegisterCenter impl");
return new RuntimeException("not found RegisterCenter impl");
});
registerCenter.init(config.getRegistryAddress(), config.getEnv());
/*注册*/
ServiceDefinition serviceDefinition = buildGatewayServiceDefinition(config);
ServiceInstance serviceInstance = buildGatewayServiceInstance(config);
registerCenter.register(serviceDefinition,serviceInstance);
/*订阅*/
registerCenter.subscribeAllServices((serviceDefinition1,serviceInstanceSet)->{
log.info("refresh service and instance: {} {}", serviceDefinition1.getUniqueId(), JSON.toJSON(serviceInstanceSet));
DynamicConfigManager manager = DynamicConfigManager.getInstance();
manager.addServiceInstance(serviceDefinition1.getUniqueId(), serviceInstanceSet);
manager.putServiceDefinition(serviceDefinition1.getUniqueId(),serviceDefinition1);
});
return registerCenter;
}
private static ServiceInstance buildGatewayServiceInstance(Config config) {
String localIp = NetUtils.getLocalIp();
int port = config.getPort();
ServiceInstance serviceInstance = new ServiceInstance();
serviceInstance.setServiceInstanceId(localIp + COLON_SEPARATOR + port);
serviceInstance.setIp(localIp);
serviceInstance.setPort(port);
serviceInstance.setRegisterTime(TimeUtil.currentTimeMillis());
return serviceInstance;
}
private static ServiceDefinition buildGatewayServiceDefinition(Config config) {
ServiceDefinition serviceDefinition = new ServiceDefinition();
serviceDefinition.setInvokerMap(Map.of());
serviceDefinition.setUniqueId(config.getApplicationName());/*?*/
serviceDefinition.setServiceId(config.getApplicationName());
serviceDefinition.setEnvType(config.getEnv());
return serviceDefinition;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40