消息隔离
# 消费架构
单Topic,多Group,不同 group间消费隔离;
Kafka消费者消费步骤:
- 消息丢弃;
- 夜间屏蔽;
- 去重;
- 真正发送;
去重与发送消息都为网络IO密集型,为提高吞吐量,每个group开启一个线程池(缓冲区);
# 如何构建messageId?
消息唯一Id:
public static String generateMessageId() { return IdUtil.nanoId(); }
1
2
3可以再根据TaskInfo中的字段发送渠道(sendChannel)和消息类型(msgType)拼接构成全局唯一消息ID;
# 如何构建businessId?
TaskInfoUtils类中:链路追踪使用
/** * 生成BusinessId * 模板类型+模板ID+当天日期 * (固定16位) */ public static Long generateBusinessId(Long templateId, Integer templateType) { Integer today = Integer.valueOf(DateUtil.format(new Date(), DatePattern.PURE_DATE_PATTERN)); return Long.valueOf(String.format("%d%s", templateType * TYPE_FLAG + templateId, today)); }
1
2
3
4
5
6
7
8
9
# 如何构建GroupId?
根据TaskInfo的消息渠道和消息类型拼接;
public static String getGroupIdByTaskInfo(TaskInfo taskInfo) { String channelCodeEn = EnumUtil.getEnumByCode(taskInfo.getSendChannel(), ChannelType.class).getCodeEn(); String msgCodeEn = EnumUtil.getEnumByCode(taskInfo.getMsgType(), MessageType.class).getCodeEn(); return channelCodeEn + "." + msgCodeEn; }
1
2
3
4
5
# 去重key
从消费队列拉取后根据发送的消息体生成;(线程池中业务逻辑)
# Kafka与线程池配合
如果线程池中有线程可以执行,就执行Kafka拉取的消息,没有线程执行,仍阻塞队列,如果阻塞队列也满了,拉取消息的过程阻塞;
# 消费者
如何根据消息类型,找到不同的group?
消息模板有msgType字段标识当前模板属于哪种类型,根据不同的消息类型划分到对应的group;
可以使用幂等性注解,注解在consume2Send方法上,这个方法相当于是消费者到对应线程池的一个路由;拿到里面参数中的TaskInfo,获取到里面的唯一key;
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class Receiver {
@Autowired
private ConsumeService consumeService;
/**
* 发送消息
*
* @param consumerRecord
* @param topicGroupId
*/
@KafkaListener(topics = "#{'${business.topic.name}'}", containerFactory = "filterContainerFactory")
public void consumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {
Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
List<TaskInfo> taskInfoLists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class);
String messageGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));
/**
* 每个消费者组 只消费 他们自身关心的消息
*/
if (topicGroupId.equals(messageGroupId)) {
consumeService.consume2Send(taskInfoLists);
}
}
}
//.......
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
不可能为每个group写一个消费者,消费者为多例,如何将 各GroupId绑定到消费者上?
groupIdEnhancer方法将每一个Receiver初始化的时候做了动态的切面,拿到对应的@KafkaListener注解,修改其groupId;
public class ReceiverStart { //...... /** * 获取得到所有的groupId */ private static List<String> groupIds = GroupIdMappingUtils.getAllGroupIds(); /** * 下标(用于迭代groupIds位置) */ private static Integer index = 0; @Autowired private ApplicationContext context; @Autowired private ConsumerFactory consumerFactory; /** * 给每个Receiver对象的consumer方法 @KafkaListener赋值相应的groupId */ @Bean public static KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer groupIdEnhancer() { return (attrs, element) -> { if (element instanceof Method) { String name = ((Method) element).getDeclaringClass().getSimpleName() + StrPool.DOT + ((Method) element).getName(); if (RECEIVER_METHOD_NAME.equals(name)) { attrs.put("groupId", groupIds.get(index++)); } } return attrs; }; } /** * 为每个渠道不同的消息类型 创建一个Receiver对象 */ @PostConstruct public void init() { for (int i = 0; i < groupIds.size(); i++) { context.getBean(Receiver.class); } } //...... }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46要给每个消费者创建自己独有的线程池,又不能给每个group都定义一个创建线程池的方法?
如何将各GroupId里的消息路由给各线程池?
@Override public void consume2Send(List<TaskInfo> taskInfoLists) { String topicGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator())); for (TaskInfo taskInfo : taskInfoLists) { logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().bizId(taskInfo.getBizId()).messageId(taskInfo.getMessageId()).ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build()); Task task = context.getBean(Task.class).setTaskInfo(taskInfo); taskPendingHolder.route(topicGroupId).execute(task); } }
1
2
3
4
5
6
7
8
9Task实现了Runnable接口,是线程池中具体执行任务;
public class Task implements Runnable { @Autowired private HandlerHolder handlerHolder; @Autowired private DeduplicationRuleService deduplicationRuleService; @Autowired private DiscardMessageService discardMessageService; @Autowired private ShieldService shieldService; private TaskInfo taskInfo; @Override public void run() { // 0. 丢弃消息 if (discardMessageService.isDiscard(taskInfo)) { return; } // 1. 屏蔽消息 shieldService.shield(taskInfo); // 2.平台通用去重 if (CollUtil.isNotEmpty(taskInfo.getReceiver())) { deduplicationRuleService.duplication(taskInfo); } // 3. 真正发送消息 if (CollUtil.isNotEmpty(taskInfo.getReceiver())) { handlerHolder.route(taskInfo.getSendChannel()).doHandler(taskInfo); } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39@Component public class TaskPendingHolder { /** * 获取得到所有的groupId */ private static List<String> groupIds = GroupIdMappingUtils.getAllGroupIds(); @Autowired private ThreadPoolUtils threadPoolUtils; private Map<String, ExecutorService> holder = new HashMap<>(32); /** * 给每个渠道,每种消息类型初始化一个线程池 */ @PostConstruct public void init() { /** 采用了动态线程池 */ for (String groupId : groupIds) { DtpExecutor executor = HandlerThreadPoolConfig.getExecutor(groupId); threadPoolUtils.register(executor); holder.put(groupId, executor); } } /** * 得到对应的线程池 */ public ExecutorService route(String groupId) { return holder.get(groupId); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 动态线程池
配置:
public class HandlerThreadPoolConfig {
private static final String PRE_FIX = "austin.";
private HandlerThreadPoolConfig() {
}
/**
* 业务:处理某个渠道的某种类型消息的线程池
* 配置:不丢弃消息,核心线程数不会随着keepAliveTime而减少(不会被回收)
* 动态线程池且被Spring管理:true
*
* @return
*/
public static DtpExecutor getExecutor(String groupId) {
return ThreadPoolBuilder.newBuilder()
.threadPoolName(PRE_FIX + groupId)
.corePoolSize(ThreadPoolConstant.COMMON_CORE_POOL_SIZE)
.maximumPoolSize(ThreadPoolConstant.COMMON_MAX_POOL_SIZE)
.keepAliveTime(ThreadPoolConstant.COMMON_KEEP_LIVE_TIME)
.timeUnit(TimeUnit.SECONDS)
.rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
.allowCoreThreadTimeOut(false)
.workQueue(QueueTypeEnum.VARIABLE_LINKED_BLOCKING_QUEUE.getName(), ThreadPoolConstant.COMMON_QUEUE_SIZE, false)
.buildDynamic();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30