消费队列幂等性
# 整体逻辑
根据redis消息去重表,本质上是redis的setnx ex命令;
查询消息是否消费过,去redis中查询,如果没有消费过执行消费逻辑,消费过则写入一个预占标识,重复消息消费的时候发现有预占标识就直接返回;
消费消息的时候会捕捉异常,如果消费失败会删除这个预占标识,但是会有特殊情况,比如消费失败了,还没有删除预占标识,宕机了,重新消费的时候因为有预占标识就不会消费!
解决方法:增加一个过期时间,用两个状态位,0表示消费过程中,1表示消费完成;
过期时间设置为10分钟,消费失败会阶梯型的时间间隔发送消息重新消费,每次消息真正消费完成时,将状态为设置为1,重复消息过来时,先判断它是否存在预占标识,再判断状态为是否为1,不为1说明正在消费中,抛出异常;
# 监控消费者幂等性
幂等性处理器:
@Component
@RequiredArgsConstructor
public class MessageQueueIdempotentHandler {
private final StringRedisTemplate stringRedisTemplate;
private static final String IDEMPOTENT_KEY_PREFIX = "short-link:idempotent:";
/**
* 判断当前消息是否消费过
* @param messageId 消息唯一标识
* @return 消息是否消费过
*/
public boolean isMessageProcessed(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
return Boolean.TRUE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, "0", 2, TimeUnit.MINUTES));
}
/**
* 判断消息消费流程是否执行完成
* @return 消息是否执行完成
*/
public boolean isAccomplish(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
return Objects.equals(stringRedisTemplate.opsForValue().get(key), "1");
}
/**
* 设置消息流程执行完成
*/
public void setAccomplish(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
stringRedisTemplate.opsForValue().set(key, "1", 2, TimeUnit.MINUTES);
}
/**
* 如果消息处理遇到异常情况,删除幂等标识
*/
public void delMessageProcessed(String messageId) {
String key = IDEMPOTENT_KEY_PREFIX + messageId;
stringRedisTemplate.delete(key);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
ShortLinkStatsSaveConsumer:
@Override
public void onMessage(MapRecord<String, String, String> message) {
String stream = message.getStream();
RecordId id = message.getId();
if (!messageQueueIdempotentHandler.isMessageProcessed(id.toString())) {
// 判断当前的这个消息流程是否执行完成
if (messageQueueIdempotentHandler.isAccomplish(id.toString())) {
return;
}
throw new ServiceException("消息未完成流程,需要消息队列重试");
}
try {
Map<String, String> producerMap = message.getValue();
String fullShortUrl = producerMap.get("fullShortUrl");
if (StrUtil.isNotBlank(fullShortUrl)) {
String gid = producerMap.get("gid");
ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class);
actualSaveShortLinkStats(fullShortUrl, gid, statsRecord);
}
stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
} catch (Throwable ex) {
// 某某某情况宕机了
messageQueueIdempotentHandler.delMessageProcessed(id.toString());
log.error("记录短链接监控消费异常", ex);
}
messageQueueIdempotentHandler.setAccomplish(id.toString());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 延迟队列监控消费者幂等性
ShortLinkStatsRecordDTO
中添加消息队列唯一标识;
生产者中设置唯一标识的值:
@Component
@RequiredArgsConstructor
public class DelayShortLinkStatsProducer {
private final RedissonClient redissonClient;
/**
* 发送延迟消费短链接统计
* @param statsRecord 短链接统计实体参数
*/
public void send(ShortLinkStatsRecordDTO statsRecord) {
statsRecord.setKeys(UUID.fastUUID().toString());
RBlockingDeque<ShortLinkStatsRecordDTO> blockingDeque = redissonClient.getBlockingDeque(DELAY_QUEUE_STATS_KEY);
RDelayedQueue<ShortLinkStatsRecordDTO> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.offer(statsRecord, 5, TimeUnit.SECONDS);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
消费者:
public void onMessage() {
Executors.newSingleThreadExecutor(
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("delay_short-link_stats_consumer");
thread.setDaemon(Boolean.TRUE);
return thread;
})
.execute(() -> {
RBlockingDeque<ShortLinkStatsRecordDTO> blockingDeque = redissonClient.getBlockingDeque(DELAY_QUEUE_STATS_KEY);
RDelayedQueue<ShortLinkStatsRecordDTO> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
for (; ; ) {
try {
ShortLinkStatsRecordDTO statsRecord = delayedQueue.poll();
if (statsRecord != null) {
if (!messageQueueIdempotentHandler.isMessageProcessed(statsRecord.getKeys())) {
// 判断当前的这个消息流程是否执行完成
if (messageQueueIdempotentHandler.isAccomplish(statsRecord.getKeys())) {
return;
}
throw new ServiceException("消息未完成流程,需要消息队列重试");
}
try {
shortLinkService.shortLinkStats(null, null, statsRecord);
} catch (Throwable ex) {
messageQueueIdempotentHandler.delMessageProcessed(statsRecord.getKeys());
log.error("延迟记录短链接监控消费异常", ex);
}
messageQueueIdempotentHandler.setAccomplish(statsRecord.getKeys());
continue;
}
LockSupport.parkUntil(500);
} catch (Throwable ignored) {
}
}
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37