Kafka异步下发
# 为什么使用Kafka
消息队列使用目的:异步、解耦、削峰;
消息管理中台对外提供接口给各个业务方调用,调用接口后,不是同步下发消息,而是将消息放到消息队列上,直接返回结果给接口调用者;
好处:接口吞吐量大幅度提高,即使有大批量的消息调用接口都不会受系统影响(流量由消息队列承载,削峰);
# Kafka为什么能承载这么大的QPS?
消息队列一般都是生产—消费者模型,把生产者的数据存储起来,交给各个业务把数据读取出来;
Kafka 存储 读取 过程优化:
存储数据的为一个TOPIC,实际内部是多个Partition(队列)在处理(并行);
存储消息时,Kafka内部是顺序写磁盘的,并且使用了操作系统的缓冲区来提高性能;
读取数据也使用零拷贝 减少CPU拷贝文件的次数;
# 消息积压问题?
增加消费者,增加topic
# 消费渠道挂了消息堆积怎么处理?
- 如果是时效性消息(验证码),直接丢弃;
- 否则,临时扩容,将现有topic中的消息写入另一个topic(扩展更多的partition,提高并行性),开更多的消费者去消费者去消费;
# 生产者
@Slf4j
@Service
public class SendMqAction implements BusinessProcess<SendTaskModel> {
@Autowired
private SendMqService sendMqService;
@Value("${austin.business.topic.name}")
private String sendMessageTopic;
@Value("${austin.business.tagId.value}")
private String tagId;
@Value("${austin.mq.pipeline}")
private String mqPipeline;
@Override
public void process(ProcessContext<SendTaskModel> context) {
SendTaskModel sendTaskModel = context.getProcessModel();
List<TaskInfo> taskInfo = sendTaskModel.getTaskInfo();
try {
String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName});
sendMqService.send(sendMessageTopic, message, tagId);
context.setResponse(BasicResultVO.success(taskInfo.stream().map(v -> SimpleTaskInfo.builder().businessId(v.getBusinessId()).messageId(v.getMessageId()).bizId(v.getBizId()).build()).collect(Collectors.toList())));
} catch (Exception e) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
log.error("send {} fail! e:{},params:{}", mqPipeline, Throwables.getStackTraceAsString(e)
, JSON.toJSONString(CollUtil.getFirst(taskInfo.listIterator())));
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34