API Gateway
Nreal
2023-12-03

Network Communication Layer

# Request

@Slf4j
public class GatewayRequest implements IGatewayRequest{

    @Getter
    private final String uniquedId;

    @Getter
    private final long beginTime;

    @Getter
    private final Charset charset;

    @Getter
    private final String clientIp;

    @Getter
    private final String host;

    @Getter
    private final String path;

    @Getter
    private final String uri;

    @Getter
    private final HttpMethod method;

    @Getter
    private final String contentType;

    @Getter
    private final HttpHeaders headers;

    @Getter
    private final QueryStringDecoder queryStringDecoder;

    @Getter
    private final FullHttpRequest fullHttpRequest;

    @Getter
    private String body;

    @Setter
    @Getter
    private long userId;

    @Getter
    private Map<String,io.netty.handler.codec.http.cookie.Cookie> cookieMap;

    @Getter
    private Map<String,List<String>> postParameters;

/*======================================== Mutable fields ===========================================*/

    private String modifyScheme;

    private String modifyHost;

    private String modifyPath;

    private final RequestBuilder requestBuilder;

    ...
        
}

# Server

Server bootstrap class:

@Slf4j
public class NettyHttpServer implements LifeCycle {

    private final Config config;
    private final NettyProcessor nettyProcessor;
    private ServerBootstrap serverBootstrap;
    private EventLoopGroup eventLoopGroupBoss;

    @Getter
    private EventLoopGroup eventLoopGroupWorker;

    public NettyHttpServer(Config config, NettyProcessor nettyProcessor) {
        this.config = config;
        this.nettyProcessor = nettyProcessor;
        init();
    }


    @Override
    public void init() {
        this.serverBootstrap = new ServerBootstrap();
        if (useEpoll()) {
            this.eventLoopGroupBoss = new EpollEventLoopGroup(config.getEventLoopGroupBossNum(),
                    new DefaultThreadFactory("netty-boss-epoll"));
            this.eventLoopGroupWorker = new EpollEventLoopGroup(config.getEventLoopGroupWokerNum(),
                    new DefaultThreadFactory("netty-worker-epoll"));
        } else {
            this.eventLoopGroupBoss = new NioEventLoopGroup(config.getEventLoopGroupBossNum(),
                    new DefaultThreadFactory("netty-boss-nio"));
            this.eventLoopGroupWorker = new NioEventLoopGroup(config.getEventLoopGroupWokerNum(),
                    new DefaultThreadFactory("netty-worker-nio"));
        }
    }

    public  boolean useEpoll() {
        return RemotingUtil.isLinuxPlatform() && Epoll.isAvailable();
    }

    @Override
    public void start() {
        this.serverBootstrap
                .group(eventLoopGroupBoss, eventLoopGroupWorker)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(config.getPort()))
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(
                                new HttpServerCodec(),
                                new HttpObjectAggregator(config.getMaxContentLength()),
                                new NettyServerConnectManagerHandler(),
                                new NettyHttpServerHandler(nettyProcessor)
                        );
                    }
                });
        try {
            this.serverBootstrap.bind().sync();
            log.info("server startup on port {}", this.config.getPort());
        } catch (Exception e) {
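            // Keep the original exception as the cause so the startup failure can be diagnosed
            throw new RuntimeException("failed to start server on port " + config.getPort(), e);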
        }
    }

    @Override
    public void shutdown() {
        if (eventLoopGroupBoss != null) {
            eventLoopGroupBoss.shutdownGracefully();
        }
        if (eventLoopGroupWorker != null) {
            eventLoopGroupWorker.shutdownGracefully();
        }
    }

}

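The handlers in the pipeline worth noting:

  • HttpServerCodec: the HTTP codec; it decodes requests and encodes responses.

    Netty: HTTP protocol parsing from Netty's perspective (1), HttpServerCodec (CSDN blog)

  • HttpObjectAggregator: message aggregation.

    HttpRequestDecoder passes even the simplest HTTP request downstream in at least two parts: first the request line and headers, then the body. Even when there is no body, an empty one is still passed. A large body may be split into several chunks and passed downstream multiple times, which forces later handlers to carry extra branching logic. Aggregation merges these parts into one complete message before passing it on.

    Mastering the Netty Source Code, part 45: HttpObjectAggregator in detail (CSDN blog)

  • ChannelDuplexHandler: handles both inbound and outbound events.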

    public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
     		//...   
    }
    

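    NettyServerConnectManagerHandler: the connection-management handler.

    It handles the lifecycle of a network connection. When the first request creates the connection, channelRegistered and channelActive are called.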

    @Slf4j
    public class NettyServerConnectManagerHandler extends ChannelDuplexHandler {
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            // Called when the Channel is registered with its EventLoop and can handle I/O
            final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.debug("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddr);
            super.channelRegistered(ctx);
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            // Called when the Channel is deregistered from its EventLoop and can no longer handle I/O
            final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.debug("NETTY SERVER PIPELINE: channelUnregistered {}", remoteAddr);
            super.channelUnregistered(ctx);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Called when the Channel becomes active and can send and receive data
            final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.debug("NETTY SERVER PIPELINE: channelActive {}", remoteAddr);
            super.channelActive(ctx);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // Called when the Channel is no longer active and no longer connected to its remote peer
            final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.debug("NETTY SERVER PIPELINE: channelInactive {}", remoteAddr);
            super.channelInactive(ctx);
        }
        
     
        //......
        
    }
    
  • NettyHttpServerHandler: the custom handler.

    It extends ChannelInboundHandlerAdapter to implement custom handling of inbound data.

    It overrides channelRead and delegates the logic to NettyProcessor.

public class NettyHttpServerHandler extends ChannelInboundHandlerAdapter {

   private final NettyProcessor nettyProcessor;

   public NettyHttpServerHandler(NettyProcessor nettyProcessor) {
       this.nettyProcessor = nettyProcessor;
   }

   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       /* The HttpObjectAggregator in the pipeline has already turned the HTTP message into a FullHttpRequest */
       FullHttpRequest request = (FullHttpRequest) msg;
       HttpRequestWrapper httpRequestWrapper = new HttpRequestWrapper();
       httpRequestWrapper.setCtx(ctx);
       httpRequestWrapper.setRequest(request);
       nettyProcessor.process(httpRequestWrapper);
   }

}
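
The cast to FullHttpRequest in channelRead is safe only because HttpObjectAggregator sits earlier in the pipeline. As a rough illustration of what aggregation means, the sketch below merges a head part and several body chunks into one complete message using plain Java. HeadPart, BodyPart, FullMessage and AggregatorSketch are hypothetical names invented for this example, not Netty classes, and the size check only loosely mirrors the maxContentLength argument; the sketch simply throws when the limit is exceeded.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the "request line + headers" part.
final class HeadPart {
    final String requestLine;
    HeadPart(String requestLine) { this.requestLine = requestLine; }
}

// Hypothetical stand-in for one chunk of the body; `last` marks the final chunk.
final class BodyPart {
    final byte[] data;
    final boolean last;
    BodyPart(byte[] data, boolean last) { this.data = data; this.last = last; }
}

// The aggregated result that downstream code can handle in one step.
final class FullMessage {
    final String requestLine;
    final String body;
    FullMessage(String requestLine, String body) {
        this.requestLine = requestLine;
        this.body = body;
    }
}

final class AggregatorSketch {
    private final int maxContentLength;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private HeadPart head;

    AggregatorSketch(int maxContentLength) { this.maxContentLength = maxContentLength; }

    /** Accepts one part; returns the full message after the last body chunk, otherwise null. */
    FullMessage offer(Object part) {
        if (part instanceof HeadPart) {
            head = (HeadPart) part;   // a new message starts
            buffer.reset();
            return null;
        }
        if (!(part instanceof BodyPart) || head == null) {
            throw new IllegalStateException("unexpected part: " + part);
        }
        BodyPart body = (BodyPart) part;
        if (buffer.size() + body.data.length > maxContentLength) {
            throw new IllegalStateException("body exceeds " + maxContentLength + " bytes");
        }
        buffer.write(body.data, 0, body.data.length);
        if (!body.last) {
            return null;              // keep waiting for more chunks
        }
        FullMessage full = new FullMessage(head.requestLine,
                new String(buffer.toByteArray(), StandardCharsets.UTF_8));
        head = null;
        buffer.reset();
        return full;
    }
}
```

A downstream consumer only ever sees the non-null return value, which is the position NettyHttpServerHandler is in when it receives a FullHttpRequest.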

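HttpRequestWrapper wraps a FullHttpRequest and a ChannelHandlerContext:

  • FullHttpRequest: a Netty interface that represents a complete HTTP request: the request line (method such as GET or POST, URI, HTTP version), the headers, and the body.
  • ChannelHandlerContext: the handler's context within the ChannelPipeline. Through it a handler can write to the channel or pass an event on to the next handler.

NettyCoreProcessor does the actual request processing: it builds the gateway context (including the matching rule) and runs the request through the filter chain.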

@Slf4j
public class NettyCoreProcessor implements NettyProcessor {

    private FilterFactory filterFactory = GatewayFilterChainFactory.getInstance();

    @Override
    public void process(HttpRequestWrapper wrapper) {
        FullHttpRequest request = wrapper.getRequest();
        ChannelHandlerContext ctx = wrapper.getCtx();
        try {
            GatewayContext gatewayContext = RequestHelper.doContext(request, ctx);
//            route(gatewayContext);
            filterFactory.buildFilterChain(gatewayContext).doFilter(gatewayContext);
        } catch (BaseException e) {
            log.error("process error {} {}", e.getCode().getCode(), e.getCode().getMessage());
            FullHttpResponse httpResponse = ResponseHelper.getHttpResponse(e.getCode());
            doWriteAndRelease(ctx, request, httpResponse);
        } catch (Throwable t) {
            log.error("process unknown error", t);
            FullHttpResponse httpResponse = ResponseHelper.getHttpResponse(ResponseCode.INTERNAL_ERROR);
            doWriteAndRelease(ctx, request, httpResponse);
        }
    }

    @Override
    public void start() {

    }

    @Override
    public void shutDown() {

    }
    
    //......
}
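
The shape of process() (build a context, run it through a filter chain, map known errors and unexpected errors to different responses) can be sketched in plain Java. Everything below is hypothetical: SketchContext, SketchFilter, SketchFilterChain and SketchProcessor are invented for illustration, IllegalArgumentException stands in for BaseException, and the chain is assumed to run its filters in a simple loop, which the source does not confirm.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-request context; the real GatewayContext carries much more.
final class SketchContext {
    final String path;
    String response;
    SketchContext(String path) { this.path = path; }
}

// Hypothetical filter contract.
interface SketchFilter {
    void doFilter(SketchContext ctx) throws Exception;
}

// Runs the filters in the order they were added.
final class SketchFilterChain {
    private final List<SketchFilter> filters = new ArrayList<>();

    SketchFilterChain add(SketchFilter filter) {
        filters.add(filter);
        return this;
    }

    void doFilter(SketchContext ctx) throws Exception {
        for (SketchFilter filter : filters) {
            filter.doFilter(ctx);
        }
    }
}

final class SketchProcessor {
    private final SketchFilterChain chain;

    SketchProcessor(SketchFilterChain chain) { this.chain = chain; }

    /** Mirrors the try/catch shape of process(): known errors first, then a catch-all. */
    String process(String path) {
        SketchContext ctx = new SketchContext(path);
        try {
            chain.doFilter(ctx);
            return ctx.response;
        } catch (IllegalArgumentException e) {   // stands in for BaseException
            return "400 " + e.getMessage();
        } catch (Throwable t) {                  // anything unexpected
            return "500 internal error";
        }
    }

    /** Builds a two-filter chain: a validation filter followed by a routing filter. */
    static SketchProcessor demo() {
        SketchFilterChain chain = new SketchFilterChain()
                .add(ctx -> {
                    if (!ctx.path.startsWith("/api/")) {
                        throw new IllegalArgumentException("no rule for " + ctx.path);
                    }
                })
                .add(ctx -> ctx.response = "200 routed " + ctx.path);
        return new SketchProcessor(chain);
    }
}
```

With this setup, SketchProcessor.demo().process("/api/user") yields a routed response, while a path outside /api/ is rejected by the first filter and never reaches the routing filter.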

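# Client

Client bootstrap class:

The gateway runs both a Netty server and a Netty-based client; the client reuses the server's worker EventLoopGroup.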

@Slf4j
public class NettyHttpClient implements LifeCycle {

    private final Config config;

    private final EventLoopGroup eventLoopGroupWoker;

    private AsyncHttpClient asyncHttpClient;

    public NettyHttpClient(Config config, EventLoopGroup eventLoopGroupWoker) {
        this.config = config;
        this.eventLoopGroupWoker = eventLoopGroupWoker;
        init();
    }

    @Override
    public void init() {
        DefaultAsyncHttpClientConfig.Builder builder = new DefaultAsyncHttpClientConfig.Builder()
                .setEventLoopGroup(eventLoopGroupWoker)
                .setConnectTimeout(config.getHttpConnectTimeout())
                .setRequestTimeout(config.getHttpRequestTimeout())
                .setMaxRequestRetry(config.getHttpMaxRequestRetry()) // retry count, not redirect count
                .setAllocator(PooledByteBufAllocator.DEFAULT) // pooled ByteBuf allocator for better performance
                .setCompressionEnforced(true)
                .setMaxConnections(config.getHttpMaxConnections())
                .setMaxConnectionsPerHost(config.getHttpConnectionsPerHost())
                .setPooledConnectionIdleTimeout(config.getHttpPooledConnectionIdleTimeout());
        this.asyncHttpClient = new DefaultAsyncHttpClient(builder.build());
    }

    @Override
    public void start() {
        AsyncHttpHelper.getInstance().initialized(asyncHttpClient);
    }

    @Override
    public void shutdown() {
        if (asyncHttpClient != null) {
            try {
                this.asyncHttpClient.close();
            } catch (IOException e) {
                log.error("NettyHttpClient shutdown error", e);
            }
        }
    }

}

The client uses AsyncHttpClient, which is built on Netty and sends HTTP requests asynchronously without blocking the calling thread.
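
To show what a non-blocking call looks like from the caller's side, here is a minimal sketch that uses the JDK's CompletableFuture as a stand-in for the future an asynchronous HTTP client would return. It does not use AsyncHttpClient itself: send(), the executor and the URL are hypothetical, and no network request is made.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncCallSketch {

    // Hypothetical backend call: the result is produced on an I/O thread, not the caller's.
    static CompletableFuture<String> send(String url, ExecutorService ioThreads) {
        return CompletableFuture.supplyAsync(() -> "200 OK from " + url, ioThreads);
    }

    static String demo() {
        ExecutorService ioThreads = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> reply = send("http://backend.example/ping", ioThreads)
                    // Callback runs once the response (or an error) is available.
                    .handle((body, error) -> error == null ? body : "502 " + error.getMessage());
            // The calling thread is free to do other work here.
            // join() is used only so the demo can return the result.
            return reply.join();
        } finally {
            ioThreads.shutdown();
        }
    }
}
```

In a gateway the callback would write the backend's response to the client channel instead of returning a string, so no thread sits waiting on the backend.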

# Container

The container manages the lifecycle of the Netty modules.

@Slf4j
public class Container implements LifeCycle {

    private final Config config;

    private NettyHttpServer nettyHttpServer;

    private NettyHttpClient nettyHttpClient;

    private NettyProcessor nettyProcessor;

    public Container(Config config) {
        this.config = config;
        init();
    }

    @Override
    public void init() {

        NettyCoreProcessor nettyCoreProcessor = new NettyCoreProcessor();

        if(BUFFER_TYPE_PARALLEL.equals(config.getBufferType())){
            this.nettyProcessor = new DisruptorNettyCoreProcessor(config,nettyCoreProcessor);
        }else{
            this.nettyProcessor = nettyCoreProcessor;
        }

        this.nettyHttpServer = new NettyHttpServer(config,nettyProcessor);

        this.nettyHttpClient = new NettyHttpClient(config,nettyHttpServer.getEventLoopGroupWorker());

    }

    @Override
    public void start() {

        nettyProcessor.start();

        nettyHttpServer.start();

        nettyHttpClient.start();

        log.info("api gateway started!");

    }

    @Override
    public void shutdown() {

        nettyProcessor.shutDown();

        nettyHttpServer.shutdown();

        nettyHttpClient.shutdown();

    }

}
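
The container pattern above can be reduced to a small, dependency-free sketch that makes the call order visible. SketchLifeCycle, RecordingComponent and SketchContainer are hypothetical names for this example only. Unlike the real classes, where each component calls init() from its own constructor, the sketch lets the container drive init() so that all three phases appear in one place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical lifecycle contract modelled on the three methods the container overrides.
interface SketchLifeCycle {
    void init();
    void start();
    void shutdown();
}

// A component that only records which lifecycle method was called.
final class RecordingComponent implements SketchLifeCycle {
    private final String name;
    private final List<String> events;

    RecordingComponent(String name, List<String> events) {
        this.name = name;
        this.events = events;
    }

    @Override public void init() { events.add(name + ".init"); }
    @Override public void start() { events.add(name + ".start"); }
    @Override public void shutdown() { events.add(name + ".shutdown"); }
}

// Owns the components and drives them in a fixed order.
final class SketchContainer implements SketchLifeCycle {
    private final List<SketchLifeCycle> components;

    SketchContainer(SketchLifeCycle... components) {
        this.components = Arrays.asList(components);
        init();
    }

    @Override public void init() {
        for (SketchLifeCycle c : components) { c.init(); }
    }

    @Override public void start() {
        for (SketchLifeCycle c : components) { c.start(); }
    }

    @Override public void shutdown() {
        // Same order as start, matching the order used in the class above.
        for (SketchLifeCycle c : components) { c.shutdown(); }
    }

    /** Runs one full lifecycle and returns the recorded call order. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        SketchContainer container = new SketchContainer(
                new RecordingComponent("server", events),
                new RecordingComponent("client", events));
        container.start();
        container.shutdown();
        return events;
    }
}
```

The source does not show what triggers shutdown(). One common option, offered here only as a suggestion, is to register it with the JVM via Runtime.getRuntime().addShutdownHook(new Thread(container::shutdown)).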
Theme by Vdoing | Copyright © 2021-2024