削峰短链接监控
# 需求背景
海量请求访问短链接时,如果每次访问都直接把监控统计数据写入数据库,会导致数据库负载升高,甚至宕机;
# 使用Redis Stream削峰
# 创建配置
创建 Stream Key
XADD "short_link:stats-stream" * "New key" "New value"
创建消费者组(组名需与配置项 spring.data.redis.channel-topic.short-link-stats-group 的值保持一致)
XGROUP CREATE "short_link:stats-stream" "<消费者组名>" 0-0
1
# 生产者
/**
 * Producer that appends short-link statistics messages to a Redis Stream.
 *
 * <p>The stream key is read from the
 * {@code spring.data.redis.channel-topic.short-link-stats} property.
 */
@Component
@RequiredArgsConstructor
public class ShortLinkStatsSaveProducer {

    /** Redis Stream key the statistics messages are appended to. */
    @Value("${spring.data.redis.channel-topic.short-link-stats}")
    private String topic;

    private final StringRedisTemplate stringRedisTemplate;

    /**
     * Appends one statistics message to the configured stream.
     *
     * @param producerMap field/value pairs that make up the stream entry
     */
    public void send(final Map<String, String> producerMap) {
        this.stringRedisTemplate
                .opsForStream()
                .add(this.topic, producerMap);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 消费者
将原先 shortLinkStats 中的统计入库代码迁移到消费者的 actualSaveShortLinkStats 方法中;
迁移后,shortLinkStats 只负责组装消息并通过生产者发送到 Stream:
/**
 * Packs the access information into a flat string map and hands it to
 * {@code shortLinkStatsSaveProducer}; nothing is persisted here.
 *
 * @param fullShortUrl value stored under the {@code "fullShortUrl"} key
 * @param gid          value stored under the {@code "gid"} key
 * @param statsRecord  record serialized to JSON and stored under the {@code "statsRecord"} key
 */
@Override
public void shortLinkStats(String fullShortUrl, String gid, ShortLinkStatsRecordDTO statsRecord) {
    String serializedRecord = JSON.toJSONString(statsRecord);

    Map<String, String> payload = new HashMap<>();
    payload.put("fullShortUrl", fullShortUrl);
    payload.put("gid", gid);
    payload.put("statsRecord", serializedRecord);

    shortLinkStatsSaveProducer.send(payload);
}
1
2
3
4
5
6
7
8
/**
 * Redis Stream listener that consumes short-link statistics messages and
 * writes them to the statistics tables through the injected mappers.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ShortLinkStatsSaveConsumer implements StreamListener<String, MapRecord<String, String, String>> {

    /** Placeholder stored when the locale lookup yields no usable value. */
    private static final String UNKNOWN = "未知";

    /** {@code infocode} value that this class treats as a successful locale lookup. */
    private static final String LOCALE_SUCCESS_CODE = "10000";

    private final ShortLinkMapper shortLinkMapper;
    private final ShortLinkGotoMapper shortLinkGotoMapper;
    private final RedissonClient redissonClient;
    private final LinkAccessStatsMapper linkAccessStatsMapper;
    private final LinkLocaleStatsMapper linkLocaleStatsMapper;
    private final LinkOsStatsMapper linkOsStatsMapper;
    private final LinkBrowserStatsMapper linkBrowserStatsMapper;
    private final LinkAccessLogsMapper linkAccessLogsMapper;
    private final LinkDeviceStatsMapper linkDeviceStatsMapper;
    private final LinkNetworkStatsMapper linkNetworkStatsMapper;
    private final LinkStatsTodayMapper linkStatsTodayMapper;
    private final DelayShortLinkStatsProducer delayShortLinkStatsProducer;
    private final StringRedisTemplate stringRedisTemplate;

    /** Key sent as the {@code key} parameter of the locale lookup request. */
    @Value("${short-link.stats.locale.amap-key}")
    private String statsLocaleAmapKey;

    /**
     * Handles one stream entry: persists its statistics when the entry carries a
     * non-blank {@code fullShortUrl}, then deletes the entry from the stream.
     *
     * <p>The entry is deleted even when persisting failed, because
     * {@link #actualSaveShortLinkStats} logs and swallows its own failures.
     *
     * @param message stream entry with the fields {@code fullShortUrl}, {@code gid}
     *                and {@code statsRecord} (a JSON string)
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        Map<String, String> producerMap = message.getValue();
        String fullShortUrl = producerMap.get("fullShortUrl");
        // Entries without a short URL carry nothing to persist and are only removed.
        if (StrUtil.isNotBlank(fullShortUrl)) {
            String gid = producerMap.get("gid");
            ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class);
            actualSaveShortLinkStats(fullShortUrl, gid, statsRecord);
        }
        stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
    }

    /**
     * Writes every statistic derived from a single short-link access.
     *
     * <p>The work runs under the read side of the Redisson read/write lock keyed by
     * {@code LOCK_GID_UPDATE_KEY}. If that lock cannot be acquired immediately the
     * record is handed to {@code delayShortLinkStatsProducer} and nothing is written.
     * NOTE(review): presumably the write side is held while a link's gid is being
     * changed - confirm against the code that takes the write lock.
     *
     * <p>Any failure while writing is logged and swallowed; the lock is always released.
     *
     * @param fullShortUrl full short URL; when {@code null} the value from {@code statsRecord} is used
     * @param gid          group id; when blank it is looked up through {@code shortLinkGotoMapper}
     * @param statsRecord  details of the access being recorded
     */
    public void actualSaveShortLinkStats(String fullShortUrl, String gid, ShortLinkStatsRecordDTO statsRecord) {
        fullShortUrl = Optional.ofNullable(fullShortUrl).orElse(statsRecord.getFullShortUrl());
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(String.format(LOCK_GID_UPDATE_KEY, fullShortUrl));
        RLock rLock = readWriteLock.readLock();
        if (!rLock.tryLock()) {
            delayShortLinkStatsProducer.send(statsRecord);
            return;
        }
        try {
            if (StrUtil.isBlank(gid)) {
                LambdaQueryWrapper<ShortLinkGotoDO> queryWrapper = Wrappers.lambdaQuery(ShortLinkGotoDO.class)
                        .eq(ShortLinkGotoDO::getFullShortUrl, fullShortUrl);
                ShortLinkGotoDO shortLinkGotoDO = shortLinkGotoMapper.selectOne(queryWrapper);
                if (shortLinkGotoDO == null) {
                    // Without a gid none of the rows below can be attributed; report it explicitly
                    // instead of letting it surface as an anonymous NullPointerException.
                    log.warn("短链接访问量统计失败,未找到短链接路由记录: {}", fullShortUrl);
                    return;
                }
                gid = shortLinkGotoDO.getGid();
            }

            // One timestamp for the whole record, so the hour, weekday and date columns cannot
            // disagree when processing happens across an hour or day boundary.
            Date now = new Date();
            int hour = DateUtil.hour(now, true);
            Week week = DateUtil.dayOfWeekEnum(now);
            int weekValue = week.getIso8601Value();

            // A missing flag counts as "not a first visit" rather than failing on unboxing.
            int uvIncrement = Boolean.TRUE.equals(statsRecord.getUvFirstFlag()) ? 1 : 0;
            int uipIncrement = Boolean.TRUE.equals(statsRecord.getUipFirstFlag()) ? 1 : 0;

            LinkAccessStatsDO linkAccessStatsDO = LinkAccessStatsDO.builder()
                    .pv(1)
                    .uv(uvIncrement)
                    .uip(uipIncrement)
                    .hour(hour)
                    .weekday(weekValue)
                    .fullShortUrl(fullShortUrl)
                    .gid(gid)
                    .date(now)
                    .build();
            linkAccessStatsMapper.shortLinkStats(linkAccessStatsDO);

            String actualProvince = UNKNOWN;
            String actualCity = UNKNOWN;
            JSONObject localeResultObj = queryLocale(statsRecord.getRemoteAddr());
            if (localeResultObj != null && StrUtil.equals(localeResultObj.getString("infocode"), LOCALE_SUCCESS_CODE)) {
                String province = localeResultObj.getString("province");
                // The code treats a province of "[]" as "location could not be resolved".
                boolean unknownFlag = StrUtil.equals(province, "[]");
                if (!unknownFlag) {
                    actualProvince = province;
                    actualCity = localeResultObj.getString("city");
                }
                LinkLocaleStatsDO linkLocaleStatsDO = LinkLocaleStatsDO.builder()
                        .province(actualProvince)
                        .city(actualCity)
                        .adcode(unknownFlag ? UNKNOWN : localeResultObj.getString("adcode"))
                        .cnt(1)
                        .fullShortUrl(fullShortUrl)
                        .country("中国")
                        .gid(gid)
                        .date(now)
                        .build();
                linkLocaleStatsMapper.shortLinkLocaleState(linkLocaleStatsDO);
            }

            LinkOsStatsDO linkOsStatsDO = LinkOsStatsDO.builder()
                    .os(statsRecord.getOs())
                    .cnt(1)
                    .gid(gid)
                    .fullShortUrl(fullShortUrl)
                    .date(now)
                    .build();
            linkOsStatsMapper.shortLinkOsState(linkOsStatsDO);

            LinkBrowserStatsDO linkBrowserStatsDO = LinkBrowserStatsDO.builder()
                    .browser(statsRecord.getBrowser())
                    .cnt(1)
                    .gid(gid)
                    .fullShortUrl(fullShortUrl)
                    .date(now)
                    .build();
            linkBrowserStatsMapper.shortLinkBrowserState(linkBrowserStatsDO);

            LinkDeviceStatsDO linkDeviceStatsDO = LinkDeviceStatsDO.builder()
                    .device(statsRecord.getDevice())
                    .cnt(1)
                    .gid(gid)
                    .fullShortUrl(fullShortUrl)
                    .date(now)
                    .build();
            linkDeviceStatsMapper.shortLinkDeviceState(linkDeviceStatsDO);

            LinkNetworkStatsDO linkNetworkStatsDO = LinkNetworkStatsDO.builder()
                    .network(statsRecord.getNetwork())
                    .cnt(1)
                    .gid(gid)
                    .fullShortUrl(fullShortUrl)
                    .date(now)
                    .build();
            linkNetworkStatsMapper.shortLinkNetworkState(linkNetworkStatsDO);

            LinkAccessLogsDO linkAccessLogsDO = LinkAccessLogsDO.builder()
                    .user(statsRecord.getUv())
                    .ip(statsRecord.getRemoteAddr())
                    .browser(statsRecord.getBrowser())
                    .os(statsRecord.getOs())
                    .network(statsRecord.getNetwork())
                    .device(statsRecord.getDevice())
                    .locale(StrUtil.join("-", "中国", actualProvince, actualCity))
                    .gid(gid)
                    .fullShortUrl(fullShortUrl)
                    .build();
            linkAccessLogsMapper.insert(linkAccessLogsDO);

            shortLinkMapper.incrementStats(gid, fullShortUrl, 1, uvIncrement, uipIncrement);

            LinkStatsTodayDO linkStatsTodayDO = LinkStatsTodayDO.builder()
                    .todayPv(1)
                    .todayUv(uvIncrement)
                    .todayUip(uipIncrement)
                    .gid(gid)
                    .fullShortUrl(fullShortUrl)
                    .date(now)
                    .build();
            linkStatsTodayMapper.shortLinkTodayState(linkStatsTodayDO);
        } catch (Throwable ex) {
            log.error("短链接访问量统计异常", ex);
        } finally {
            rLock.unlock();
        }
    }

    /**
     * Asks the endpoint at {@code AMAP_REMOTE_URL} for the locale of an IP address.
     *
     * <p>The lookup is best-effort: a failed request or an unparsable body is logged and
     * reported as {@code null}, so the caller records the access with an unknown locale
     * instead of abandoning the statistics that follow.
     *
     * @param remoteAddr IP address sent as the {@code ip} request parameter
     * @return the parsed response, or {@code null} when no usable response was obtained
     */
    private JSONObject queryLocale(String remoteAddr) {
        Map<String, Object> localeParamMap = new HashMap<>();
        localeParamMap.put("key", statsLocaleAmapKey);
        localeParamMap.put("ip", remoteAddr);
        try {
            return JSON.parseObject(HttpUtil.get(AMAP_REMOTE_URL, localeParamMap));
        } catch (RuntimeException ex) {
            log.warn("短链接访问地区解析异常,按未知地区统计", ex);
            return null;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# 配置类
/**
 * Spring configuration that registers {@link ShortLinkStatsSaveConsumer} as the listener
 * of the short-link statistics Redis Stream.
 *
 * <p>The stream key and consumer group name are read from the
 * {@code spring.data.redis.channel-topic.short-link-stats} and
 * {@code spring.data.redis.channel-topic.short-link-stats-group} properties.
 */
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfiguration {

    private final RedisConnectionFactory redisConnectionFactory;
    private final ShortLinkStatsSaveConsumer shortLinkStatsSaveConsumer;

    /** Redis Stream key to consume from. */
    @Value("${spring.data.redis.channel-topic.short-link-stats}")
    private String topic;

    /** Consumer group the listener reads as. */
    @Value("${spring.data.redis.channel-topic.short-link-stats-group}")
    private String group;

    /**
     * Thread pool handed to the stream listener container.
     *
     * <p>Core size is the number of available processors and maximum size is 1.5 times that.
     * Threads are daemon threads named {@code stream_consumer_short-link_stats_<n>}.
     *
     * @return the executor used for polling and dispatching stream messages
     */
    @Bean
    public ExecutorService asyncStreamConsumer() {
        AtomicInteger index = new AtomicInteger();
        int processors = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(processors,
                // '+' binds tighter than '>>': without the parentheses this evaluates to
                // (processors + processors) >> 1 == processors, not the intended 1.5x.
                processors + (processors >> 1),
                60,
                TimeUnit.SECONDS,
                // NOTE(review): per the ThreadPoolExecutor Javadoc an unbounded queue means the
                // pool never grows past its core size, so the maximum above only takes effect
                // if this queue is given a capacity.
                new LinkedBlockingQueue<>(),
                runnable -> {
                    Thread thread = new Thread(runnable);
                    thread.setName("stream_consumer_short-link_stats_" + index.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }
        );
    }

    /**
     * Builds the listener container and subscribes the statistics consumer with auto-acknowledge,
     * reading from the group's last consumed offset as consumer {@code "stats-consumer"}.
     *
     * <p>Spring calls {@code start} after creation and {@code stop} on shutdown.
     *
     * @param asyncStreamConsumer executor that runs the polling task
     * @return the configured listener container
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(ExecutorService asyncStreamConsumer) {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // Maximum number of messages fetched per poll
                        .batchSize(10)
                        // Executor that runs the task pulling messages from the stream
                        .executor(asyncStreamConsumer)
                        // How long to block when no message is available. Must not exceed
                        // ${spring.data.redis.timeout}, otherwise the read times out
                        .pollTimeout(Duration.ofSeconds(3))
                        .build();
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);
        streamMessageListenerContainer.receiveAutoAck(Consumer.from(group, "stats-consumer"),
                StreamOffset.create(topic, ReadOffset.lastConsumed()), shortLinkStatsSaveConsumer);
        return streamMessageListenerContainer;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49