责任链模式消息校验
# 责任链容器(模板)
需要一个容器将所有责任串起来
/**
 * Container (template) for a responsibility chain: it holds the list of
 * {@link BusinessProcess} steps that belong to one chain.
 */
public class ProcessTemplate {

    /** Steps held by this template. */
    private List<BusinessProcess> processList;

    /**
     * @return the list of steps currently stored in this template (may be
     *         {@code null} if it was never set)
     */
    public List<BusinessProcess> getProcessList() {
        return this.processList;
    }

    /**
     * Stores the given list of steps in this template.
     *
     * @param processList the steps to store; kept by reference, not copied
     */
    public void setProcessList(List<BusinessProcess> processList) {
        this.processList = processList;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 责任链上下文
为什么需要上下文?在链式调用时,处理逻辑 A、B、C 依次执行,逻辑 B 可能需要依赖逻辑 A 的处理结果,因此需要一个载体(上下文)把这些中间结果记录下来。
/**
 * Context object handed along a responsibility chain so that a later step
 * can read what an earlier step recorded.
 *
 * <p>NOTE(review): no accessors or builder are declared here; presumably they
 * are generated (e.g. by Lombok annotations omitted from this excerpt) — confirm.
 *
 * @param <T> the type of model carried through the chain
 */
public class ProcessContext<T extends ProcessModel> implements Serializable {

    /** Code that identifies the responsibility chain. */
    private String code;

    /** Model that stores the chain's context data. */
    private T processModel;

    /** Flag marking that the chain should be interrupted. */
    private Boolean needBreak;

    /** Result of the processing flow. */
    private BasicResultVO response;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 责任执行器
/**
 * A single step (executor) of a responsibility chain.
 *
 * @param <T> the type of model carried by the {@link ProcessContext} this step handles
 */
public interface BusinessProcess<T extends ProcessModel> {

    /**
     * Executes this step against the given chain context.
     *
     * @param context the context shared by the steps of the chain
     */
    void process(ProcessContext<T> context);
}
1
2
3
2
3
# 前置参数校验
/**
 * Chain model for a send task: the template to use, the request parameters,
 * and the task information attached to the send.
 */
public class SendTaskModel implements ProcessModel {

    /** Message template id. */
    private Long messageTemplateId;

    /** Request parameters. */
    private List<MessageParam> messageParamList;

    /** Information about the send tasks. */
    private List<TaskInfo> taskInfo;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Chain step that validates the incoming send parameters.
 *
 * NOTE(review): the three check/filter steps are elided ("...") in this
 * excerpt; the descriptions below come from the original inline comments only.
 */
public class SendPreCheckAction implements BusinessProcess<SendTaskModel> {
/**
 * Reads the template id and message parameters from the context's model and
 * writes a filtered parameter list back onto the model.
 *
 * @param context the chain context carrying the {@code SendTaskModel}
 */
@Override
public void process(ProcessContext<SendTaskModel> context) {
SendTaskModel sendTaskModel = context.getProcessModel();
Long messageTemplateId = sendTaskModel.getMessageTemplateId();
List<MessageParam> messageParamList = sendTaskModel.getMessageParamList();
// 1. No message template id or no messageParam was passed in
...
// 2. Filter out messageParam entries whose receiver is null
...
// 3. Filter out requests whose receiver exceeds 100 (presumably the receiver count — confirm)
...
// resultMessageParamList is produced by the elided steps above
sendTaskModel.setMessageParamList(resultMessageParamList);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 拼装参数
public class SendAssembleAction implements BusinessProcess<SendTaskModel> {
//...
@Override
public void process(ProcessContext<SendTaskModel> context) {
SendTaskModel sendTaskModel = context.getProcessModel();
Long messageTemplateId = sendTaskModel.getMessageTemplateId();
try {
Optional<MessageTemplate> messageTemplate = messageTemplateDao.findById(messageTemplateId);
if (!messageTemplate.isPresent() || messageTemplate.get().getIsDeleted().equals(CommonConstant.TRUE)) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.TEMPLATE_NOT_FOUND));
return;
}
//组装 TaskInfo 任务消息
List<TaskInfo> taskInfos = assembleTaskInfo(sendTaskModel, messageTemplate.get());
sendTaskModel.setTaskInfo(taskInfos);
} catch (Exception e) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
log.error("assemble task fail! templateId:{}, e:{}", messageTemplateId, Throwables.getStackTraceAsString(e));
}
}
//...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 后置参数检查
/**
 * Chain step that re-checks the assembled tasks and stops the chain when
 * nothing is left to send.
 */
public class SendAfterCheckAction implements BusinessProcess<SendTaskModel> {
    //...

    /**
     * Runs the receiver filter over the model's task list and, if the list is
     * empty afterwards, flags the chain to break with a bad-parameters response.
     *
     * @param context the chain context carrying the {@code SendTaskModel}
     */
    @Override
    public void process(ProcessContext<SendTaskModel> context) {
        List<TaskInfo> tasks = context.getProcessModel().getTaskInfo();

        // Filter out illegal phone numbers / email addresses
        // (presumably the helper edits the list in place — confirm).
        filterIllegalReceiver(tasks);

        if (!CollUtil.isEmpty(tasks)) {
            return;
        }
        context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS, "手机号或邮箱不合法, 无有效的发送任务"));
    }
    //...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 发送消息至MQ
为什么需要将 api模块的消息先发送给MQ?
如果接口直接调用下发服务,可能会存在超时风险,拖垮整个接口的性能;并且某些渠道发送消息后,结果是异步告知的。因此引入 MQ 来承载接口的流量,并实现异步处理。
public class SendMqAction implements BusinessProcess<SendTaskModel> {
@Autowired
private SendMqService sendMqService;
@Value("${austin.business.topic.name}")
private String sendMessageTopic;
@Value("${austin.business.tagId.value}")
private String tagId;
@Value("${austin.mq.pipeline}")
private String mqPipeline;
@Override
public void process(ProcessContext<SendTaskModel> context) {
SendTaskModel sendTaskModel = context.getProcessModel();
List<TaskInfo> taskInfo = sendTaskModel.getTaskInfo();
try {
String message = JSON.toJSONString(sendTaskModel.getTaskInfo(),
new SerializerFeature[]{SerializerFeature.WriteClassName});
sendMqService.send(sendMessageTopic, message, tagId);
} catch (Exception e) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
log.error("send {} fail! e:{},params:{}", mqPipeline, Throwables.getStackTraceAsString(e)
, JSON.toJSONString(CollUtil.getFirst(taskInfo.listIterator())));
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 责任链的流程控制器
/**
 * Flow controller of the responsibility chain: picks the template registered
 * for a context's code and runs its steps one after another.
 */
public class ProcessController {

    /** Template mapping, keyed by chain code. */
    private Map<String, ProcessTemplate> templateConfig;

    /**
     * Executes the responsibility chain for the given context.
     *
     * @param context the chain context; its code selects the template to run
     * @return the context carried by the {@code ProcessException} when the
     *         pre-check throws one; otherwise the given context after the steps
     *         have run (stopping early once a step sets the break flag)
     */
    public ProcessContext process(ProcessContext context) {
        // Pre-check: a failure short-circuits with the context attached to the exception.
        try {
            preCheck(context);
        } catch (ProcessException e) {
            return e.getProcessContext();
        }

        // Walk the flow nodes in list order.
        ProcessTemplate template = templateConfig.get(context.getCode());
        for (BusinessProcess step : template.getProcessList()) {
            step.process(context);
            if (Boolean.TRUE.equals(context.getNeedBreak())) {
                return context;
            }
        }
        return context;
    }
    //...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 消息推送接口
/**
 * Request object of the message push interface.
 */
public class SendRequest {

    /**
     * Business type to execute.
     * send: send a message;
     * recall: recall a message.
     */
    private String code;

    /**
     * Message template id.
     * [Required]
     */
    private Long messageTemplateId;

    /**
     * Message-related parameters.
     * Must be supplied when the business type is "send".
     */
    private MessageParam messageParam;

    /**
     * Message ids to recall (a message can be recalled using the messageId
     * returned by the send interface).
     * [Optional]
     */
    private List<String> recallMessageIds;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Message push entry point: wraps the request in a chain context, runs the
 * chain through the process controller and converts the outcome to a response.
 *
 * @param sendRequest the push request
 * @return a bad-parameters response when the request is empty; otherwise a
 *         response built from the status, message and data of the chain's result
 */
public SendResponse send(SendRequest sendRequest) {
    if (ObjectUtils.isEmpty(sendRequest)) {
        RespStatusEnum badParams = RespStatusEnum.CLIENT_BAD_PARAMETERS;
        return new SendResponse(badParams.getCode(), badParams.getMsg(), null);
    }

    // The single request parameter is wrapped in a one-element list for the chain model.
    SendTaskModel taskModel = SendTaskModel.builder()
            .messageTemplateId(sendRequest.getMessageTemplateId())
            .messageParamList(Collections.singletonList(sendRequest.getMessageParam()))
            .build();

    // The context starts as "not broken" with a success response; steps overwrite these on failure.
    ProcessContext context = ProcessContext.builder()
            .code(sendRequest.getCode())
            .processModel(taskModel)
            .needBreak(false)
            .response(BasicResultVO.success())
            .build();

    ProcessContext finished = processController.process(context);
    return new SendResponse(
            finished.getResponse().getStatus(),
            finished.getResponse().getMsg(),
            (List<SimpleTaskInfo>) finished.getResponse().getData());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20