路由转发过滤器
将请求最终转发到某一个指定的后端服务实例上去;
# 路由转发作用
- 请求修改和重定向:允许修改请求的各个部分,包括请求头、请求体、请求参数等,以适应目标服务的要求;
- 安全性:可以添加安全性相关的功能,例如身份验证和授权,以确保只有授权用户可以访问某些服务;
- 缓存:可以使用过滤器来实现请求和响应的缓存,以减轻目标服务的负载;
- 流量控制:通过路由转发过滤器,您可以实现流量控制和限流,以防止某个服务被过多的请求压垮;
- 负载均衡:确保请求均匀地分发到不同的服务实例;
- 日志监控:记录请求和响应的信息,以便进行监控和故障排除;
# 异步发送HTTP请求
使用异步方式发送http请求,Netty+AsyncHttpClient方式实现异步IO通信功能;
初始化AsyncHttpClient异步http请求发送工具;
@Slf4j
public class NettyHttpClient implements LifeCycle {
private final Config config;
private final EventLoopGroup eventLoopGroupWoker;
private AsyncHttpClient asyncHttpClient;
public NettyHttpClient(Config config, EventLoopGroup eventLoopGroupWoker) {
this.config = config;
this.eventLoopGroupWoker = eventLoopGroupWoker;
init();
}
@Override
public void init() {
DefaultAsyncHttpClientConfig.Builder builder = new DefaultAsyncHttpClientConfig.Builder()
.setEventLoopGroup(eventLoopGroupWoker)
.setConnectTimeout(config.getHttpConnectTimeout())
.setRequestTimeout(config.getHttpRequestTimeout())
.setMaxRedirects(config.getHttpMaxRequestRetry())
.setAllocator(PooledByteBufAllocator.DEFAULT) //池化的byteBuf分配器,提升性能
.setCompressionEnforced(true)
.setMaxConnections(config.getHttpMaxConnections())
.setMaxConnectionsPerHost(config.getHttpConnectionsPerHost())
.setPooledConnectionIdleTimeout(config.getHttpPooledConnectionIdleTimeout());
this.asyncHttpClient = new DefaultAsyncHttpClient(builder.build());
}
@Override
public void start() {
AsyncHttpHelper.getInstance().initialized(asyncHttpClient);
}
@Override
public void shutdown() {
if (asyncHttpClient != null) {
try {
this.asyncHttpClient.close();
} catch (IOException e) {
log.error("NettyHttpClient shutdown error", e);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 路由转发过滤器
实现对请求的响应:将响应内容写回到请求的响应体中;
@Slf4j
@FilterAspect(id=ROUTER_FILTER_ID,
name = ROUTER_FILTER_NAME,
order = ROUTER_FILTER_ORDER)
public class RouterFilter implements Filter {
@Override
public void doFilter(GatewayContext ctx) throws Exception {
Request request = ctx.getRequest().build();
CompletableFuture<Response> future = AsyncHttpHelper.getInstance().executeRequest(request);
boolean whenComplete = ConfigLoader.getConfig().isWhenComplete();
if(whenComplete){
future.whenComplete((response, throwable) -> {
complete(request, response, throwable, ctx);
});
}else{
future.whenCompleteAsync((response, throwable) -> {
complete(request, response, throwable, ctx);
});
}
}
private void complete(Request request, Response response, Throwable throwable, GatewayContext gatewayContext) {
gatewayContext.releaseRequest();
try {
if (Objects.nonNull(throwable)) {
String url = request.getUrl();
if (throwable instanceof TimeoutException) {
log.warn("complete time out {}", url);
gatewayContext.setThrowable(new ResponseException(ResponseCode.REQUEST_TIMEOUT));
} else {
gatewayContext.setThrowable(new ConnectException(throwable,
gatewayContext.getUniqueId(),
url, ResponseCode.HTTP_RESPONSE_ERROR));
}
} else {
gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(response));
}
} catch (Throwable t) {
gatewayContext.setThrowable(new ResponseException(ResponseCode.INTERNAL_ERROR));
log.error("complete error", t);
} finally {
gatewayContext.written();
ResponseHelper.writeResponse(gatewayContext);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
使用CompletableFuture工具异步接受请求结果,请求执行完会执行我们的回调方法。
使用单异步模式,接收到请求响应之后,就会执行complete方法;响应如果遇到异常,那么我们就捕获异常并报错,如果没有,就正常走finally写回响应数据到前端;
至此请求转发到后端服务上;