渠道消息下发
# channel与Handler映射
@Component
public class HandlerHolder {
private Map<Integer, Handler> handlers = new HashMap<>(128);
public void putHandler(Integer channelCode, Handler handler) {
handlers.put(channelCode, handler);
}
public Handler route(Integer channelCode) {
return handlers.get(channelCode);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# Handler
# Handler接口
public interface Handler {
void doHandler(TaskInfo taskInfo);
void recall(RecallTaskInfo recallTaskInfo);
}
1
2
3
4
5
6
7
2
3
4
5
6
7
# BaseHandler
public abstract class BaseHandler implements Handler {
/**
* 标识渠道的Code
* 子类初始化的时候指定
*/
protected Integer channelCode;
/**
* 限流相关的参数
* 子类初始化的时候指定
*/
protected FlowControlParam flowControlParam;
@Autowired
private HandlerHolder handlerHolder;
@Autowired
private FlowControlFactory flowControlFactory;
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 初始化渠道与Handler的映射关系
*/
@PostConstruct
private void init() {
handlerHolder.putHandler(channelCode, this);
}
@Override
public void doHandler(TaskInfo taskInfo) {
// 只有子类指定了限流参数,才需要限流
if (Objects.nonNull(flowControlParam)) {
flowControlFactory.flowControl(taskInfo, flowControlParam);
}
if (handler(taskInfo)) {
// 日志处理;
return;
}
// 日志处理;
}
/**
* 统一处理的handler接口
*/
public abstract boolean handler(TaskInfo taskInfo);
/**
* 将撤回的消息存储到redis
*
* @param prefix redis前缀
* @param messageTemplateId 消息模板id
* @param taskId 消息下发taskId
* @param expireTime 存储到redis的有效时间(跟对应渠道可撤回多久的消息有关系)
*/
protected void saveRecallInfo(String prefix, Long messageTemplateId, String taskId, Long expireTime) {
redisTemplate.opsForList().leftPush(prefix + messageTemplateId, taskId);
redisTemplate.opsForValue().set(prefix + taskId, taskId);
redisTemplate.expire(prefix + messageTemplateId, expireTime, TimeUnit.SECONDS);
redisTemplate.expire(prefix + taskId, expireTime, TimeUnit.SECONDS);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 邮件发送处理
public class EmailHandler extends BaseHandler implements Handler {
@Autowired
private AccountUtils accountUtils;
public EmailHandler() {
channelCode = ChannelType.EMAIL.getCode();
// 按照请求限流,默认单机 3 qps (具体数值配置在apollo动态调整)
Double rateInitValue = Double.valueOf(3);
flowControlParam = FlowControlParam.builder().rateInitValue(rateInitValue)
.rateLimitStrategy(RateLimitStrategy.REQUEST_RATE_LIMIT)
.rateLimiter(RateLimiter.create(rateInitValue)).build();
}
@Override
public boolean handler(TaskInfo taskInfo) {
EmailContentModel emailContentModel = (EmailContentModel) taskInfo.getContentModel();
MailAccount account = getAccountConfig(taskInfo.getSendAccount());
try {
List<File> files = CharSequenceUtil.isNotBlank(emailContentModel.getUrl()) ? AustinFileUtils.getRemoteUrl2File(dataPath, CharSequenceUtil.split(emailContentModel.getUrl(), StrPool.COMMA)) : null;
if (CollUtil.isEmpty(files)) {
MailUtil.send(account, taskInfo.getReceiver(), emailContentModel.getTitle(), emailContentModel.getContent(), true);
} else {
MailUtil.send(account, taskInfo.getReceiver(), emailContentModel.getTitle(), emailContentModel.getContent(), true, files.toArray(new File[files.size()]));
}
} catch (Exception e) {
log.error("EmailHandler#handler fail!{},params:{}", Throwables.getStackTraceAsString(e), taskInfo);
return false;
}
return true;
}
/**
* 获取账号信息和配置
*
* @return
*/
private MailAccount getAccountConfig(Integer sendAccount) {
}
@Override
public void recall(RecallTaskInfo recallTaskInfo) {
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 下发渠道限流
# 流控参数
public class FlowControlParam {
/**
* 限流器
* 子类初始化的时候指定
*/
protected RateLimiter rateLimiter;
/**
* 限流器初始限流大小
* 子类初始化的时候指定
*/
protected Double rateInitValue;
/**
* 限流的策略
* 子类初始化的时候指定
*/
protected RateLimitStrategy rateLimitStrategy;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 限流策略
public enum RateLimitStrategy {
/**
* 根据真实请求数限流 (实际意义上的QPS)
*/
REQUEST_RATE_LIMIT(10, "根据真实请求数限流"),
/**
* 根据发送用户数限流(人数限流)
*/
SEND_USER_NUM_RATE_LIMIT(20, "根据发送用户数限流"),
;
private final Integer code;
private final String description;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 渠道流控
public interface FlowControlService {
Double flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam);
}
1
2
3
4
2
3
4
@LocalRateLimit(rateLimitStrategy = RateLimitStrategy.REQUEST_RATE_LIMIT)
public class RequestRateLimitServiceImpl implements FlowControlService {
@Override
public Double flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) {
RateLimiter rateLimiter = flowControlParam.getRateLimiter();
return rateLimiter.acquire(1);
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
流控策略注解:
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Service public @interface LocalRateLimit { RateLimitStrategy rateLimitStrategy() default RateLimitStrategy.REQUEST_RATE_LIMIT; }
1
2
3
4
5
6
7
# 流控工厂
public class FlowControlFactory implements ApplicationContextAware {
private static final String FLOW_CONTROL_KEY = "flowControlRule";
private static final String FLOW_CONTROL_PREFIX = "flow_control_";
private final Map<RateLimitStrategy, FlowControlService> flowControlServiceMap = new ConcurrentHashMap<>();
@Autowired
private ConfigService config;
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) {
RateLimiter rateLimiter;
Double rateInitValue = flowControlParam.getRateInitValue();
// 对比 初始限流值 与 配置限流值,以 配置中心的限流值为准
Double rateLimitConfig = getRateLimitConfig(taskInfo.getSendChannel());
if (Objects.nonNull(rateLimitConfig) && !rateInitValue.equals(rateLimitConfig)) {
rateLimiter = RateLimiter.create(rateLimitConfig);
flowControlParam.setRateInitValue(rateLimitConfig);
flowControlParam.setRateLimiter(rateLimiter);
}
FlowControlService flowControlService = flowControlServiceMap.get(flowControlParam.getRateLimitStrategy());
if (Objects.isNull(flowControlService)) {
log.error("没有找到对应的单机限流策略");
return;
}
double costTime = flowControlService.flowControl(taskInfo, flowControlParam);
if (costTime > 0) {
log.info("consumer {} flow control time {}",
EnumUtil.getEnumByCode(taskInfo.getSendChannel(), ChannelType.class).getDescription(), costTime);
}
}
/**
* 得到限流值的配置
* key:flowControl value:{"flow_control_40":1}
*/
private Double getRateLimitConfig(Integer channelCode) {
String flowControlConfig = config.getProperty(FLOW_CONTROL_KEY, CommonConstant.EMPTY_JSON_OBJECT);
JSONObject jsonObject = JSON.parseObject(flowControlConfig);
if (Objects.isNull(jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode))) {
return null;
}
return jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode);
}
@PostConstruct
private void init() {
Map<String, Object> serviceMap = this.applicationContext.getBeansWithAnnotation(LocalRateLimit.class);
serviceMap.forEach((name, service) -> {
if (service instanceof FlowControlService) {
LocalRateLimit localRateLimit = AopUtils.getTargetClass(service).getAnnotation(LocalRateLimit.class);
RateLimitStrategy rateLimitStrategy = localRateLimit.rateLimitStrategy();
//通常情况下 实现的限流service与rateLimitStrategy一一对应
flowControlServiceMap.put(rateLimitStrategy, (FlowControlService) service);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65