Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • 项目架构
  • 网络通信层
  • 注册中心
  • 配置中心
  • 过滤器链
  • 路由转发过滤器
  • 重试与限流
  • 熔断与降级
  • 用户鉴权
  • 缓存优化
  • Disruptor缓冲区优化
  • 客户端—dubbo接口
  • 网关上下文
    • 网关上下文
    • 包装请求类
    • 包装网关上下文
    • 启动类
    • 动态配置管理类
  • 负载均衡
  • API网关
Nreal
2024-04-05
目录

网关上下文

# 网关上下文

public class GatewayContext extends BasicContext{

    private GatewayRequest request;

    private GatewayResponse response;

    private Rule rule;

    private int currentRetryTimes;
    
    ...
        
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 包装请求类

RequestHelper#doRequest()

private static GatewayRequest doRequest(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx) {
    HttpHeaders headers = fullHttpRequest.headers();
    /*从header头获取必须要传入的关键属性 uniqueId*/
    String uniqueId = headers.get(GatewayConst.UNIQUE_ID);
    String host = headers.get(HttpHeaderNames.HOST);
    HttpMethod method = fullHttpRequest.method();
    String uri = fullHttpRequest.uri();
    String clientIp = getClientIp(ctx, fullHttpRequest);
    String contentType = HttpUtil.getMimeType(fullHttpRequest) == null ? null : HttpUtil.getMimeType(fullHttpRequest).toString();
    Charset charset = HttpUtil.getCharset(fullHttpRequest, StandardCharsets.UTF_8);
    GatewayRequest gatewayRequest = new GatewayRequest(uniqueId,
            charset,
            clientIp,
            host, 
            uri, 
            method,
            contentType,
            headers,
            fullHttpRequest);
    return gatewayRequest;
}

private static String getClientIp(ChannelHandlerContext ctx, FullHttpRequest request) {
    String xForwardedValue = request.headers().get(BasicConst.HTTP_FORWARD_SEPARATOR);
    String clientIp = null;
    if(StringUtils.isNotEmpty(xForwardedValue)) {
        List<String> values = Arrays.asList(xForwardedValue.split(", "));
        if(values.size() >= 1 && StringUtils.isNotBlank(values.get(0))) {
            clientIp = values.get(0);
        }
    }
    if(clientIp == null) {
        InetSocketAddress inetSocketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
        clientIp = inetSocketAddress.getAddress().getHostAddress();
    }
    return clientIp;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 包装网关上下文

RequestHelper#doContext()

public static GatewayContext doContext(FullHttpRequest request, ChannelHandlerContext ctx) {
    GatewayRequest gateWayRequest = doRequest(request, ctx);
    /*根据请求对象里的uniqueId,获取资源服务信息(也就是服务定义信息)*/
//		ServiceDefinition serviceDefinition = ServiceDefinition.builder()
//				.serviceId("demo")
//				.enable(true)
//				.version("v1")
//				.patternPath("**")
//				.envType("dev")
//				.protocol(GatewayProtocol.HTTP)
//				.build();
    ServiceDefinition serviceDefinition = DynamicConfigManager.getInstance().getServiceDefinition(gateWayRequest.getUniquedId());
    /*根据请求对象获取服务定义对应的方法调用,然后获取对应的规则*/
    ServiceInvoker serviceInvoker = new HttpServiceInvoker();
    serviceInvoker.setInvokerPath(gateWayRequest.getPath());
    serviceInvoker.setTimeout(500);
    /*根据请求对象获取规则*/
    Rule rule = getRule(gateWayRequest,serviceDefinition.getServiceId());
    /*构建GateWayContext对象*/
    GatewayContext gatewayContext = new GatewayContext(
            serviceDefinition.getProtocol(),
            ctx,
            HttpUtil.isKeepAlive(request),
            gateWayRequest,
            rule,0);
//		gatewayContext.getRequest().setModifyHost("127.0.0.1:8080");
    return gatewayContext;
}

private static Rule getRule(GatewayRequest gateWayRequest,String serviceId){
    String key = serviceId + "." + gateWayRequest.getPath();
    Rule rule = DynamicConfigManager.getInstance().getRuleByPath(key);
    if (rule != null){
        return rule;
    }
    return DynamicConfigManager
            .getInstance()
            .getRuleByServiceId(serviceId)
            .stream().filter(r -> gateWayRequest.getPath().startsWith(r.getPrefix()))
            .findAny().orElseThrow(()-> new ResponseException(PATH_NO_MATCHED));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# 启动类

public class Bootstrap {

    public static void main(String[] args) {

        /*加载网关核心静态配置*/
        /*优先级:运行参数 -> jvm参数 -> 环境变量 -> 配置文件 -> 配置对象对默认值*/
        Config config = ConfigLoader.getInstance().load(args);
        System.out.println(config.getPort());

        /*加载配置中心并监听*/
        ServiceLoader<ConfigCenter> serviceLoader = ServiceLoader.load(ConfigCenter.class);
        final ConfigCenter configCenter = serviceLoader.findFirst().orElseThrow(() -> {
            log.error("not found ConfigCenter impl");
            return new RuntimeException("not found ConfigCenter impl");
        });
        configCenter.init(config.getRegistryAddress(), config.getEnv());
        configCenter.subscribeRulesChange(rules-> DynamicConfigManager.getInstance().putAllRule(rules));
//        configCenter.subscribeDtpChange(dtps-> DynamicConfigManager.getInstance().updateDtp(dtps));

        /*容器启动*/
        Container container = new Container(config);
        container.start();

        /*加载注册中心*/
        final RegisterCenter registerCenter = registerAndSubscribe(config);

        /*服务优雅关机*/
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                registerCenter.deregister(buildGatewayServiceDefinition(config),
                        buildGatewayServiceInstance(config));
                container.shutdown();
            }
        });

    }

    private static RegisterCenter registerAndSubscribe(Config config) {
        ServiceLoader<RegisterCenter> serviceLoader = ServiceLoader.load(RegisterCenter.class);
        final RegisterCenter registerCenter = serviceLoader.findFirst().orElseThrow(() -> {
            log.error("not found RegisterCenter impl");
            return new RuntimeException("not found RegisterCenter impl");
        });
        registerCenter.init(config.getRegistryAddress(), config.getEnv());
        /*注册*/
        ServiceDefinition serviceDefinition = buildGatewayServiceDefinition(config);
        ServiceInstance serviceInstance = buildGatewayServiceInstance(config);
        registerCenter.register(serviceDefinition,serviceInstance);
        /*订阅*/
        registerCenter.subscribeAllServices((serviceDefinition1,serviceInstanceSet)->{
            log.info("refresh service and instance: {} {}", serviceDefinition1.getUniqueId(), JSON.toJSON(serviceInstanceSet));
            DynamicConfigManager manager = DynamicConfigManager.getInstance();
            manager.addServiceInstance(serviceDefinition1.getUniqueId(), serviceInstanceSet);
            manager.putServiceDefinition(serviceDefinition1.getUniqueId(),serviceDefinition1);
        });
        return registerCenter;
    }

    private static ServiceInstance buildGatewayServiceInstance(Config config) {
        String localIp = NetUtils.getLocalIp();
        int port = config.getPort();
        ServiceInstance serviceInstance = new ServiceInstance();
        serviceInstance.setServiceInstanceId(localIp + COLON_SEPARATOR + port);
        serviceInstance.setIp(localIp);
        serviceInstance.setPort(port);
        serviceInstance.setRegisterTime(TimeUtil.currentTimeMillis());
        return serviceInstance;
    }

    private static ServiceDefinition buildGatewayServiceDefinition(Config config) {
        ServiceDefinition serviceDefinition = new ServiceDefinition();
        serviceDefinition.setInvokerMap(Map.of());
        serviceDefinition.setUniqueId(config.getApplicationName());/*?*/
        serviceDefinition.setServiceId(config.getApplicationName());
        serviceDefinition.setEnvType(config.getEnv());
        return serviceDefinition;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

# 动态配置管理类

@NoArgsConstructor
public class DynamicConfigManager {

    /*key:uniqueId*/
    private ConcurrentHashMap<String,ServiceDefinition> serviceDefinitionMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String,Set<ServiceInstance>> serviceInstanceMap = new ConcurrentHashMap<>();
    /*规则集合*/
    private ConcurrentHashMap<String,Rule> ruleMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String,Rule> pathRuleMap = new ConcurrentHashMap<>();/*k:路径*/
    private ConcurrentHashMap<String,List<Rule>> serviceRuleMap = new ConcurrentHashMap<>();/*k:服务名*/

    .......
        
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
客户端—dubbo接口
负载均衡

← 客户端—dubbo接口 负载均衡→

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式