Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • 责任链模式消息校验
  • 参数拼装
  • Kafka异步下发
    • 为什么使用Kafka
    • Kafka为什么能承载这么大的QPS?
    • 消息积压问题?
    • 消费渠道挂了消息堆积怎么处理?
    • 生产者
  • 消息隔离
  • 消息延时推送
  • 持久层
  • 渠道消息下发
  • 应用优雅关闭
  • 消息去重
  • 链路追踪
  • 实时流处理
  • 消息推送平台
Nreal
2024-03-02
目录

Kafka异步下发

# 为什么使用Kafka

消息队列使用目的:异步、解耦、削峰;

消息管理中台对外提供接口给各个业务方调用,调用接口后,不是同步下发消息,而是将消息放到消息队列上,直接返回结果给接口调用者;

好处:接口吞吐量大幅度提高,即使有大批量的消息调用接口都不会受系统影响(流量由消息队列承载,削峰);

# Kafka为什么能承载这么大的QPS?

消息队列一般都是生产—消费者模型,把生产者的数据存储起来,交给各个业务把数据读取出来;

Kafka 存储 读取 过程优化:

存储数据的为一个TOPIC,实际内部是多个Partition(队列)在处理(并行);

存储消息时,Kafka内部是顺序写磁盘的,并且使用了操作系统的缓冲区来提高性能;

读取数据也使用零拷贝 减少CPU拷贝文件的次数;

# 消息积压问题?

增加消费者,增加topic

# 消费渠道挂了消息堆积怎么处理?

  • 如果是时效性消息(验证码),直接丢弃;
  • 否则,临时扩容,将现有topic中的消息写入另一个topic(扩展更多的partition,提高并行性),开更多的消费者去消费者去消费;

# 生产者

@Slf4j
@Service
public class SendMqAction implements BusinessProcess<SendTaskModel> {


    @Autowired
    private SendMqService sendMqService;

    @Value("${austin.business.topic.name}")
    private String sendMessageTopic;

    @Value("${austin.business.tagId.value}")
    private String tagId;

    @Value("${austin.mq.pipeline}")
    private String mqPipeline;

    @Override
    public void process(ProcessContext<SendTaskModel> context) {
        SendTaskModel sendTaskModel = context.getProcessModel();
        List<TaskInfo> taskInfo = sendTaskModel.getTaskInfo();
        try {
            String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName});
            sendMqService.send(sendMessageTopic, message, tagId);

            context.setResponse(BasicResultVO.success(taskInfo.stream().map(v -> SimpleTaskInfo.builder().businessId(v.getBusinessId()).messageId(v.getMessageId()).bizId(v.getBizId()).build()).collect(Collectors.toList())));
        } catch (Exception e) {
            context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
            log.error("send {} fail! e:{},params:{}", mqPipeline, Throwables.getStackTraceAsString(e)
                    , JSON.toJSONString(CollUtil.getFirst(taskInfo.listIterator())));
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
参数拼装
消息隔离

← 参数拼装 消息隔离→

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式