过滤器链
# 规则
/**
 * Gateway rule: identity fields (id, name, protocol, order), the backend
 * service it targets (serviceId, prefix, paths) and the per-rule filter,
 * retry, flow-control, Hystrix and auth configuration.
 *
 * <p>The {@code ......} lines mark members elided from this excerpt
 * (accessors, {@code compareTo}, etc.).
 *
 * <p>NOTE(review): the class is {@link Serializable} but the nested config
 * classes are not; Java-native serialization of a populated rule would
 * presumably fail — confirm whether only JSON mapping is used.
 */
public class Rule implements Comparable<Rule>, Serializable {
    private String id;
    private String name;
    private String protocol;
    private Integer order;
    /* Backend service id */
    private String serviceId;
    /* Request prefix */
    private String prefix;
    /* Collection of paths */
    private List<String> paths;
    private Set<FilterConfig> filterConfigs = new HashSet<>();
    private RetryConfig retryConfig = new RetryConfig();
    private Set<FlowCtlConfig> flowCtlConfigs = new HashSet<>();
    private Set<HystrixConfig> hystrixConfigs = new HashSet<>();
    private AuthConfig authConfig = new AuthConfig();
......

    /** Authentication settings: secret key and cookie name. */
    @Data
    public static class AuthConfig {
        private String secretKey;
        private String cookieName;
    }

    /** Hystrix settings for one path: timeout, core thread count and fallback response. */
    @Data
    public static class HystrixConfig {
        private String path;
        private int timeoutInMilliseconds;
        private int threadCoreSize;
        private String fallbackResponse;
    }

    /** Flow-control (rate-limit) settings. */
    @Data
    public static class FlowCtlConfig {
        private String type;   /* Rate-limit type: path/service */
        private String value;  /* Value of the rate-limited object */
        private String model;  /* Standalone / distributed */
        private String config; /* Rate-limit rule, JSON */
    }

    /** Retry settings: number of retries. */
    @Data
    public static class RetryConfig {
        private int times;
    }

    /**
     * Configuration of a single filter. Equality and hash code are hand-written
     * and based on {@code id} only, so a {@code Set<FilterConfig>} holds at most
     * one entry per filter id.
     */
    @Data
    public static class FilterConfig {
        private String id;
        private String config;

        /**
         * Two configs are equal when they are of the same class and have equal ids.
         * Null ids are handled: two configs with a null id compare equal.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            FilterConfig that = (FilterConfig) o;
            // Null-safe comparison: id is null until it is set, and hashCode()
            // already accepts a null id, so equals() must not throw either.
            return Objects.equals(id, that.id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id);
        }
    }
......
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Filter接口
顶层接口:子类实现这个接口实现具体方法;
/**
 * Top-level filter contract; concrete filters implement {@link #doFilter}.
 */
public interface Filter {

    /**
     * Applies this filter to the given gateway context.
     *
     * @param ctx the context the filter operates on
     * @throws Exception if the filter fails
     */
    void doFilter(GatewayContext ctx) throws Exception;

    /**
     * Returns the order value of this filter.
     *
     * @return the {@code order} declared by the {@link FilterAspect} annotation
     *         on the runtime class, or {@link Integer#MAX_VALUE} when the class
     *         carries no such annotation
     */
    default int getOrder() {
        FilterAspect aspect = getClass().getAnnotation(FilterAspect.class);
        return aspect == null ? Integer.MAX_VALUE : aspect.order();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 过滤器注解
提供过滤器AOP功能,方便对过滤器进行管理;
/**
 * Type-level annotation carrying a filter's id, name and order.
 * Retained at runtime so it can be read through reflection.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface FilterAspect {

    /** Filter identifier; required. */
    String id();

    /** Filter name; defaults to the empty string. */
    String name() default "";

    /** Filter order value; defaults to 0. */
    int order() default 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 过滤器链
@Slf4j
public class GatewayFilterChain {
private List<Filter> filters = new ArrayList<>();
public GatewayFilterChain addFilter(Filter filter){
filters.add(filter);
return this;
}
public GatewayFilterChain addFilterList(List<Filter> filter){
filters.addAll(filter);
return this;
}
public GatewayContext doFilter(GatewayContext ctx) throws Exception {
if(filters.isEmpty()){
return ctx;
}
try {
for(Filter fl: filters){
fl.doFilter(ctx);
}
}catch (Exception e){
log.error("执行过滤器发生异常,异常信息:{}",e.getMessage());
throw e;
}
return ctx;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 过滤器工厂
构建过滤器链,提供根据过滤器ID获取过滤器的方法;
/**
 * Factory that assembles filter chains and exposes lookup by filter id.
 */
public interface FilterFactory {

    /**
     * Builds the filter chain to run for the given context.
     *
     * @param ctx the gateway context the chain is built for
     * @return the assembled chain
     * @throws Exception if the chain cannot be built
     */
    GatewayFilterChain buildFilterChain(GatewayContext ctx) throws Exception;

    /**
     * Looks up the object registered under a filter id.
     *
     * @param filterId the id to look up
     * @param <T>      the result type expected by the caller
     * @return the registered object
     * @throws Exception if the lookup fails
     */
    <T> T getFilterInfo(String filterId) throws Exception;
}
1
2
3
4
5
6
7
2
3
4
5
6
7
GatewayFilterChainFactory实现具体的方法,提供根据ID获取过滤器的实际方法;
根据SPI机制(ServiceLoader)找到Filter接口的实现类,再将过滤器ID和对应的过滤器实例缓存到本地;
/**
 * {@link FilterFactory} implementation that discovers {@link Filter}
 * implementations through {@link ServiceLoader}, caches them by filter id and
 * assembles a sorted {@link GatewayFilterChain} per request context.
 */
@Slf4j
public class GatewayFilterChainFactory implements FilterFactory {

    /** Holder class: the instance is created when the holder is first loaded. */
    private static class SingletonInstance {
        private static final GatewayFilterChainFactory INSTANCE = new GatewayFilterChainFactory();
    }

    /**
     * @return the shared factory instance
     */
    public static GatewayFilterChainFactory getInstance() {
        return SingletonInstance.INSTANCE;
    }

    /** Filter id -> filter instance, filled once in the constructor. */
    private final Map<String, Filter> processorFilterIdMap = new ConcurrentHashMap<>();

    /**
     * Loads every {@link Filter} provider and registers those annotated with
     * {@link FilterAspect}. The key is the annotation id, or the class name
     * when the id is empty. Filters without the annotation are skipped.
     */
    public GatewayFilterChainFactory() {
        ServiceLoader<Filter> serviceLoader = ServiceLoader.load(Filter.class);
        serviceLoader.stream().forEach(filterProvider -> {
            Filter filter = filterProvider.get();
            FilterAspect annotation = filter.getClass().getAnnotation(FilterAspect.class);
            if (annotation == null) {
                // Check before touching the annotation: the log call below
                // dereferences it and would otherwise throw for such filters.
                log.warn("skip filter without @FilterAspect:{}", filter.getClass());
                return;
            }
            log.info("load filter success:{},{},{},{}", filter.getClass(), annotation.id(), annotation.name(), annotation.order());
            String filterId = annotation.id();
            if (StringUtils.isEmpty(filterId)) {
                filterId = filter.getClass().getName();
            }
            processorFilterIdMap.put(filterId, filter);
        });
    }

    /**
     * Builds the chain for a context: the registered filters named by the
     * rule's filter configs, plus a new {@code RouterFilter}, sorted by
     * ascending {@link Filter#getOrder()}.
     *
     * @param ctx the gateway context; its rule may be {@code null}, in which
     *            case the chain contains only the router filter
     * @return the assembled chain
     * @throws Exception if a filter lookup fails
     */
    @Override
    public GatewayFilterChain buildFilterChain(GatewayContext ctx) throws Exception {
        List<Filter> filters = new ArrayList<>();
        Rule rule = ctx.getRule();
        Set<Rule.FilterConfig> filterConfigs = rule == null ? null : rule.getFilterConfigs();
        if (filterConfigs != null) {
            for (Rule.FilterConfig filterConfig : filterConfigs) {
                if (filterConfig == null) {
                    continue;
                }
                String filterId = filterConfig.getId();
                if (StringUtils.isNotEmpty(filterId)) {
                    // Single lookup; unknown ids are ignored.
                    Filter filter = getFilterInfo(filterId);
                    if (filter != null) {
                        filters.add(filter);
                    }
                }
            }
        }
        filters.add(new RouterFilter());
        filters.sort(Comparator.comparingInt(Filter::getOrder));
        return new GatewayFilterChain().addFilterList(filters);
    }

    /**
     * @param filterId the id a filter was registered under
     * @return the cached filter, or {@code null} when no filter has that id
     */
    @Override
    public Filter getFilterInfo(String filterId) throws Exception {
        return processorFilterIdMap.get(filterId);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 使用案例
负载均衡:
/**
 * Usage example: a filter declares its id, name and order through
 * {@link FilterAspect}, here using the LOAD_BALANCE_FILTER_* constants
 * (defined outside this excerpt).
 */
@FilterAspect(id=LOAD_BALANCE_FILTER_ID,
name = LOAD_BALANCE_FILTER_NAME,
order = LOAD_BALANCE_FILTER_ORDER)
public class LoadBalanceFilter implements Filter {
// ..... (implementation elided in this excerpt)
}
1
2
3
4
5
6
2
3
4
5
6
# 过滤器使用
NettyCoreProcessor中使用:
/**
 * {@link NettyProcessor} that builds a gateway context for each incoming
 * request and runs it through the filter chain supplied by the filter factory.
 */
@Slf4j
public class NettyCoreProcessor implements NettyProcessor {

    private FilterFactory filterFactory = GatewayFilterChainFactory.getInstance();

    /**
     * Processes one wrapped request: creates the context, builds the filter
     * chain for it and executes the chain. A {@code BaseException} is answered
     * with the response for its code; any other throwable is answered with the
     * {@code INTERNAL_ERROR} response.
     *
     * @param wrapper carries the HTTP request and its channel handler context
     */
    @Override
    public void process(HttpRequestWrapper wrapper) {
        FullHttpRequest httpRequest = wrapper.getRequest();
        ChannelHandlerContext channelCtx = wrapper.getCtx();
        try {
            GatewayContext context = RequestHelper.doContext(httpRequest, channelCtx);
            filterFactory.buildFilterChain(context).doFilter(context);
        } catch (BaseException e) {
            log.error("process error {} {}", e.getCode().getCode(), e.getCode().getMessage());
            doWriteAndRelease(channelCtx, httpRequest, ResponseHelper.getHttpResponse(e.getCode()));
        } catch (Throwable t) {
            log.error("process unkown error", t);
            doWriteAndRelease(channelCtx, httpRequest, ResponseHelper.getHttpResponse(ResponseCode.INTERNAL_ERROR));
        }
    }
    //......
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26