重试与限流
# 重试
会在IO异常以及请求超时进行一个请求重试,在路由过滤器中添加一个重试的函数,用于在请求出现如上两个异常的时候进行重试;
@Slf4j
@FilterAspect(id=ROUTER_FILTER_ID,
name = ROUTER_FILTER_NAME,
order = ROUTER_FILTER_ORDER)
public class RouterFilter implements Filter {
@Override
public void doFilter(GatewayContext ctx) throws Exception {
Request request = ctx.getRequest().build();
CompletableFuture<Response> future = AsyncHttpHelper.getInstance().executeRequest(request);
boolean whenComplete = ConfigLoader.getConfig().isWhenComplete();
if(whenComplete){
future.whenComplete((response, throwable) -> {
complete(request, response, throwable, ctx);
});
}else{
future.whenCompleteAsync((response, throwable) -> {
complete(request, response, throwable, ctx);
});
}
}
private void complete(Request request, Response response, Throwable throwable, GatewayContext gatewayContext) {
gatewayContext.releaseRequest();
/*添加异常重试逻辑*/
Rule rule = gatewayContext.getRule();
int currentRetryTimes = gatewayContext.getCurrentRetryTimes();
int confRetryTimes = rule.getRetryConfig().getTimes();
if ((throwable instanceof TimeoutException || throwable instanceof IOException) && currentRetryTimes<=confRetryTimes) {
doRetry(gatewayContext,currentRetryTimes);
return;
}
try {
if (Objects.nonNull(throwable)) {
String url = request.getUrl();
if (throwable instanceof TimeoutException) {
log.warn("complete time out {}", url);
gatewayContext.setThrowable(new ResponseException(ResponseCode.REQUEST_TIMEOUT));
gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.REQUEST_TIMEOUT));
} else {
gatewayContext.setThrowable(new ConnectException(throwable,gatewayContext.getUniqueId(),url,ResponseCode.HTTP_RESPONSE_ERROR));
gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.HTTP_RESPONSE_ERROR));
}
} else {
gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(response));
}
} catch (Throwable t) {
gatewayContext.setThrowable(new ResponseException(ResponseCode.INTERNAL_ERROR));
gatewayContext.setResponse(GatewayResponse.buildGatewayResponse(ResponseCode.INTERNAL_ERROR));
log.error("complete error", t);
} finally {
gatewayContext.written();
ResponseHelper.writeResponse(gatewayContext);
}
}
private void doRetry(GatewayContext gatewayContext, int retryTimes) {
System.out.println("当前重试次数为"+retryTimes);
gatewayContext.setCurrentRetryTimes(retryTimes+1);
try {
doFilter(gatewayContext);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
重试即再一次调用doFilter方法执行路由过滤器中逻辑;
# 限流
分布式限流:令牌桶算法;
# IGatewayFlowCtlRule
用于获取对应的限流过滤器;
public interface IGatewayFlowCtlRule {
void doFlowCtlFilter(Rule.FlowCtlConfig flowCtlConfig, String serviceId);
}
1
2
3
2
3
# 限流过滤器
根据请求获取对应的限流规则;
@Slf4j
@FilterAspect(id=FLOW_CTL_FILTER_ID,
name = FLOW_CTL_FILTER_NAME,
order = FLOW_CTL_FILTER_ORDER)
public class FlowCtlFilter implements Filter {
@Override
public void doFilter(GatewayContext ctx) throws Exception {
Rule rule = ctx.getRule();
if(rule!=null){
Set<Rule.FlowCtlConfig> flowCtlConfigs = rule.getFlowCtlConfigs();
Iterator<Rule.FlowCtlConfig> iterator = flowCtlConfigs.iterator();
Rule.FlowCtlConfig flowCtlConfig;
while(iterator.hasNext()){
IGatewayFlowCtlRule flowCtlRule = null;
flowCtlConfig = (Rule.FlowCtlConfig)iterator.next();
if(flowCtlConfig == null){
continue;
}
String path = ctx.getRequest().getPath();
if(flowCtlConfig.getType().equalsIgnoreCase(FLOW_CTL_TYPE_PATH) && path.equals(flowCtlConfig.getValue())){
flowCtlRule = FlowCtlByPathRule.getInstance(rule.getServiceId(),path);
}else if(flowCtlConfig.getType().equalsIgnoreCase(FLOW_CTL_TYPE_SERVICE)){
}
if(flowCtlRule != null){
flowCtlRule.doFlowCtlFilter(flowCtlConfig,rule.getServiceId());
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 路径限流
需要服务以及请求路径;
保存规则,每当对应的请求到来,就从缓存中获取对应的限流规则;
public class FlowCtlByPathRule implements IGatewayFlowCtlRule{
private String serviceId;
private String path;
private static final String LIMIT_MESSAGE ="您的请求过于频繁,请稍后重试";
private static ConcurrentHashMap<String,FlowCtlByPathRule> servicePathMap = new ConcurrentHashMap<>();
public FlowCtlByPathRule(String serviceId, String path) {
this.serviceId = serviceId;
this.path = path;
}
public static FlowCtlByPathRule getInstance(String serviceId, String path) {
StringBuffer buffer = new StringBuffer();
String key = buffer.append(serviceId).append(".").append(path).toString();
FlowCtlByPathRule flowCtlByPathRule = servicePathMap.get(key);
if(flowCtlByPathRule == null){
flowCtlByPathRule = new FlowCtlByPathRule(serviceId, path);
}
return flowCtlByPathRule;
}
@Override
public void doFlowCtlFilter(Rule.FlowCtlConfig flowCtlConfig, String serviceId) {
if(flowCtlConfig == null || StringUtils.isEmpty(serviceId) || StringUtils.isEmpty(flowCtlConfig.getConfig())){
return;
}
Map<String,Integer> configMap = JSON.parseObject(flowCtlConfig.getConfig(),Map.class);
if(!configMap.containsKey(FLOW_CTL_LIMIT_DURATION) || !configMap.containsKey(FLOW_CTL_LIMIT_PERMITS)){
return;
}
double duration = configMap.get(FLOW_CTL_LIMIT_DURATION);
double permits = configMap.get(FLOW_CTL_LIMIT_PERMITS);
StringBuffer buffer = new StringBuffer();
boolean flag = true;
String key = buffer.append(serviceId).append(".").append(path).toString();
if(FLOW_CTL_MODEL_DISTRIBUTED.equalsIgnoreCase(flowCtlConfig.getModel())) {
flag = redisCountLimiter.doFlowCtl(key,(int)permits,(int)duration);
}else{
GuavaCountLimiter guavaCountLimiter = GuavaCountLimiter.getInstance(serviceId,flowCtlConfig);
if(guavaCountLimiter == null){
throw new RuntimeException("获取单机限流工具类为空");
}
double count = Math.ceil(permits/duration);
flag = guavaCountLimiter.acquire((int)count);
}
if(!flag){
throw new RuntimeException(LIMIT_MESSAGE);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 分布式限流
key为服务id+路径;limit和expire配置指定;
@Slf4j
public class RedisCountLimiter {
protected JedisUtil jedisUtil;
public RedisCountLimiter(JedisUtil jedisUtil) {
this.jedisUtil = jedisUtil;
}
private static final int SUCCESS_RESULT = 1;
private static final int FAILED_RESULT = 0;
public boolean doFlowCtl(String key,int limit,int expire){
try {
Object object = jedisUtil.executeScript(key,limit,expire);
if(object == null){
return true;
}
Long result = Long.valueOf(object.toString());
if(FAILED_RESULT == result){
return false;
}
}catch (Exception e){
throw new RuntimeException("分布式限流发生错误");
}
return true;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
具体限流:
public Object executeScript(String key, int limit, int expire){
Jedis jedis = jedisPool.getJedis();
String lua = buildLuaScript();
String scriptLoad =jedis.scriptLoad(lua);
try {
Object result = jedis.evalsha(scriptLoad, Arrays.asList(key), Arrays.asList(String.valueOf(expire), String.valueOf(limit)));
System.out.println(result);
return result;
} catch (Exception e) {
e.printStackTrace();
} finally {
if (jedis != null) {
try {
jedis.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
return null;
}
// 构造lua脚本
private static String buildLuaScript() {
String lua = "local num = redis.call('incr', KEYS[1])\n" +
"if tonumber(num) == 1 then\n" +
"\tredis.call('expire', KEYS[1], ARGV[1])\n" +
"\treturn 1\n" +
"elseif tonumber(num) > tonumber(ARGV[2]) then\n" +
"\treturn 0\n" +
"else \n" +
"\treturn 1\n" +
"end\n";
return lua;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
key为服务Id+路径,每有一个请求都会执行incr操作,直到num>limit,拒绝请求;
num一直增大?
每个key设置了过期时间,num并不会无限制增大;
# 漏桶算法
以一定的速率往同种存放令牌,消除突刺现象,不管服务调用方多么不稳定,通过漏桶算法进行限流,每10毫秒处理一次请求。因为处理的速度是固定的,请求进来的速度是未知的,可能突然进来很多请求,没来得及处理的请求就先放在桶里,既然是个桶,肯定是有容量上限,如果桶满了,那么新进来的请求就丢弃;
# 令牌桶算法
以一定的速率往桶中放令牌。每次请求调用需要先获取令牌,只有拿到令牌,才有机会继续执行,否则选择选择等待可用的令牌、或者直接拒绝;
# 限流算法总结
# 计数器限流
定义单位时间内的阈值,每次请求增加一次计数:
- 请求总数<=单位时间内阈值,执行正常流程;
- 请求总数>单位时间内阈值,限流处理;
进入下个时间单位,计数清除;
临界值问题:所有请求集中在 0.99s~1.01s,0.02s内2000个请求超过阈值,但是单位时间内没有超过阈值;
# 滑动窗口限流
解决临界值问题:
- 将单位时间划分多个区间;
- 每个时间区间内,没有一次请求将计数+1;
- 每个时间区间的请求计数,加起来为整个滑动窗口的总数;
- 每经过一个区间,抛弃最左区间,加入最右区间;
- 请求总数超过阈值,新的请求被限流;
缺点:时间区间精度把控,精度越高,所需要的空间容量越大;
# 漏桶算法
限制流出速率;
让流量更平滑:流量整形,速率限制
原理:
- 每个请i去到达时,先经过一个队列;
- 进水速率<=流出速率,漏桶不起作用;
- 进水速率>流出速率,桶里的水延迟流出,并以固定速率流出;
- 桶满时,请求被限流;
# 令牌桶算法
限制流入速率;
除了漏桶算法拥有的流量整形&速率限制功能外,还可以允许突发数据的发送;
桶里装的是令牌:
- 如果使用在网络带宽时,令牌可以看作字节;
- 如果根据请求数量来控制流量,令牌可以看作请求;
工作过程:
- 产生令牌:配置平均发送速率r,每个1/r秒,有一个令牌放入桶中;
- 令牌上限:桶满后,多余令牌丢弃;
- 消耗令牌:当一个n个字节数据包到达时,就从令牌桶中删除n个令牌,数据包再被发送到网络中;
- 突发流量:桶内最多可以存放b个令牌,允许最长b个字节的突发流量;