Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • 责任链模式消息校验
  • 参数拼装
    • 任务消息
      • 发送消息任务模型
      • TaskInfo
    • 参数拼装
    • 不同渠道的消息模板怎么管理?
  • Kafka异步下发
  • 消息隔离
  • 消息延时推送
  • 持久层
  • 渠道消息下发
  • 应用优雅关闭
  • 消息去重
  • 链路追踪
  • 实时流处理
  • 消息推送平台
Nreal
2023-12-02
目录

参数拼装

# 任务消息

# 发送消息任务模型

public class SendTaskModel implements ProcessModel {

    /**
     * 消息模板Id
     */
    private Long messageTemplateId;

    /**
     * 请求参数
     */
    private List<MessageParam> messageParamList;

    /**
     * 发送任务的信息
     */
    private List<TaskInfo> taskInfo;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# TaskInfo

public class TaskInfo implements Serializable, ProcessModel {

    /**
     * 业务消息发送Id, 用于链路追踪, 若不存在, 则使用 messageId
     */
    private String bizId;

    /**
     * 消息唯一Id(数据追踪使用)
     * 生成逻辑参考 TaskInfoUtils
     */
    private String messageId;

    /**
     * 消息模板Id
     */
    private Long messageTemplateId;

    /**
     * 业务Id(数据追踪使用)
     * 生成逻辑参考 TaskInfoUtils
     */
    private Long businessId;

    /**
     * 接收者
     */
    private Set<String> receiver;

    /**
     * 发送的Id类型
     */
    private Integer idType;

    /**
     * 发送渠道
     */
    private Integer sendChannel;

    /**
     * 模板类型
     */
    private Integer templateType;

    /**
     * 消息类型
     */
    private Integer msgType;

    /**
     * 屏蔽类型
     */
    private Integer shieldType;

    /**
     * 发送文案模型
     * message_template表存储的content是JSON(所有内容都会塞进去)
     * 不同的渠道要发送的内容不一样(比如发push会有img,而短信没有)
     * 所以会有ContentModel
     */
    private ContentModel contentModel;

    /**
     * 发送账号(邮件下可有多个发送账号、短信可有多个发送账号..)
     */
    private Integer sendAccount;


}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

1.为何不直接用消息模板?

TaskInfo基于模板,并添加了平台性字段(messageId,businessId),解析出用户设置的模板而逍遥发送的真实内容;

public class MessageTemplate implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    //......

    /**
     * 发送渠道
     */
    private Integer sendChannel;

    /**
     * 模板类型
     */
    private Integer templateType;

    /**
     * 屏蔽类型
     */
    private Integer shieldType;

    /**
     * 消息类型,营销类型,通知类型
     */
    private Integer msgType;

    /**
     * 推送消息的时间
     * 0:立即发送
     * else:crontab 表达式
     */
    private String expectPushTime;

    /**
     * 消息内容  {$var} 为占位符
     */
    private String msgContent;

    /**
     * 发送账号(邮件下可有多个发送账号、短信可有多个发送账号..)
     */
    private Integer sendAccount;


	//......

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

消息模板中的msgContent字段存入数据库为JSON格式,不同渠道的该字段结构不同:

  • 短信:{"content":"","url":""}
  • 邮件:{"content":"","subTitle":""}
  • Push:{"content":"","subTitle":"","phoneImgUrl":""}
  • 小程序:{"content":"","pagePath":"" .......}

将该字段抽象为ContentModel,不同渠道实现不同的ContentModel;

2.消息唯一id,业务id如何生成?

public class TaskInfoUtils {
    private static final int TYPE_FLAG = 1000000;
    private static final String CODE = "track_code_bid";
    private TaskInfoUtils() {
    }

    /**
     * 生成任务唯一Id
     *
     * @return
     */
    public static String generateMessageId() {
        return IdUtil.nanoId();
    }

    /**
     * 生成BusinessId
     * 模板类型+模板ID+当天日期
     * (固定16位)
     */
    public static Long generateBusinessId(Long templateId, Integer templateType) {
        Integer today = Integer.valueOf(DateUtil.format(new Date(), DatePattern.PURE_DATE_PATTERN));
        return Long.valueOf(String.format("%d%s", templateType * TYPE_FLAG + templateId, today));
    }

    /**
     * 第二到8位为MessageTemplateId 切割出模板ID
     */
    public static Long getMessageTemplateIdFromBusinessId(Long businessId) {
        return Long.valueOf(String.valueOf(businessId).substring(1, 8));
    }

    /**
     * 从businessId切割出日期
     */
    public static Long getDateFromBusinessId(Long businessId) {
        return Long.valueOf(String.valueOf(businessId).substring(8));
    }


    /**
     * 对url添加平台参数(用于追踪数据)
     */
    public static String generateUrl(String url, Long templateId, Integer templateType) {
        url = url.trim();
        Long businessId = generateBusinessId(templateId, templateType);
        if (url.indexOf(CommonConstant.QM) == -1) {
            return url + CommonConstant.QM_STRING + CODE + CommonConstant.EQUAL_STRING + businessId;
        } else {
            return url + CommonConstant.AND_STRING + CODE + CommonConstant.EQUAL_STRING + businessId;
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

# 参数拼装

责任链设计模型中的拼装参数执行器:

public class SendAssembleAction implements BusinessProcess<SendTaskModel> {

    //...
    @Override
    public void process(ProcessContext<SendTaskModel> context) {
        SendTaskModel sendTaskModel = context.getProcessModel();
        Long messageTemplateId = sendTaskModel.getMessageTemplateId();

        try {
            Optional<MessageTemplate> messageTemplate = messageTemplateDao.findById(messageTemplateId);
            if (!messageTemplate.isPresent() || messageTemplate.get().getIsDeleted().equals(CommonConstant.TRUE)) {
                context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.TEMPLATE_NOT_FOUND));
                return;
            }
            //组装 TaskInfo 任务消息
            List<TaskInfo> taskInfos = assembleTaskInfo(sendTaskModel, messageTemplate.get());
            sendTaskModel.setTaskInfo(taskInfos);
        } catch (Exception e) {
            context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
        }

    }
    //...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

组装TaskInfo时候利用反射进行映射,替换占位符:

/**
 * 获取 contentModel,替换模板msgContent中占位符信息
 */
private static ContentModel getContentModelValue(MessageTemplate messageTemplate, MessageParam messageParam) {

    // 得到真正的ContentModel 类型
    Integer sendChannel = messageTemplate.getSendChannel();
    Class<? extends ContentModel> contentModelClass = ChannelType.getChanelModelClassByCode(sendChannel);

    // 得到模板的 msgContent 和 入参
    Map<String, String> variables = messageParam.getVariables();
    JSONObject jsonObject = JSON.parseObject(messageTemplate.getMsgContent());


    // 通过反射 组装出 contentModel
    Field[] fields = ReflectUtil.getFields(contentModelClass);
    ContentModel contentModel = ReflectUtil.newInstance(contentModelClass);
    for (Field field : fields) {
        String originValue = jsonObject.getString(field.getName());

        if (CharSequenceUtil.isNotBlank(originValue)) {
            String resultValue = ContentHolderUtil.replacePlaceHolder(originValue, variables);
            Object resultObj = JSONUtil.isJsonObj(resultValue) ? JSONUtil.toBean(resultValue, field.getType()) : resultValue;
            ReflectUtil.setFieldValue(contentModel, field, resultObj);
        }
    }
    return contentModel;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

TaskInfo用ContentModel存储内容模型,在序列化JSON时需要把“类信息”写进去,不然反序列的时候拿不到子类的数据:

public class SendMqAction implements BusinessProcess<SendTaskModel> {
	// ......
    @Override
    public void process(ProcessContext<SendTaskModel> context) {
        SendTaskModel sendTaskModel = context.getProcessModel();
        List<TaskInfo> taskInfo = sendTaskModel.getTaskInfo();
        try {
            //反序列化将子类信息写进去
            String message = JSON.toJSONString(sendTaskModel.getTaskInfo(),
                    new SerializerFeature[]{SerializerFeature.WriteClassName});
            sendMqService.send(sendMessageTopic, message, tagId);
        } catch (Exception e) {
			//......
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 不同渠道的消息模板怎么管理?

把所有发送消息的元信息固化到一个模板里;

责任链模式消息校验
Kafka异步下发

← 责任链模式消息校验 Kafka异步下发→

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式