Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • 责任链模式消息校验
  • 参数拼装
  • Kafka异步下发
  • 消息隔离
    • 消费架构
    • 如何构建messageId?
    • 如何构建businessId?
    • 如何构建GroupId?
    • 去重key
    • Kafka与线程池配合
    • 消费者
    • 动态线程池
  • 消息延时推送
  • 持久层
  • 渠道消息下发
  • 应用优雅关闭
  • 消息去重
  • 链路追踪
  • 实时流处理
  • 消息推送平台
Nreal
2023-12-04
目录

消息隔离

# 消费架构

单Topic,多Group,不同 group间消费隔离;

Kafka消费者消费步骤:

  1. 消息丢弃;
  2. 夜间屏蔽;
  3. 去重;
  4. 真正发送;

去重与发送消息都为网络IO密集型,为提高吞吐量,每个group开启一个线程池(缓冲区);

# 如何构建messageId?

消息唯一Id:

public static String generateMessageId() {
    return IdUtil.nanoId();
}
1
2
3

可以再根据TaskInfo中的字段发送渠道(sendChannel)和消息类型(msgType)拼接构成全局唯一消息ID;

# 如何构建businessId?

TaskInfoUtils类中:链路追踪使用

/**
 * 生成BusinessId
 * 模板类型+模板ID+当天日期
 * (固定16位)
 */
public static Long generateBusinessId(Long templateId, Integer templateType) {
    Integer today = Integer.valueOf(DateUtil.format(new Date(), DatePattern.PURE_DATE_PATTERN));
    return Long.valueOf(String.format("%d%s", templateType * TYPE_FLAG + templateId, today));
}
1
2
3
4
5
6
7
8
9

# 如何构建GroupId?

根据TaskInfo的消息渠道和消息类型拼接;

public static String getGroupIdByTaskInfo(TaskInfo taskInfo) {
    String channelCodeEn = EnumUtil.getEnumByCode(taskInfo.getSendChannel(), ChannelType.class).getCodeEn();
    String msgCodeEn = EnumUtil.getEnumByCode(taskInfo.getMsgType(), MessageType.class).getCodeEn();
    return channelCodeEn + "." + msgCodeEn;
}
1
2
3
4
5

# 去重key

从消费队列拉取后根据发送的消息体生成;(线程池中业务逻辑)

# Kafka与线程池配合

如果线程池中有线程可以执行,就执行Kafka拉取的消息,没有线程执行,仍阻塞队列,如果阻塞队列也满了,拉取消息的过程阻塞;

# 消费者

如何根据消息类型,找到不同的group?

消息模板有msgType字段标识当前模板属于哪种类型,根据不同的消息类型划分到对应的group;

可以使用幂等性注解,注解在consume2Send方法上,这个方法相当于是消费者到对应线程池的一个路由;拿到里面参数中的TaskInfo,获取到里面的唯一key;

@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class Receiver {
    @Autowired
    private ConsumeService consumeService;

    /**
     * 发送消息
     *
     * @param consumerRecord
     * @param topicGroupId
     */
    @KafkaListener(topics = "#{'${business.topic.name}'}", containerFactory = "filterContainerFactory")
    public void consumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {
        Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
        if (kafkaMessage.isPresent()) {

            List<TaskInfo> taskInfoLists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class);
            String messageGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));
            /**
             * 每个消费者组 只消费 他们自身关心的消息
             */
            if (topicGroupId.equals(messageGroupId)) {
                consumeService.consume2Send(taskInfoLists);
            }
        }
    }
    
    //.......
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

不可能为每个group写一个消费者,消费者为多例,如何将 各GroupId绑定到消费者上?

groupIdEnhancer方法将每一个Receiver初始化的时候做了动态的切面,拿到对应的@KafkaListener注解,修改其groupId;

public class ReceiverStart {

	//......

/**
     * 获取得到所有的groupId
*/
private static List<String> groupIds = GroupIdMappingUtils.getAllGroupIds();
/**
     * 下标(用于迭代groupIds位置)
*/
private static Integer index = 0;
@Autowired
private ApplicationContext context;
@Autowired
private ConsumerFactory consumerFactory;

/**
     * 给每个Receiver对象的consumer方法 @KafkaListener赋值相应的groupId
*/
@Bean
public static KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer groupIdEnhancer() {
  return (attrs, element) -> {
      if (element instanceof Method) {
          String name = ((Method) element).getDeclaringClass().getSimpleName() + StrPool.DOT + ((Method) element).getName();
          if (RECEIVER_METHOD_NAME.equals(name)) {
              attrs.put("groupId", groupIds.get(index++));
          }
      }
      return attrs;
  };
}

/**
     * 为每个渠道不同的消息类型 创建一个Receiver对象
*/
@PostConstruct
public void init() {
  for (int i = 0; i < groupIds.size(); i++) {
      context.getBean(Receiver.class);
  }
}

//......

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

要给每个消费者创建自己独有的线程池,又不能给每个group都定义一个创建线程池的方法?

如何将各GroupId里的消息路由给各线程池?

@Override
public void consume2Send(List<TaskInfo> taskInfoLists) {
String topicGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));
for (TaskInfo taskInfo : taskInfoLists) {
  logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().bizId(taskInfo.getBizId()).messageId(taskInfo.getMessageId()).ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build());
  Task task = context.getBean(Task.class).setTaskInfo(taskInfo);
  taskPendingHolder.route(topicGroupId).execute(task);
}
}
1
2
3
4
5
6
7
8
9

Task实现了Runnable接口,是线程池中具体执行任务;

public class Task implements Runnable {

    @Autowired
    private HandlerHolder handlerHolder;

    @Autowired
    private DeduplicationRuleService deduplicationRuleService;

    @Autowired
    private DiscardMessageService discardMessageService;

    @Autowired
    private ShieldService shieldService;

    private TaskInfo taskInfo;


    @Override
    public void run() {

        // 0. 丢弃消息
        if (discardMessageService.isDiscard(taskInfo)) {
            return;
        }
        // 1. 屏蔽消息
        shieldService.shield(taskInfo);

        // 2.平台通用去重
        if (CollUtil.isNotEmpty(taskInfo.getReceiver())) {
            deduplicationRuleService.duplication(taskInfo);
        }

        // 3. 真正发送消息
        if (CollUtil.isNotEmpty(taskInfo.getReceiver())) {
            handlerHolder.route(taskInfo.getSendChannel()).doHandler(taskInfo);
        }

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Component
public class TaskPendingHolder {
/**
     * 获取得到所有的groupId
*/
private static List<String> groupIds = GroupIdMappingUtils.getAllGroupIds();
@Autowired
private ThreadPoolUtils threadPoolUtils;
private Map<String, ExecutorService> holder = new HashMap<>(32);

/**
     * 给每个渠道,每种消息类型初始化一个线程池
*/
@PostConstruct
public void init() {
  /**
   采用了动态线程池
   */
  for (String groupId : groupIds) {
      DtpExecutor executor = HandlerThreadPoolConfig.getExecutor(groupId);
      threadPoolUtils.register(executor);

      holder.put(groupId, executor);
  }
}

/**
     * 得到对应的线程池
*/
public ExecutorService route(String groupId) {
  return holder.get(groupId);
}


}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 动态线程池

配置:

public class HandlerThreadPoolConfig {

    private static final String PRE_FIX = "austin.";

    private HandlerThreadPoolConfig() {

    }

    /**
     * 业务:处理某个渠道的某种类型消息的线程池
     * 配置:不丢弃消息,核心线程数不会随着keepAliveTime而减少(不会被回收)
     * 动态线程池且被Spring管理:true
     *
     * @return
     */
    public static DtpExecutor getExecutor(String groupId) {
        return ThreadPoolBuilder.newBuilder()
                .threadPoolName(PRE_FIX + groupId)
                .corePoolSize(ThreadPoolConstant.COMMON_CORE_POOL_SIZE)
                .maximumPoolSize(ThreadPoolConstant.COMMON_MAX_POOL_SIZE)
                .keepAliveTime(ThreadPoolConstant.COMMON_KEEP_LIVE_TIME)
                .timeUnit(TimeUnit.SECONDS)
                .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
                .allowCoreThreadTimeOut(false)
                .workQueue(QueueTypeEnum.VARIABLE_LINKED_BLOCKING_QUEUE.getName(), ThreadPoolConstant.COMMON_QUEUE_SIZE, false)
                .buildDynamic();
    }


}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Kafka异步下发
消息延时推送

← Kafka异步下发 消息延时推送→

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式