Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • 如何设计一个短链系统
  • 新Get的开发技巧
  • 项目通用工具

  • 用户模块

  • 短链模块

    • 短链接分组

    • 短链接管理

    • 回收站管理

    • 短链接监控

      • 持久层
      • 基础访问监控
      • 单个短链访问监控详情
      • 访问记录监控
      • 短链接记录变更分组
      • 削峰短链接监控
      • 消费队列幂等性
        • 整体逻辑
        • 监控消费者幂等性
        • 延迟队列监控消费者幂等性
  • 流量风控
  • 如何获取用户IP?
  • SaaS短链接系统
  • 短链模块
  • 短链接监控
Nreal
2024-02-20
目录

消费队列幂等性

# 整体逻辑

根据redis消息去重表,本质上是redis的setnx ex命令;

查询消息是否消费过,去redis中查询,如果没有消费过执行消费逻辑,消费过则写入一个预占标识,重复消息消费的时候发现有预占标识就直接返回;

消费消息的时候会捕捉异常,如果消费失败会删除这个预占标识,但是会有特殊情况,比如消费失败了,还没有删除预占标识,宕机了,重新消费的时候因为有预占标识就不会消费!

解决方法:增加一个过期时间,用两个状态位,0表示消费过程中,1表示消费完成;

过期时间设置为10分钟,消费失败会阶梯型的时间间隔发送消息重新消费,每次消息真正消费完成时,将状态为设置为1,重复消息过来时,先判断它是否存在预占标识,再判断状态为是否为1,不为1说明正在消费中,抛出异常;

# 监控消费者幂等性

幂等性处理器:

@Component
@RequiredArgsConstructor
public class MessageQueueIdempotentHandler {

    private final StringRedisTemplate stringRedisTemplate;

    private static final String IDEMPOTENT_KEY_PREFIX = "short-link:idempotent:";

    /**
     * 判断当前消息是否消费过
     * @param messageId 消息唯一标识
     * @return 消息是否消费过
     */
    public boolean isMessageProcessed(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        return Boolean.TRUE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, "0", 2, TimeUnit.MINUTES));
    }

    /**
     * 判断消息消费流程是否执行完成
     * @return 消息是否执行完成
     */
    public boolean isAccomplish(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        return Objects.equals(stringRedisTemplate.opsForValue().get(key), "1");
    }

    /**
     * 设置消息流程执行完成
     */
    public void setAccomplish(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        stringRedisTemplate.opsForValue().set(key, "1", 2, TimeUnit.MINUTES);
    }

    /**
     * 如果消息处理遇到异常情况,删除幂等标识
     */
    public void delMessageProcessed(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        stringRedisTemplate.delete(key);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

ShortLinkStatsSaveConsumer:

@Override
public void onMessage(MapRecord<String, String, String> message) {
    String stream = message.getStream();
    RecordId id = message.getId();
    if (!messageQueueIdempotentHandler.isMessageProcessed(id.toString())) {
        // 判断当前的这个消息流程是否执行完成
        if (messageQueueIdempotentHandler.isAccomplish(id.toString())) {
            return;
        }
        throw new ServiceException("消息未完成流程,需要消息队列重试");
    }
    try {
        Map<String, String> producerMap = message.getValue();
        String fullShortUrl = producerMap.get("fullShortUrl");
        if (StrUtil.isNotBlank(fullShortUrl)) {
            String gid = producerMap.get("gid");
            ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class);
            actualSaveShortLinkStats(fullShortUrl, gid, statsRecord);
        }
        stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
    } catch (Throwable ex) {
        // 某某某情况宕机了
        messageQueueIdempotentHandler.delMessageProcessed(id.toString());
        log.error("记录短链接监控消费异常", ex);
    }
    messageQueueIdempotentHandler.setAccomplish(id.toString());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 延迟队列监控消费者幂等性

ShortLinkStatsRecordDTO中添加消息队列唯一标识;

生产者中设置唯一标识的值:

@Component
@RequiredArgsConstructor
public class DelayShortLinkStatsProducer {

    private final RedissonClient redissonClient;

    /**
     * 发送延迟消费短链接统计
     * @param statsRecord 短链接统计实体参数
     */
    public void send(ShortLinkStatsRecordDTO statsRecord) {
        statsRecord.setKeys(UUID.fastUUID().toString());
        RBlockingDeque<ShortLinkStatsRecordDTO> blockingDeque = redissonClient.getBlockingDeque(DELAY_QUEUE_STATS_KEY);
        RDelayedQueue<ShortLinkStatsRecordDTO> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        delayedQueue.offer(statsRecord, 5, TimeUnit.SECONDS);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

消费者:

public void onMessage() {
    Executors.newSingleThreadExecutor(
                    runnable -> {
                        Thread thread = new Thread(runnable);
                        thread.setName("delay_short-link_stats_consumer");
                        thread.setDaemon(Boolean.TRUE);
                        return thread;
                    })
            .execute(() -> {
                RBlockingDeque<ShortLinkStatsRecordDTO> blockingDeque = redissonClient.getBlockingDeque(DELAY_QUEUE_STATS_KEY);
                RDelayedQueue<ShortLinkStatsRecordDTO> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
                for (; ; ) {
                    try {
                        ShortLinkStatsRecordDTO statsRecord = delayedQueue.poll();
                        if (statsRecord != null) {
                            if (!messageQueueIdempotentHandler.isMessageProcessed(statsRecord.getKeys())) {
                                // 判断当前的这个消息流程是否执行完成
                                if (messageQueueIdempotentHandler.isAccomplish(statsRecord.getKeys())) {
                                    return;
                                }
                                throw new ServiceException("消息未完成流程,需要消息队列重试");
                            }
                            try {
                                shortLinkService.shortLinkStats(null, null, statsRecord);
                            } catch (Throwable ex) {
                                messageQueueIdempotentHandler.delMessageProcessed(statsRecord.getKeys());
                                log.error("延迟记录短链接监控消费异常", ex);
                            }
                            messageQueueIdempotentHandler.setAccomplish(statsRecord.getKeys());
                            continue;
                        }
                        LockSupport.parkUntil(500);
                    } catch (Throwable ignored) {
                    }
                }
            });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
削峰短链接监控
流量风控

← 削峰短链接监控 流量风控→

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式