Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • 项目架构
  • 网络通信层
  • 注册中心
    • 服务定义
    • 服务实例
    • 整合Nacos实现服务注册与服务订阅
    • 服务注册订阅
  • 配置中心
  • 过滤器链
  • 路由转发过滤器
  • 重试与限流
  • 熔断与降级
  • 用户鉴权
  • 缓存优化
  • Disruptor缓冲区优化
  • 客户端—dubbo接口
  • 网关上下文
  • 负载均衡
  • API网关
Nreal
2023-12-03
目录

注册中心

# 服务定义

@Builder
public class ServiceDefinition implements Serializable {

	private static final long serialVersionUID = -8263365765897285189L;

	private String uniqueId;

	private String serviceId;

	private String version;

	private String protocol;

	private String patternPath;

	private String envType;

	private boolean enable = true;
	
	/*服务列表信息 k:invokerPath v:ServiceInvoker */
	private Map<String,ServiceInvoker> invokerMap;
    
    ...
        
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 服务实例

public class ServiceInstance implements Serializable {

    private static final long serialVersionUID = -7559569289189228478L;

    /*ip+port*/
    protected String serviceInstanceId;

    protected String uniqueId;

    protected String ip;

    protected int port;

    protected String tags;

    protected Integer weight;

    /*后序负载均衡,预热用到*/
    protected long registerTime;

    protected boolean enable = true;

    protected String version;
    
    ...
        
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 整合Nacos实现服务注册与服务订阅

引入Nacos客户端依赖:

<dependencies>
    <!-- 添加Nacos客户端依赖 -->
    <dependency>
        <groupId>com.alibaba.nacos</groupId>
        <artifactId>nacos-client</artifactId>
        <version>2.0.4</version> <!-- 请替换为实际的最新版本号 -->
    </dependency>
    <!-- 添加日志依赖,因为nacos-client使用SLF4J记录日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version> <!-- 请替换为实际的最新版本号 -->
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version> <!-- 请替换为实际的最新版本号 -->
    </dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

将与注册中心有关的服务抽取到一个接口中:

public interface RegisterCenter {

    /**
     *   初始化
     * @param registerAddress  注册中心地址
     * @param env  要注册到的环境
     */
    void init(String registerAddress, String env);

    /**
     * 注册
     * @param serviceDefinition 服务定义信息
     * @param serviceInstance 服务实例信息
     */
    void register(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance);

    /**
     * 注销
     * @param serviceDefinition
     * @param serviceInstance
     */
    void deregister(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance);

    /**
     * 订阅所有服务变更
     * @param registerCenterListener
     */
    void subscribeAllServices(RegisterCenterListener registerCenterListener);
}

public interface RegisterCenterListener {
    void onChange(ServiceDefinition serviceDefinition, Set<ServiceInstance> serviceInstanceSet);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 服务注册订阅

服务注册的伪代码:基于NamingService,调用registerInstance方法将服务实例注册上去;

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;

public class NacosRegisterDemo {
    public static void main(String[] args) {
        try {
            // 设置Nacos地址
            String serverAddr = "127.0.0.1:8848";
            // 创建命名服务实例,用于服务注册
            NamingService namingService = NacosFactory.createNamingService(serverAddr);

            // 创建服务实例
            Instance instance = new Instance();
            instance.setIp("你的服务IP"); // 服务实例IP
            instance.setPort(你的服务端口); // 服务实例端口
            instance.setServiceName("你的服务名称"); // 服务名称
            instance.setClusterName("你的服务集群名"); // 服务所在集群

            // 添加其他元数据
            instance.addMetadata("version", "1.0");
            instance.addMetadata("env", "production");

            // 注册服务
            namingService.registerInstance("你的服务名称", instance);

            System.out.println("服务注册成功");
        } catch (NacosException e) {
            // 异常处理
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class NacosRegisterCenter implements RegisterCenter {

    private String registerAddress;/*localhost:8848*/
    private String env;
    private NamingService namingService;/*实例信息*/
    private NamingMaintainService namingMaintainService;/*定义信息*/
    private List<RegisterCenterListener> registerCenterListenerList = new CopyOnWriteArrayList<>();

    @Override
    public void init(String registerAddress, String env) {
        this.registerAddress = registerAddress;
        this.env = env;
        try {
            this.namingMaintainService = NamingMaintainFactory.createMaintainService(registerAddress);
            this.namingService = NamingFactory.createNamingService(registerAddress);
        } catch (NacosException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void register(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance) {
        try {
            Instance nacosInstance = new Instance();
            nacosInstance.setInstanceId(serviceInstance.getServiceInstanceId());
            nacosInstance.setPort(serviceInstance.getPort());
            nacosInstance.setIp(serviceInstance.getIp());
            nacosInstance.setMetadata(Map.of(GatewayConst.META_DATA_KEY, JSON.toJSONString(serviceInstance)));
            /*注册*/
            namingService.registerInstance(serviceDefinition.getServiceId(), env, nacosInstance);
            /*更新服务定义*/
            namingMaintainService.updateService(serviceDefinition.getServiceId(), env, 0, Map.of(GatewayConst.META_DATA_KEY, JSON.toJSONString(serviceDefinition)));
            log.info("register {} {}", serviceDefinition, serviceInstance);
        } catch (NacosException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deregister(ServiceDefinition serviceDefinition, ServiceInstance serviceInstance) {
        try {
            namingService.registerInstance(serviceDefinition.getServiceId(), env, serviceInstance.getIp(), serviceInstance.getPort());
        } catch (NacosException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void subscribeAllServices(RegisterCenterListener registerCenterListener) {
        registerCenterListenerList.add(registerCenterListener);
        //检查Nacos服务列表与当前已订阅服务的差异,并订阅任何新的服务
        doSubscribeAllServices();

        //可能有新服务加入,所以需要有一个定时任务来检查
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1, new NameThreadFactory(
                "doSubscribeAllServices"));
        //循环执行服务发现与订阅操作
        scheduledThreadPool.scheduleWithFixedDelay(() -> doSubscribeAllServices(), 10, 10, TimeUnit.SECONDS);
    }

    private void doSubscribeAllServices() {
        try {
            /*已经订阅的服务*/
            Set<String> subscribeService = namingService.getSubscribeServices().stream().map(ServiceInfo::getName).collect(Collectors.toSet());
            /*分页从nacos中获取*/
            int pageNo = 1;
            int pageSize = 100;
            List<String> serviceList = namingService.getServicesOfServer(pageNo, pageSize, env).getData();
            while(CollectionUtils.isNotEmpty(serviceList)){
                log.info("service list size {}", serviceList.size());
                for(String service:serviceList){
                    if(subscribeService.contains(service)){
                        continue;
                    }
                    /*nacos事件监听器*/
                    EventListener eventListener = new NacosRegisterListener();
                    eventListener.onEvent(new NamingEvent(service,null));
                    namingService.subscribe(service,env,eventListener);
                    log.info("subscribe {} {}", service, env);
                }
                serviceList = namingService.getServicesOfServer(++pageNo,pageSize,env).getData();
            }
        } catch (NacosException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 实现对nacos事件的监听器 这个事件监听器会在Nacos发生事件变化的时候进行回调
     * NamingEvent 是一个事件对象,用于表示与服务命名空间(Naming)相关的事件。
     * NamingEvent 的作用是用于监听和处理命名空间中的服务实例(Service Instance)的变化,
     * 以便应用程序可以根据这些变化来动态地更新服务实例列表,以保持与注册中心的同步。
     */
    public class NacosRegisterListener implements EventListener {

        @Override
        public void onEvent(Event event) {
            if(event instanceof NamingEvent){
                NamingEvent namingEvent = (NamingEvent)event;
                String serviceName = namingEvent.getServiceName();
                try {
                    /*获取服务定义信息*/
                    Service service = namingMaintainService.queryService(serviceName, env);
                    ServiceDefinition serviceDefinition = JSON.parseObject(service.getMetadata().get(GatewayConst.META_DATA_KEY), ServiceDefinition.class);
                    /*获取服务实例信息*/
                    List<Instance> allInstances = namingService.getAllInstances(service.getName(), env);
                    HashSet<ServiceInstance> set = new HashSet<>();
                    for(Instance instance : allInstances){
                        ServiceInstance serviceInstance = JSON.parseObject(instance.getMetadata().get(GatewayConst.META_DATA_KEY), ServiceInstance.class);
                        set.add(serviceInstance);
                    }
                    registerCenterListenerList.stream().forEach(l->l.onChange(serviceDefinition,set));
                } catch (NacosException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120

接口说明:

NamingMaintainService:维护服务信息包括更新,查询,删除,NamingService接口用于服务的注册和发现;

  • 更新服务信息:可以更新一个服务的元数据,这包括服务的保护阈值、元数据等。
  • 查询服务信息:可以查询服务的当前配置状态,以便进行审查或者其他操作。
  • 删除服务信息:如果一个服务不再需要在注册中心注册,可以使用 NamingMaintainService 将其删除。

subscribeAllServices方法执行流程:

  1. 将监听事件添加到本地列表;

    将参数中的registerCenterListener添加到registerCenterListenerList中,这个列表维护了所有的监听器,这些监听器将对Nacos服务中心的变化做出响应;

  2. 执行服务订阅逻辑;

    doSubscribeAllServices:检查Nacos服务列表与当前已订阅服务的差异,并订阅任何新的服务;

  3. 定时任务检查服务变更;

    定时执行的线程池scheduledThreadPool,周期性调用doSubscribeAllServices;

  4. 处理服务订阅更新;

    如果发现新服务,创建一个新的EventListener,并用它订阅这个服务的变化。一旦服务状态有变化,就会触发事件,然后通过NamingEvent事件传递给所有监听器;

  5. 事件监听与变更通知;

    NacosRegisterListener内部类中定义的onEvent方法会在每个服务变化时被调用,当onEvent方法被触发时,它会从Nacos服务中心查询服务的当前定义和实例信息,并通知所有注册的RegisterCenterListener监听器,这样客户端就可以采取相应的动作,如更新其内部服务列表、重新负载均衡等;

Bootstrap中容器启动后加载注册中心:

private static RegisterCenter registerAndSubscribe(Config config) {
    ServiceLoader<RegisterCenter> serviceLoader = ServiceLoader.load(RegisterCenter.class);
    final RegisterCenter registerCenter = serviceLoader.findFirst().orElseThrow(() -> {
        log.error("not found RegisterCenter impl");
        return new RuntimeException("not found RegisterCenter impl");
    });
    registerCenter.init(config.getRegistryAddress(), config.getEnv());
    /*注册*/
    ServiceDefinition serviceDefinition = buildGatewayServiceDefinition(config);
    ServiceInstance serviceInstance = buildGatewayServiceInstance(config);
    registerCenter.register(serviceDefinition,serviceInstance);
    /*订阅*/
    registerCenter.subscribeAllServices((serviceDefinition1,serviceInstanceSet)->{
        log.info("refresh service and instance: {} {}", serviceDefinition1.getUniqueId(), JSON.toJSON(serviceInstanceSet));
        DynamicConfigManager manager = DynamicConfigManager.getInstance();
        manager.addServiceInstance(serviceDefinition1.getUniqueId(), serviceInstanceSet);
        manager.putServiceDefinition(serviceDefinition1.getUniqueId(),serviceDefinition1);
    });
    return registerCenter;
}

private static ServiceInstance buildGatewayServiceInstance(Config config) {
    String localIp = NetUtils.getLocalIp();
    int port = config.getPort();
    ServiceInstance serviceInstance = new ServiceInstance();
    serviceInstance.setServiceInstanceId(localIp + COLON_SEPARATOR + port);
    serviceInstance.setIp(localIp);
    serviceInstance.setPort(port);
    serviceInstance.setRegisterTime(TimeUtil.currentTimeMillis());
    return serviceInstance;
}

private static ServiceDefinition buildGatewayServiceDefinition(Config config) {
    ServiceDefinition serviceDefinition = new ServiceDefinition();
    serviceDefinition.setInvokerMap(Map.of());
    serviceDefinition.setUniqueId(config.getApplicationName());/*?*/
    serviceDefinition.setServiceId(config.getApplicationName());
    serviceDefinition.setEnvType(config.getEnv());
    return serviceDefinition;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
网络通信层
配置中心

← 网络通信层 配置中心→

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式