 过滤器链
过滤器链
  # 规则
public class Rule implements Comparable<Rule>, Serializable {
    private String id;
    private String name;
    private String protocol;
    private Integer order;
    /*后端服务id*/
    private String serviceId;
    /*请求前缀*/
    private String prefix;
    /*路径集合*/
    private List<String> paths;
    private Set<FilterConfig> filterConfigs =new HashSet<>();
    private RetryConfig retryConfig = new RetryConfig();
    private Set<FlowCtlConfig> flowCtlConfigs =new HashSet<>();
    private Set<HystrixConfig> hystrixConfigs = new HashSet<>();
    private AuthConfig authConfig = new AuthConfig();
    
    ......
        
    @Data
    public static class AuthConfig{
        private String secretKey;
        private String cookieName;
    }
    @Data
    public static class HystrixConfig{
        private String path;
        private int timeoutInMilliseconds;
        private int threadCoreSize;
        private String  fallbackResponse;
    }
    @Data
    public static class FlowCtlConfig{
        private String type;/*限流类型,path/service*/
        private String value;/*限流对象的值*/
        private String model;/*单机/分布式*/
        private String config;/*限流规则,JSON*/
    }
    @Data
    public static class RetryConfig{
        private int times;
    }
    
    @Data
    //@EqualsAndHashCode
    public static class FilterConfig{
        private String id;
        private String config;
        @Override
        public  boolean equals(Object o){
            if (this == o) return  true;
            if((o== null) || getClass() != o.getClass()){
                return false;
            }
            FilterConfig that =(FilterConfig)o;
            return id.equals(that.id);
        }
        @Override
        public int hashCode(){
            return Objects.hash(id);
        }
    }
    
    ......
    
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Filter接口
顶层接口:子类实现这个接口实现具体方法;
public interface Filter {
    void doFilter(GatewayContext ctx) throws Exception;
    default int getOrder(){
        FilterAspect annotation = this.getClass().getAnnotation(FilterAspect.class);
        if(annotation != null){
            return annotation.order();
        }
        return Integer.MAX_VALUE;
    };
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 过滤器注解
提供过滤器AOP功能,方便堆过滤器进行管理;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface FilterAspect {
    String id();
    String name() default "";
    int order() default 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 过滤器链
@Slf4j
public class GatewayFilterChain {
    private List<Filter> filters = new ArrayList<>();
    public GatewayFilterChain addFilter(Filter filter){
        filters.add(filter);
        return this;
    }
    public GatewayFilterChain addFilterList(List<Filter> filter){
        filters.addAll(filter);
        return this;
    }
    public GatewayContext doFilter(GatewayContext ctx) throws Exception {
        if(filters.isEmpty()){
            return ctx;
        }
        try {
            for(Filter fl: filters){
                fl.doFilter(ctx);
            }
        }catch (Exception e){
            log.error("执行过滤器发生异常,异常信息:{}",e.getMessage());
            throw e;
        }
        return ctx;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 过滤器工厂
构建过滤器链,提供根据过滤器ID获取过滤器的方法;
public interface FilterFactory {
    GatewayFilterChain buildFilterChain(GatewayContext ctx) throws Exception;
    <T> T getFilterInfo(String filterId) throws Exception;
}
1
2
3
4
5
6
7
2
3
4
5
6
7
GatewayFilterChainFactory实现具体的方法,提供根据ID获取过滤器的实际方法;
根据SPI机制找到接口类,再将过滤器ID和过滤器接口缓存到本地;
@Slf4j
public class GatewayFilterChainFactory implements FilterFactory{
    private static class SingletonInstance{
        private static final GatewayFilterChainFactory INSTANCE = new GatewayFilterChainFactory();
    }
    public static GatewayFilterChainFactory getInstance(){
        return SingletonInstance.INSTANCE;
    }
    private Map<String,Filter> processorFilterIdMap = new ConcurrentHashMap<>();
    public GatewayFilterChainFactory(){
        ServiceLoader<Filter> serviceLoader = ServiceLoader.load(Filter.class);
        serviceLoader.stream().forEach(filterProvider -> {
            Filter filter = filterProvider.get();
            FilterAspect annotation = filter.getClass().getAnnotation(FilterAspect.class);
            log.info("load filter success:{},{},{},{}",filter.getClass(), annotation.id(),annotation.name(),annotation.order());
            if(annotation!=null){
                String filterId = annotation.id();
                if(StringUtils.isEmpty(filterId)){
                    filterId = filter.getClass().getName();
                }
                processorFilterIdMap.put(filterId,filter);
            }
        });
    }
    @Override
    public GatewayFilterChain buildFilterChain(GatewayContext ctx) throws Exception {
        GatewayFilterChain chain = new GatewayFilterChain();
        List<Filter> filters = new ArrayList<>();
        Rule rule = ctx.getRule();
        if(rule!=null){
            Set<Rule.FilterConfig> filterConfigs = rule.getFilterConfigs();
            Iterator<Rule.FilterConfig> iterator = filterConfigs.iterator();
            Rule.FilterConfig filterConfig;
            while(iterator.hasNext()){
                filterConfig = (Rule.FilterConfig)iterator.next();
                if(filterConfig == null){
                    continue;
                }
                String filterId = filterConfig.getId();
                if(StringUtils.isNotEmpty(filterId) && getFilterInfo(filterId)!=null){
                    Filter filter = getFilterInfo(filterId);
                    filters.add(filter);
                }
            }
        }
        filters.add(new RouterFilter());
        filters.sort(Comparator.comparingInt(Filter::getOrder));
        chain.addFilterList(filters);
        return chain;
    }
    @Override
    public Filter getFilterInfo(String filterId) throws Exception {
        return processorFilterIdMap.get(filterId);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 使用案例
负载均衡:
@FilterAspect(id=LOAD_BALANCE_FILTER_ID,
        name = LOAD_BALANCE_FILTER_NAME,
        order = LOAD_BALANCE_FILTER_ORDER)
public class LoadBalanceFilter implements Filter {
 	//.....   
}
1
2
3
4
5
6
2
3
4
5
6
# 过滤器使用
NettyCoreProcessor中使用:
@Slf4j
public class NettyCoreProcessor implements NettyProcessor {
    private FilterFactory filterFactory = GatewayFilterChainFactory.getInstance();
    @Override
    public void process(HttpRequestWrapper wrapper) {
        FullHttpRequest request = wrapper.getRequest();
        ChannelHandlerContext ctx = wrapper.getCtx();
        try {
            GatewayContext gatewayContext = RequestHelper.doContext(request, ctx);
//            route(gatewayContext);
            filterFactory.buildFilterChain(gatewayContext).doFilter(gatewayContext);
        } catch (BaseException e) {
            log.error("process error {} {}", e.getCode().getCode(), e.getCode().getMessage());
            FullHttpResponse httpResponse = ResponseHelper.getHttpResponse(e.getCode());
            doWriteAndRelease(ctx, request, httpResponse);
        } catch (Throwable t) {
            log.error("process unkown error", t);
            FullHttpResponse httpResponse = ResponseHelper.getHttpResponse(ResponseCode.INTERNAL_ERROR);
            doWriteAndRelease(ctx, request, httpResponse);
        }
    }
 	//......   
    
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
