网关上下文
# 网关上下文
public class GatewayContext extends BasicContext{
private GatewayRequest request;
private GatewayResponse response;
private Rule rule;
private int currentRetryTimes;
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 包装请求类
RequestHelper#doRequest()
private static GatewayRequest doRequest(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx) {
HttpHeaders headers = fullHttpRequest.headers();
/*从header头获取必须要传入的关键属性 uniqueId*/
String uniqueId = headers.get(GatewayConst.UNIQUE_ID);
String host = headers.get(HttpHeaderNames.HOST);
HttpMethod method = fullHttpRequest.method();
String uri = fullHttpRequest.uri();
String clientIp = getClientIp(ctx, fullHttpRequest);
String contentType = HttpUtil.getMimeType(fullHttpRequest) == null ? null : HttpUtil.getMimeType(fullHttpRequest).toString();
Charset charset = HttpUtil.getCharset(fullHttpRequest, StandardCharsets.UTF_8);
GatewayRequest gatewayRequest = new GatewayRequest(uniqueId,
charset,
clientIp,
host,
uri,
method,
contentType,
headers,
fullHttpRequest);
return gatewayRequest;
}
private static String getClientIp(ChannelHandlerContext ctx, FullHttpRequest request) {
String xForwardedValue = request.headers().get(BasicConst.HTTP_FORWARD_SEPARATOR);
String clientIp = null;
if(StringUtils.isNotEmpty(xForwardedValue)) {
List<String> values = Arrays.asList(xForwardedValue.split(", "));
if(values.size() >= 1 && StringUtils.isNotBlank(values.get(0))) {
clientIp = values.get(0);
}
}
if(clientIp == null) {
InetSocketAddress inetSocketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
clientIp = inetSocketAddress.getAddress().getHostAddress();
}
return clientIp;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 包装网关上下文
RequestHelper#doContext()
public static GatewayContext doContext(FullHttpRequest request, ChannelHandlerContext ctx) {
GatewayRequest gateWayRequest = doRequest(request, ctx);
/*根据请求对象里的uniqueId,获取资源服务信息(也就是服务定义信息)*/
// ServiceDefinition serviceDefinition = ServiceDefinition.builder()
// .serviceId("demo")
// .enable(true)
// .version("v1")
// .patternPath("**")
// .envType("dev")
// .protocol(GatewayProtocol.HTTP)
// .build();
ServiceDefinition serviceDefinition = DynamicConfigManager.getInstance().getServiceDefinition(gateWayRequest.getUniquedId());
/*根据请求对象获取服务定义对应的方法调用,然后获取对应的规则*/
ServiceInvoker serviceInvoker = new HttpServiceInvoker();
serviceInvoker.setInvokerPath(gateWayRequest.getPath());
serviceInvoker.setTimeout(500);
/*根据请求对象获取规则*/
Rule rule = getRule(gateWayRequest,serviceDefinition.getServiceId());
/*构建GateWayContext对象*/
GatewayContext gatewayContext = new GatewayContext(
serviceDefinition.getProtocol(),
ctx,
HttpUtil.isKeepAlive(request),
gateWayRequest,
rule,0);
// gatewayContext.getRequest().setModifyHost("127.0.0.1:8080");
return gatewayContext;
}
private static Rule getRule(GatewayRequest gateWayRequest,String serviceId){
String key = serviceId + "." + gateWayRequest.getPath();
Rule rule = DynamicConfigManager.getInstance().getRuleByPath(key);
if (rule != null){
return rule;
}
return DynamicConfigManager
.getInstance()
.getRuleByServiceId(serviceId)
.stream().filter(r -> gateWayRequest.getPath().startsWith(r.getPrefix()))
.findAny().orElseThrow(()-> new ResponseException(PATH_NO_MATCHED));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 启动类
public class Bootstrap {
public static void main(String[] args) {
/*加载网关核心静态配置*/
/*优先级:运行参数 -> jvm参数 -> 环境变量 -> 配置文件 -> 配置对象对默认值*/
Config config = ConfigLoader.getInstance().load(args);
System.out.println(config.getPort());
/*加载配置中心并监听*/
ServiceLoader<ConfigCenter> serviceLoader = ServiceLoader.load(ConfigCenter.class);
final ConfigCenter configCenter = serviceLoader.findFirst().orElseThrow(() -> {
log.error("not found ConfigCenter impl");
return new RuntimeException("not found ConfigCenter impl");
});
configCenter.init(config.getRegistryAddress(), config.getEnv());
configCenter.subscribeRulesChange(rules-> DynamicConfigManager.getInstance().putAllRule(rules));
// configCenter.subscribeDtpChange(dtps-> DynamicConfigManager.getInstance().updateDtp(dtps));
/*容器启动*/
Container container = new Container(config);
container.start();
/*加载注册中心*/
final RegisterCenter registerCenter = registerAndSubscribe(config);
/*服务优雅关机*/
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
registerCenter.deregister(buildGatewayServiceDefinition(config),
buildGatewayServiceInstance(config));
container.shutdown();
}
});
}
private static RegisterCenter registerAndSubscribe(Config config) {
ServiceLoader<RegisterCenter> serviceLoader = ServiceLoader.load(RegisterCenter.class);
final RegisterCenter registerCenter = serviceLoader.findFirst().orElseThrow(() -> {
log.error("not found RegisterCenter impl");
return new RuntimeException("not found RegisterCenter impl");
});
registerCenter.init(config.getRegistryAddress(), config.getEnv());
/*注册*/
ServiceDefinition serviceDefinition = buildGatewayServiceDefinition(config);
ServiceInstance serviceInstance = buildGatewayServiceInstance(config);
registerCenter.register(serviceDefinition,serviceInstance);
/*订阅*/
registerCenter.subscribeAllServices((serviceDefinition1,serviceInstanceSet)->{
log.info("refresh service and instance: {} {}", serviceDefinition1.getUniqueId(), JSON.toJSON(serviceInstanceSet));
DynamicConfigManager manager = DynamicConfigManager.getInstance();
manager.addServiceInstance(serviceDefinition1.getUniqueId(), serviceInstanceSet);
manager.putServiceDefinition(serviceDefinition1.getUniqueId(),serviceDefinition1);
});
return registerCenter;
}
private static ServiceInstance buildGatewayServiceInstance(Config config) {
String localIp = NetUtils.getLocalIp();
int port = config.getPort();
ServiceInstance serviceInstance = new ServiceInstance();
serviceInstance.setServiceInstanceId(localIp + COLON_SEPARATOR + port);
serviceInstance.setIp(localIp);
serviceInstance.setPort(port);
serviceInstance.setRegisterTime(TimeUtil.currentTimeMillis());
return serviceInstance;
}
private static ServiceDefinition buildGatewayServiceDefinition(Config config) {
ServiceDefinition serviceDefinition = new ServiceDefinition();
serviceDefinition.setInvokerMap(Map.of());
serviceDefinition.setUniqueId(config.getApplicationName());/*?*/
serviceDefinition.setServiceId(config.getApplicationName());
serviceDefinition.setEnvType(config.getEnv());
return serviceDefinition;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# 动态配置管理类
@NoArgsConstructor
public class DynamicConfigManager {
/*key:uniqueId*/
private ConcurrentHashMap<String,ServiceDefinition> serviceDefinitionMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<String,Set<ServiceInstance>> serviceInstanceMap = new ConcurrentHashMap<>();
/*规则集合*/
private ConcurrentHashMap<String,Rule> ruleMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<String,Rule> pathRuleMap = new ConcurrentHashMap<>();/*k:路径*/
private ConcurrentHashMap<String,List<Rule>> serviceRuleMap = new ConcurrentHashMap<>();/*k:服务名*/
.......
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14