网络通信层
# Request
@Slf4j
public class GatewayRequest implements IGatewayRequest{
@Getter
private final String uniquedId;
@Getter
private final long beginTime;
@Getter
private final Charset charset;
@Getter
private final String clientIp;
@Getter
private final String host;
@Getter
private final String path;
@Getter
private final String uri;
@Getter
private final HttpMethod method;
@Getter
private final String contentType;
@Getter
private final HttpHeaders headers;
@Getter
private final QueryStringDecoder queryStringDecoder;
@Getter
private final FullHttpRequest fullHttpRequest;
@Getter
private String body;
@Setter
@Getter
private long userId;
@Getter
private Map<String,io.netty.handler.codec.http.cookie.Cookie> cookieMap;
@Getter
private Map<String,List<String>> postParameters;
/*========================================可修改变量===========================================*/
private String modifyScheme;
private String modifyHost;
private String modifyPath;
private final RequestBuilder requestBuilder;
...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# 服务端
服务端启动类:
public class NettyHttpServer implements LifeCycle {
private final Config config;
private final NettyProcessor nettyProcessor;
private ServerBootstrap serverBootstrap;
private EventLoopGroup eventLoopGroupBoss;
@Getter
private EventLoopGroup eventLoopGroupWorker;
public NettyHttpServer(Config config, NettyProcessor nettyProcessor) {
this.config = config;
this.nettyProcessor = nettyProcessor;
init();
}
@Override
public void init() {
this.serverBootstrap = new ServerBootstrap();
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup(config.getEventLoopGroupBossNum(),
new DefaultThreadFactory("netty-boss-nio"));
this.eventLoopGroupWorker = new EpollEventLoopGroup(config.getEventLoopGroupWokerNum(),
new DefaultThreadFactory("netty-woker-nio"));
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(config.getEventLoopGroupBossNum(),
new DefaultThreadFactory("netty-boss-nio"));
this.eventLoopGroupWorker = new NioEventLoopGroup(config.getEventLoopGroupWokerNum(),
new DefaultThreadFactory("netty-woker-nio"));
}
}
public boolean useEpoll() {
return RemotingUtil.isLinuxPlatform() && Epoll.isAvailable();
}
@Override
public void start() {
this.serverBootstrap
.group(eventLoopGroupBoss, eventLoopGroupWorker)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(config.getPort()))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new HttpServerCodec(),
new HttpObjectAggregator(config.getMaxContentLength()),
new NettyServerConnectManagerHandler(),
new NettyHttpServerHandler(nettyProcessor)
);
}
});
try {
this.serverBootstrap.bind().sync();
log.info("server startup on port {}", this.config.getPort());
} catch (Exception e) {
throw new RuntimeException();
}
}
@Override
public void shutdown() {
if (eventLoopGroupBoss != null) {
eventLoopGroupBoss.shutdownGracefully();
}
if (eventLoopGroupWorker != null) {
eventLoopGroupWorker.shutdownGracefully();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
pipeline中的 handler主要关注:
HttpServerCodec:Http编解码
Netty 从netty角度分析http协议解析(一)HttpServerCodec_httoservercodec-CSDN博客 (opens new window)
HttpObjectAggregator:聚合消息
一个HTTP请求最少也会在HttpRequestDecoder里分成两次往后传递,第一次是消息行和消息头,第二次是消息体,哪怕没有消息体,也会传一个空消息体。如果发送的消息体比较大的话,可能还会分成好几个消息体来处理,往后传递多次,这样使得我们后续的处理器可能要写多个逻辑判断,比较麻烦,可以聚合消息,将消息整合成一个完整的往后传递;
吃透Netty源码系列四十五之HttpObjectAggregator详解-CSDN博客 (opens new window)
ChannelDuplexHandler:入站出站事件处理;
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler { //... }
1
2
3NettyServerConnectManagerHandler:连接管理处理类;
对网络请求链接的生命周期处理,第一次发送请求创建链接时调用register和active方法;
@Slf4j public class NettyServerConnectManagerHandler extends ChannelDuplexHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { //当Channel注册到它的EventLoop并且能够处理I/O时调用 final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.debug("NETTY SERVER PIPLINE: channelRegistered {}", remoteAddr); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { //当Channel从它的EventLoop中注销并且无法处理任何I/O时调用 final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.debug("NETTY SERVER PIPLINE: channelUnregistered {}", remoteAddr); super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //当Channel处理于活动状态时被调用,可以接收与发送数据 final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.debug("NETTY SERVER PIPLINE: channelActive {}", remoteAddr); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //不再是活动状态且不再连接它的远程节点时被调用 final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.debug("NETTY SERVER PIPLINE: channelInactive {}", remoteAddr); super.channelInactive(ctx); } //...... }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39NettyHttpServerHandler:自定义处理类;
继承ChannelInboundHandlerAdapter,为了实现自定义入站数据处理逻辑;
实现channelRead,把逻辑委托给NettyProcessor;
public class NettyHttpServerHandler extends ChannelInboundHandlerAdapter { private final NettyProcessor nettyProcessor; public NettyHttpServerHandler(NettyProcessor nettyProcessor) { this.nettyProcessor = nettyProcessor; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { /*pipeLine中的HttpObjectAggregator实现了将http报文转FullHttpRequest*/ FullHttpRequest request = (FullHttpRequest) msg; HttpRequestWrapper httpRequestWrapper = new HttpRequestWrapper(); httpRequestWrapper.setCtx(ctx); httpRequestWrapper.setRequest(request); nettyProcessor.process(httpRequestWrapper); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19HttpRequestWrapper对象包装了FullHttpRequest 与 ChannelHandlerContext:
- FullHttpRequest :Netty框架,FullHttpRequest 类是一个接口,代表一个完整HTTP请求,包括请求行(如方法GET/POST、URI、HTTP版本)、请求头,请求体;
- ChannelHandlerContext:过滤器上下文,指向了下一个要处理当前请求的类;
正式对请求处理NettyCoreProcessor:获取网关上下文(规则),对请求过滤;
public class NettyCoreProcessor implements NettyProcessor { private FilterFactory filterFactory = GatewayFilterChainFactory.getInstance(); @Override public void process(HttpRequestWrapper wrapper) { FullHttpRequest request = wrapper.getRequest(); ChannelHandlerContext ctx = wrapper.getCtx(); try { GatewayContext gatewayContext = RequestHelper.doContext(request, ctx); // route(gatewayContext); filterFactory.buildFilterChain(gatewayContext).doFilter(gatewayContext); } catch (BaseException e) { log.error("process error {} {}", e.getCode().getCode(), e.getCode().getMessage()); FullHttpResponse httpResponse = ResponseHelper.getHttpResponse(e.getCode()); doWriteAndRelease(ctx, request, httpResponse); } catch (Throwable t) { log.error("process unkown error", t); FullHttpResponse httpResponse = ResponseHelper.getHttpResponse(ResponseCode.INTERNAL_ERROR); doWriteAndRelease(ctx, request, httpResponse); } } @Override public void start() { } @Override public void shutDown() { } //...... }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 客户端
客户端启动类:
启动Netty的客户端和服务端;
public class NettyHttpClient implements LifeCycle {
private final Config config;
private final EventLoopGroup eventLoopGroupWoker;
private AsyncHttpClient asyncHttpClient;
public NettyHttpClient(Config config, EventLoopGroup eventLoopGroupWoker) {
this.config = config;
this.eventLoopGroupWoker = eventLoopGroupWoker;
init();
}
@Override
public void init() {
DefaultAsyncHttpClientConfig.Builder builder = new DefaultAsyncHttpClientConfig.Builder()
.setEventLoopGroup(eventLoopGroupWoker)
.setConnectTimeout(config.getHttpConnectTimeout())
.setRequestTimeout(config.getHttpRequestTimeout())
.setMaxRedirects(config.getHttpMaxRequestRetry())
.setAllocator(PooledByteBufAllocator.DEFAULT) //池化的byteBuf分配器,提升性能
.setCompressionEnforced(true)
.setMaxConnections(config.getHttpMaxConnections())
.setMaxConnectionsPerHost(config.getHttpConnectionsPerHost())
.setPooledConnectionIdleTimeout(config.getHttpPooledConnectionIdleTimeout());
this.asyncHttpClient = new DefaultAsyncHttpClient(builder.build());
}
@Override
public void start() {
AsyncHttpHelper.getInstance().initialized(asyncHttpClient);
}
@Override
public void shutdown() {
if (asyncHttpClient != null) {
try {
this.asyncHttpClient.close();
} catch (IOException e) {
log.error("NettyHttpClient shutdown error", e);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
客户端使用
AsyncHttpClient
,基于Netty构建,提供异步的HTTP客户端功能,非阻塞发送HTTP请求;
# 容器
负责管理Netty模块的生命周期;
public class Container implements LifeCycle {
private final Config config;
private NettyHttpServer nettyHttpServer;
private NettyHttpClient nettyHttpClient;
private NettyProcessor nettyProcessor;
public Container(Config config) {
this.config = config;
init();
}
@Override
public void init() {
NettyCoreProcessor nettyCoreProcessor = new NettyCoreProcessor();
if(BUFFER_TYPE_PARALLEL.equals(config.getBufferType())){
this.nettyProcessor = new DisruptorNettyCoreProcessor(config,nettyCoreProcessor);
}else{
this.nettyProcessor = nettyCoreProcessor;
}
this.nettyHttpServer = new NettyHttpServer(config,nettyProcessor);
this.nettyHttpClient = new NettyHttpClient(config,nettyHttpServer.getEventLoopGroupWorker());
}
@Override
public void start() {
nettyProcessor.start();
nettyHttpServer.start();
nettyHttpClient.start();
log.info("api gateway started!");
}
@Override
public void shutdown() {
nettyProcessor.shutDown();
nettyHttpServer.shutdown();
nettyHttpClient.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57