Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • 项目架构
  • 网络通信层
  • 注册中心
  • 配置中心
  • 过滤器链
  • 路由转发过滤器
  • 重试与限流
    • 重试
    • 限流
      • IGatewayFlowCtlRule
      • 限流过滤器
      • 路径限流
      • 分布式限流
    • 限流算法总结
      • 计数器限流
      • 滑动窗口限流
      • 漏桶算法
      • 令牌桶算法
  • 熔断与降级
  • 用户鉴权
  • 缓存优化
  • Disruptor缓冲区优化
  • 客户端—dubbo接口
  • 网关上下文
  • 负载均衡
  • API网关
Nreal
2024-03-04
目录

重试与限流

# 重试

会在IO异常以及请求超时进行一个请求重试,在路由过滤器中添加一个重试的函数,用于在请求出现如上两个异常的时候进行重试;

@Slf4j
@FilterAspect(id=ROUTER_FILTER_ID,
        name = ROUTER_FILTER_NAME,
        order = ROUTER_FILTER_ORDER)
public class RouterFilter implements Filter {

    @Override
    public void doFilter(GatewayContext ctx) throws Exception {
        Request request = ctx.getRequest().build();
        CompletableFuture<Response> future = AsyncHttpHelper.getInstance().executeRequest(request);
        boolean whenComplete = ConfigLoader.getConfig().isWhenComplete();
        if(whenComplete){
            future.whenComplete((response, throwable) -> {
                complete(request, response, throwable, ctx);
            });
        }else{
            future.whenCompleteAsync((response, throwable) -> {
                complete(request, response, throwable, ctx);
            });
        }
    }

    private void complete(Request request, Response response, Throwable throwable, GatewayContext gatewayContext) {
        gatewayContext.releaseRequest();
        /*添加异常重试逻辑*/
        Rule rule = gatewayContext.getRule();
        int currentRetryTimes = gatewayContext.getCurrentRetryTimes();
        int confRetryTimes = rule.getRetryConfig().getTimes();
        if ((throwable instanceof TimeoutException || throwable instanceof IOException) && currentRetryTimes<=confRetryTimes) {
            doRetry(gatewayContext,currentRetryTimes);
            return;
        }
        try {
            if (Objects.nonNull(throwable)) {
                String url = request.getUrl();
                if (throwable instanceof TimeoutException) {
                    log.warn("complete time out {}", url);
                    gatewayContext.setThrowable(new ResponseException(ResponseCode.REQUEST_TIMEOUT));
                    gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.REQUEST_TIMEOUT));
                } else {
                    gatewayContext.setThrowable(new ConnectException(throwable,gatewayContext.getUniqueId(),url,ResponseCode.HTTP_RESPONSE_ERROR));
                    gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.HTTP_RESPONSE_ERROR));
                }
            } else {
                gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(response));
            }
        } catch (Throwable t) {
            gatewayContext.setThrowable(new ResponseException(ResponseCode.INTERNAL_ERROR));
            gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.INTERNAL_ERROR));
            log.error("complete error", t);
        } finally {
            gatewayContext.written();
            ResponseHelper.writeResponse(gatewayContext);
        }
    }

    private void doRetry(GatewayContext gatewayContext, int retryTimes) {
        System.out.println("当前重试次数为"+retryTimes);
        gatewayContext.setCurrentRetryTimes(retryTimes+1);
        try {
            doFilter(gatewayContext);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

重试即再一次调用doFilter方法执行路由过滤器中逻辑;

# 限流

分布式限流:令牌桶算法;

# IGatewayFlowCtlRule

用于获取对应的限流过滤器;

public interface IGatewayFlowCtlRule {
    void doFlowCtlFilter(Rule.FlowCtlConfig flowCtlConfig, String serviceId);
}
1
2
3

# 限流过滤器

根据请求获取对应的限流规则;

@Slf4j
@FilterAspect(id=FLOW_CTL_FILTER_ID,
        name = FLOW_CTL_FILTER_NAME,
        order = FLOW_CTL_FILTER_ORDER)
public class FlowCtlFilter implements Filter {
    @Override
    public void doFilter(GatewayContext ctx) throws Exception {
        Rule rule = ctx.getRule();
        if(rule!=null){
            Set<Rule.FlowCtlConfig> flowCtlConfigs = rule.getFlowCtlConfigs();
            Iterator<Rule.FlowCtlConfig> iterator = flowCtlConfigs.iterator();
            Rule.FlowCtlConfig flowCtlConfig;
            while(iterator.hasNext()){
                IGatewayFlowCtlRule flowCtlRule = null;
                flowCtlConfig = (Rule.FlowCtlConfig)iterator.next();
                if(flowCtlConfig == null){
                    continue;
                }
                String path = ctx.getRequest().getPath();
                if(flowCtlConfig.getType().equalsIgnoreCase(FLOW_CTL_TYPE_PATH) && path.equals(flowCtlConfig.getValue())){
                    flowCtlRule = FlowCtlByPathRule.getInstance(rule.getServiceId(),path);
                }else if(flowCtlConfig.getType().equalsIgnoreCase(FLOW_CTL_TYPE_SERVICE)){

                }
                if(flowCtlRule != null){
                    flowCtlRule.doFlowCtlFilter(flowCtlConfig,rule.getServiceId());
                }
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 路径限流

需要服务以及请求路径;

保存规则,每当对应的请求到来,就从缓存中获取对应的限流规则;

public class FlowCtlByPathRule implements IGatewayFlowCtlRule{

    private String serviceId;

    private String path;

    private static final String LIMIT_MESSAGE ="您的请求过于频繁,请稍后重试";

    private static ConcurrentHashMap<String,FlowCtlByPathRule> servicePathMap = new ConcurrentHashMap<>();

    public FlowCtlByPathRule(String serviceId, String path) {
        this.serviceId = serviceId;
        this.path = path;
    }

    public static FlowCtlByPathRule getInstance(String serviceId, String path) {
        StringBuffer buffer = new StringBuffer();
        String key = buffer.append(serviceId).append(".").append(path).toString();
        FlowCtlByPathRule flowCtlByPathRule = servicePathMap.get(key);
        if(flowCtlByPathRule == null){
            flowCtlByPathRule = new FlowCtlByPathRule(serviceId, path);
        }
        return flowCtlByPathRule;
    }
    
    
    @Override
    public void doFlowCtlFilter(Rule.FlowCtlConfig flowCtlConfig, String serviceId) {
        if(flowCtlConfig == null || StringUtils.isEmpty(serviceId) || StringUtils.isEmpty(flowCtlConfig.getConfig())){
            return;
        }
        Map<String,Integer> configMap = JSON.parseObject(flowCtlConfig.getConfig(),Map.class);
        if(!configMap.containsKey(FLOW_CTL_LIMIT_DURATION) || !configMap.containsKey(FLOW_CTL_LIMIT_PERMITS)){
            return;
        }
        double duration = configMap.get(FLOW_CTL_LIMIT_DURATION);
        double permits = configMap.get(FLOW_CTL_LIMIT_PERMITS);
        StringBuffer buffer = new StringBuffer();
        boolean flag = true;
        String key = buffer.append(serviceId).append(".").append(path).toString();
        if(FLOW_CTL_MODEL_DISTRIBUTED.equalsIgnoreCase(flowCtlConfig.getModel())) {
            flag = redisCountLimiter.doFlowCtl(key,(int)permits,(int)duration);
        }else{
            GuavaCountLimiter guavaCountLimiter = GuavaCountLimiter.getInstance(serviceId,flowCtlConfig);
            if(guavaCountLimiter == null){
                throw new RuntimeException("获取单机限流工具类为空");
            }
            double count = Math.ceil(permits/duration);
            flag = guavaCountLimiter.acquire((int)count);
        }
        if(!flag){
            throw new RuntimeException(LIMIT_MESSAGE);
        }
    }
    
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

# 分布式限流

key为服务id+路径;limit和expire配置指定;

@Slf4j
public class RedisCountLimiter {

    protected JedisUtil jedisUtil;

    public RedisCountLimiter(JedisUtil jedisUtil) {
        this.jedisUtil = jedisUtil;
    }

    private static final int SUCCESS_RESULT = 1;
    private static final int FAILED_RESULT = 0;

    public  boolean doFlowCtl(String key,int limit,int expire){
        try {
            Object object = jedisUtil.executeScript(key,limit,expire);
            if(object == null){
                return true;
            }
            Long result = Long.valueOf(object.toString());
            if(FAILED_RESULT == result){
                return  false;
            }
        }catch (Exception e){
            throw new RuntimeException("分布式限流发生错误");
        }
        return true;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

具体限流:

public Object executeScript(String key, int limit, int expire){
    Jedis jedis = jedisPool.getJedis();
    String lua = buildLuaScript();
    String scriptLoad =jedis.scriptLoad(lua);
    try {
        Object result = jedis.evalsha(scriptLoad, Arrays.asList(key), Arrays.asList(String.valueOf(expire), String.valueOf(limit)));
        System.out.println(result);
        return result;
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (jedis != null) {
            try {
                jedis.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    return null;
}


// 构造lua脚本
private static String buildLuaScript() {
    String lua = "local num = redis.call('incr', KEYS[1])\n" +
            "if tonumber(num) == 1 then\n" +
            "\tredis.call('expire', KEYS[1], ARGV[1])\n" +
            "\treturn 1\n" +
            "elseif tonumber(num) > tonumber(ARGV[2]) then\n" +
            "\treturn 0\n" +
            "else \n" +
            "\treturn 1\n" +
            "end\n";
    return lua;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

key为服务Id+路径,每有一个请求都会执行incr操作,直到num>limit,拒绝请求;

num一直增大?

每个key设置了过期时间,num并不会无限制增大;

# 漏桶算法

以一定的速率往同种存放令牌,消除突刺现象,不管服务调用方多么不稳定,通过漏桶算法进行限流,每10毫秒处理一次请求。因为处理的速度是固定的,请求进来的速度是未知的,可能突然进来很多请求,没来得及处理的请求就先放在桶里,既然是个桶,肯定是有容量上限,如果桶满了,那么新进来的请求就丢弃;

# 令牌桶算法

以一定的速率往桶中放令牌。每次请求调用需要先获取令牌,只有拿到令牌,才有机会继续执行,否则选择选择等待可用的令牌、或者直接拒绝;

# 限流算法总结

# 计数器限流

定义单位时间内的阈值,每次请求增加一次计数:

  • 请求总数<=单位时间内阈值,执行正常流程;
  • 请求总数>单位时间内阈值,限流处理;

进入下个时间单位,计数清除;

临界值问题:所有请求集中在 0.99s~1.01s,0.02s内2000个请求超过阈值,但是单位时间内没有超过阈值;

# 滑动窗口限流

解决临界值问题:

  • 将单位时间划分多个区间;
  • 每个时间区间内,没有一次请求将计数+1;
  • 每个时间区间的请求计数,加起来为整个滑动窗口的总数;
  • 每经过一个区间,抛弃最左区间,加入最右区间;
  • 请求总数超过阈值,新的请求被限流;

缺点:时间区间精度把控,精度越高,所需要的空间容量越大;

# 漏桶算法

限制流出速率;

让流量更平滑:流量整形,速率限制

原理:

  • 每个请i去到达时,先经过一个队列;
  • 进水速率<=流出速率,漏桶不起作用;
  • 进水速率>流出速率,桶里的水延迟流出,并以固定速率流出;
  • 桶满时,请求被限流;

# 令牌桶算法

限制流入速率;

除了漏桶算法拥有的流量整形&速率限制功能外,还可以允许突发数据的发送;

桶里装的是令牌:

  • 如果使用在网络带宽时,令牌可以看作字节;
  • 如果根据请求数量来控制流量,令牌可以看作请求;

工作过程:

  • 产生令牌:配置平均发送速率r,每个1/r秒,有一个令牌放入桶中;
  • 令牌上限:桶满后,多余令牌丢弃;
  • 消耗令牌:当一个n个字节数据包到达时,就从令牌桶中删除n个令牌,数据包再被发送到网络中;
  • 突发流量:桶内最多可以存放b个令牌,允许最长b个字节的突发流量;
路由转发过滤器
熔断与降级

← 路由转发过滤器 熔断与降级→

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式