Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • 责任链模式消息校验
  • 参数拼装
  • Kafka异步下发
  • 消息隔离
  • 消息延时推送
  • 持久层
  • 渠道消息下发
  • 应用优雅关闭
  • 消息去重
  • 链路追踪
  • 实时流处理
    • 实时消息存入redis
    • Flink启动类
  • 消息推送平台
Nreal
2024-04-06
目录

实时流处理

# 实时消息存入redis

public class AustinSink implements SinkFunction<AnchorInfo> {

    @Override
    public void invoke(AnchorInfo anchorInfo, Context context) throws Exception {
        realTimeData(anchorInfo);
    }

    /**
     * 实时数据存入Redis
     * 1.用户维度(查看用户当天收到消息的链路详情),数量级大,只保留当天
     * 2.消息模板维度(查看消息模板整体下发情况),数量级小,保留30天
     */
    private void realTimeData(AnchorInfo info) {
        try {
            LettuceRedisUtils.pipeline(redisAsyncCommands -> {
                List<RedisFuture<?>> redisFutures = new ArrayList<>();

                /**
                 * 0.构建messageId维度的链路信息 数据结构list:{key,list}
                 * key:Austin:MessageId:{messageId},listValue:[{timestamp,state,businessId},{timestamp,state,businessId}]
                 */
                String redisMessageKey = CharSequenceUtil.join(StrPool.COLON, AustinConstant.CACHE_KEY_PREFIX, AustinConstant.MESSAGE_ID, info.getMessageId());
                SimpleAnchorInfo messageAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build();
                redisFutures.add(redisAsyncCommands.lpush(redisMessageKey.getBytes(), JSON.toJSONString(messageAnchorInfo).getBytes()));
                redisFutures.add(redisAsyncCommands.expire(redisMessageKey.getBytes(), Duration.ofDays(3).toMillis() / 1000));

                /**
                 * 1.构建userId维度的链路信息 数据结构list:{key,list}
                 * key:userId,listValue:[{timestamp,state,businessId},{timestamp,state,businessId}]
                 */
                SimpleAnchorInfo userAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build();
                for (String id : info.getIds()) {
                    redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(userAnchorInfo).getBytes()));
                    redisFutures.add(redisAsyncCommands.expire(id.getBytes(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000));
                }

                /**
                 * 2.构建消息模板维度的链路信息 数据结构hash:{key,hash}
                 * key:businessId,hashValue:{state,stateCount}
                 */
                redisFutures.add(redisAsyncCommands.hincrby(String.valueOf(info.getBusinessId()).getBytes(),
                        String.valueOf(info.getState()).getBytes(), info.getIds().size()));
                redisFutures.add(redisAsyncCommands.expire(String.valueOf(info.getBusinessId()).getBytes(),
                        ((DateUtil.offsetDay(new Date(), 30).getTime()) / 1000) - DateUtil.currentSeconds()));

                return redisFutures;
            });

        } catch (Exception e) {
            log.error("AustinSink#invoke error: {}", Throwables.getStackTraceAsString(e));
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# Flink启动类

@Slf4j
public class AustinBootStrap {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         * 1.获取KafkaConsumer
         */
        KafkaSource<String> kafkaConsumer = MessageQueueUtils.getKafkaConsumer(AustinFlinkConstant.TOPIC_NAME, AustinFlinkConstant.GROUP_ID, AustinFlinkConstant.BROKER);
        DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), AustinFlinkConstant.SOURCE_NAME);


        /**
         * 2. 数据转换处理
         */
        SingleOutputStreamOperator<AnchorInfo> dataStream = kafkaSource.flatMap(new AustinFlatMapFunction()).name(AustinFlinkConstant.FUNCTION_NAME);

        /**
         * 3. 将实时数据多维度写入Redis(已实现),离线数据写入hive(未实现)
         */
        dataStream.addSink(new AustinSink()).name(AustinFlinkConstant.SINK_NAME);
        env.execute(AustinFlinkConstant.JOB_NAME);

    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
链路追踪

← 链路追踪

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式