消息去重
# 去重参数
/**
* Parameters for one de-duplication pass over a task.
* The {@code @JSONField} names ("time", "num") are the JSON property names bound to the annotated fields.
*/
public class DeduplicationParam {
/**
* Task information (TaskInfo) that the de-duplication is applied to.
*/
private TaskInfo taskInfo;
/**
* De-duplication time window.
* Unit: seconds.
*/
@JSONField(name = "time")
private Long deduplicationTime;
/**
* Number of occurrences that must be reached before de-duplication applies.
*/
@JSONField(name = "num")
private Integer countNum;
/**
* Identifies which kind of de-duplication this is (used for tracking / data points).
*/
private AnchorState anchorState;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 获取所有去重key
/**
* Limiting strategy used by the de-duplication services.
*/
public interface LimitService {
/**
* Determines which receivers of the task should be filtered out.
*
* @param service  de-duplication service; implementations in this file use it to build the per-receiver key
* @param taskInfo task whose receivers are checked
* @param param    de-duplication parameters (time window, count threshold)
* @return the receivers to filter; AbstractDeduplicationService removes this set from the task's receivers
*/
Set<String> limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param);
}
1
2
3
4
2
3
4
/**
 * Base class for limit strategies; provides the shared key-building helpers.
 */
public abstract class AbstractLimitService implements LimitService {

    /**
     * Collects the de-duplication key of every receiver of the given task.
     *
     * @param service  de-duplication service that knows how to build a single key
     * @param taskInfo task whose receivers are turned into keys
     * @return one key per receiver, in the iteration order of the receiver collection
     */
    protected List<String> deduplicationAllKey(AbstractDeduplicationService service, TaskInfo taskInfo) {
        List<String> keys = new ArrayList<>(taskInfo.getReceiver().size());
        taskInfo.getReceiver().forEach(
                receiver -> keys.add(deduplicationSingleKey(service, taskInfo, receiver)));
        return keys;
    }

    /**
     * Builds the de-duplication key for one receiver by delegating to the given service.
     *
     * @param service  de-duplication service that defines the key format
     * @param taskInfo task the receiver belongs to
     * @param receiver receiver to build the key for
     * @return the key returned by {@code service.deduplicationSingleKey(taskInfo, receiver)}
     */
    protected String deduplicationSingleKey(AbstractDeduplicationService service, TaskInfo taskInfo, String receiver) {
        return service.deduplicationSingleKey(taskInfo, receiver);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 滑动窗口去重
达到阈值(窗口内已有记录数 >= 阈值)时返回 1;否则记录本次请求并返回 0。
-- Sliding-window limiter backed by a sorted set.
-- Returns 1 when the window already holds at least ARGV[3] entries (nothing is recorded),
-- otherwise records the current entry and returns 0.
-- KEYS[1]: limiter key
-- ARGV[1]: window length, in milliseconds
-- ARGV[2]: current timestamp (used as the score)
-- ARGV[3]: threshold
-- ARGV[4]: unique member value for this score
-- 1. Remove entries that fall before the start of the window
redis.call('zremrangeByScore', KEYS[1], 0, ARGV[2]-ARGV[1])
-- 2. Count the entries currently in the window
local res = redis.call('zcard', KEYS[1])
-- 3. Check whether the threshold has been reached
if (res == nil) or (res < tonumber(ARGV[3])) then
    redis.call('zadd', KEYS[1], ARGV[2], ARGV[4])
    -- PEXPIRE takes the millisecond window as-is; dividing by 1000 for EXPIRE yields a
    -- non-integer (rejected by Redis) whenever the window is not a whole number of seconds.
    redis.call('pexpire', KEYS[1], ARGV[1])
    return 0
else
    return 1
end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
核心:获取符合去重条件的receiver;
/**
 * Sliding-window limit strategy: each receiver key is checked by the "limit.lua" script
 * loaded from the classpath.
 */
@Service(value = "SlideWindowLimitService")
public class SlideWindowLimitService extends AbstractLimitService {

    /** Prefix put in front of every key handled by this strategy. */
    private static final String LIMIT_TAG = "SW_";

    @Autowired
    private RedisUtils redisUtils;

    private DefaultRedisScript<Long> redisScript;

    /**
     * Loads the limiter script once after dependency injection.
     */
    @PostConstruct
    public void init() {
        // Diamond instead of the raw type, so the assignment is checked against DefaultRedisScript<Long>.
        redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Long.class);
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("limit.lua")));
    }

    /**
     * Runs the limiter script once per receiver and collects the receivers that hit the limit.
     *
     * @param service  de-duplication service used to build each receiver's key
     * @param taskInfo task whose receivers are checked
     * @param param    de-duplication parameters; the time window (seconds) is passed to the
     *                 script in milliseconds, the count is passed as the threshold
     * @return the receivers for which the script call returned {@code TRUE}, i.e. the ones to filter out
     */
    @Override
    public Set<String> limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param) {
        Set<String> filterReceiver = new HashSet<>(taskInfo.getReceiver().size());
        // All receivers of one call share the same timestamp, so the score string is built once.
        String score = String.valueOf(System.currentTimeMillis());
        for (String receiver : taskInfo.getReceiver()) {
            String key = LIMIT_TAG + deduplicationSingleKey(service, taskInfo, receiver);
            // Unique member value, so entries with the same score do not overwrite each other in the sorted set.
            String scoreValue = String.valueOf(IdUtil.getSnowflake().nextId());
            final Boolean result = redisUtils.execLimitLua(redisScript, Collections.singletonList(key),
                    String.valueOf(param.getDeduplicationTime() * 1000), score,
                    String.valueOf(param.getCountNum()), scoreValue);
            if (Boolean.TRUE.equals(result)) {
                filterReceiver.add(receiver);
            }
        }
        return filterReceiver;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 去重模板类
去重接口:
/**
* De-duplication entry point.
*/
public interface DeduplicationService {
/**
* Applies de-duplication using the given parameters.
*
* @param param de-duplication parameters, including the task to process
*/
void deduplication(DeduplicationParam param);
}
1
2
3
2
3
去重抽象类:
核心:剔除符合去重条件的用户 + 构建去重key
/**
 * Template for de-duplication services: asks the configured limit service which receivers
 * to drop, removes them from the task and logs the result. Subclasses define the key format.
 */
public abstract class AbstractDeduplicationService implements DeduplicationService {

    /** De-duplication type code under which this service registers itself. */
    protected Integer deduplicationType;

    /** Limit strategy that decides which receivers are filtered. */
    protected LimitService limitService;

    @Autowired
    private DeduplicationHolder deduplicationHolder;

    @Autowired
    private LogUtils logUtils;

    /**
     * Registers this service in the holder under its de-duplication type.
     */
    @PostConstruct
    private void init() {
        deduplicationHolder.putService(deduplicationType, this);
    }

    /**
     * Removes from the task every receiver the limit service reports, then logs those receivers.
     *
     * @param param de-duplication parameters carrying the task and the anchor state used for logging
     */
    @Override
    public void deduplication(DeduplicationParam param) {
        TaskInfo taskInfo = param.getTaskInfo();
        Set<String> hitReceivers = limitService.limitFilter(this, taskInfo, param);

        // Nothing matched the de-duplication condition: leave the task untouched.
        if (!CollUtil.isNotEmpty(hitReceivers)) {
            return;
        }

        // Drop the receivers that matched the de-duplication condition.
        taskInfo.getReceiver().removeAll(hitReceivers);

        AnchorInfo anchorInfo = AnchorInfo.builder()
                .bizId(taskInfo.getBizId())
                .messageId(taskInfo.getMessageId())
                .businessId(taskInfo.getBusinessId())
                .ids(hitReceivers)
                .state(param.getAnchorState().getCode())
                .build();
        logUtils.print(anchorInfo);
    }

    /**
     * Builds the de-duplication key for one receiver of the task.
     *
     * @param taskInfo task the receiver belongs to
     * @param receiver receiver to build the key for
     * @return the de-duplication key
     */
    public abstract String deduplicationSingleKey(TaskInfo taskInfo, String receiver);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 内容去重
5分钟内相同文案发给相同用户去重;
方法参数传接口实现类?
/**
 * Content-based de-duplication, wired to the limit service named "SlideWindowLimitService".
 */
@Service
public class ContentDeduplicationService extends AbstractDeduplicationService {

    /**
     * @param limitService limit strategy qualified as "SlideWindowLimitService"
     */
    @Autowired
    public ContentDeduplicationService(@Qualifier("SlideWindowLimitService") LimitService limitService) {
        this.limitService = limitService;
        this.deduplicationType = DeduplicationType.CONTENT.getCode();
    }

    /**
     * Builds the content de-duplication key.
     * key: md5(templateId + receiver + content), where content is the JSON form of the content model.
     *
     * @param taskInfo task supplying the template id and content model
     * @param receiver receiver the key is built for
     * @return hex MD5 of the concatenated parts
     */
    @Override
    public String deduplicationSingleKey(TaskInfo taskInfo, String receiver) {
        // NOTE(review): the parts are joined without a delimiter, so different
        // (templateId, receiver) pairs could produce the same raw string - confirm this is acceptable.
        StringBuilder rawKey = new StringBuilder();
        rawKey.append(taskInfo.getMessageTemplateId());
        rawKey.append(receiver);
        rawKey.append(JSON.toJSONString(taskInfo.getContentModel()));
        return DigestUtil.md5Hex(rawKey.toString());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 频次去重
一天内一个用户只能收到某个渠道的消息 N 次;
/**
 * Frequency-based de-duplication, wired to the limit service named "SimpleLimitService".
 */
@Service
public class FrequencyDeduplicationService extends AbstractDeduplicationService {

    /** Leading segment of every frequency de-duplication key. */
    private static final String PREFIX = "FRE";

    /**
     * @param limitService limit strategy qualified as "SimpleLimitService"
     */
    @Autowired
    public FrequencyDeduplicationService(@Qualifier("SimpleLimitService") LimitService limitService) {
        this.limitService = limitService;
        this.deduplicationType = DeduplicationType.FREQUENCY.getCode();
    }

    /**
     * Builds the frequency de-duplication key.
     * key: PREFIX, receiver, templateId and sendChannel, joined by the underline separator.
     *
     * @param taskInfo task supplying the template id and send channel
     * @param receiver receiver the key is built for
     * @return the joined key
     */
    @Override
    public String deduplicationSingleKey(TaskInfo taskInfo, String receiver) {
        return new StringBuilder(PREFIX)
                .append(StrPool.C_UNDERLINE)
                .append(receiver)
                .append(StrPool.C_UNDERLINE)
                .append(taskInfo.getMessageTemplateId())
                .append(StrPool.C_UNDERLINE)
                .append(taskInfo.getSendChannel())
                .toString();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26