实时流处理
# 实时消息存入redis
public class AustinSink implements SinkFunction<AnchorInfo> {
@Override
public void invoke(AnchorInfo anchorInfo, Context context) throws Exception {
realTimeData(anchorInfo);
}
/**
* 实时数据存入Redis
* 1.用户维度(查看用户当天收到消息的链路详情),数量级大,只保留当天
* 2.消息模板维度(查看消息模板整体下发情况),数量级小,保留30天
*/
private void realTimeData(AnchorInfo info) {
try {
LettuceRedisUtils.pipeline(redisAsyncCommands -> {
List<RedisFuture<?>> redisFutures = new ArrayList<>();
/**
* 0.构建messageId维度的链路信息 数据结构list:{key,list}
* key:Austin:MessageId:{messageId},listValue:[{timestamp,state,businessId},{timestamp,state,businessId}]
*/
String redisMessageKey = CharSequenceUtil.join(StrPool.COLON, AustinConstant.CACHE_KEY_PREFIX, AustinConstant.MESSAGE_ID, info.getMessageId());
SimpleAnchorInfo messageAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build();
redisFutures.add(redisAsyncCommands.lpush(redisMessageKey.getBytes(), JSON.toJSONString(messageAnchorInfo).getBytes()));
redisFutures.add(redisAsyncCommands.expire(redisMessageKey.getBytes(), Duration.ofDays(3).toMillis() / 1000));
/**
* 1.构建userId维度的链路信息 数据结构list:{key,list}
* key:userId,listValue:[{timestamp,state,businessId},{timestamp,state,businessId}]
*/
SimpleAnchorInfo userAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build();
for (String id : info.getIds()) {
redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(userAnchorInfo).getBytes()));
redisFutures.add(redisAsyncCommands.expire(id.getBytes(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000));
}
/**
* 2.构建消息模板维度的链路信息 数据结构hash:{key,hash}
* key:businessId,hashValue:{state,stateCount}
*/
redisFutures.add(redisAsyncCommands.hincrby(String.valueOf(info.getBusinessId()).getBytes(),
String.valueOf(info.getState()).getBytes(), info.getIds().size()));
redisFutures.add(redisAsyncCommands.expire(String.valueOf(info.getBusinessId()).getBytes(),
((DateUtil.offsetDay(new Date(), 30).getTime()) / 1000) - DateUtil.currentSeconds()));
return redisFutures;
});
} catch (Exception e) {
log.error("AustinSink#invoke error: {}", Throwables.getStackTraceAsString(e));
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# Flink启动类
@Slf4j
public class AustinBootStrap {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
* 1.获取KafkaConsumer
*/
KafkaSource<String> kafkaConsumer = MessageQueueUtils.getKafkaConsumer(AustinFlinkConstant.TOPIC_NAME, AustinFlinkConstant.GROUP_ID, AustinFlinkConstant.BROKER);
DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), AustinFlinkConstant.SOURCE_NAME);
/**
* 2. 数据转换处理
*/
SingleOutputStreamOperator<AnchorInfo> dataStream = kafkaSource.flatMap(new AustinFlatMapFunction()).name(AustinFlinkConstant.FUNCTION_NAME);
/**
* 3. 将实时数据多维度写入Redis(已实现),离线数据写入hive(未实现)
*/
dataStream.addSink(new AustinSink()).name(AustinFlinkConstant.SINK_NAME);
env.execute(AustinFlinkConstant.JOB_NAME);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27