Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • Mq选型
  • Kafka

    • Kafka入门
      • docker中启动kafka
      • 创建topic
      • 发送消息
      • 监听消息
      • SpringBoot整合
        • 生产者
        • 消费者
        • 测试
      • kafka为什么不支持读写分离?
      • 消息回溯
      • Zookeeper作用
      • 消息队列的持久化
      • 消息有序性
      • 消息不丢失
      • 消费失败是否会影响其它消费者?
      • 延时队列
    • Kafka之SpringBoot整合
    • Kafka数据模型
    • Kafka副本机制
  • RocketMq

  • 可靠消息方案架构
  • 消息中间件
  • Kafka
Nreal
2024-01-01
目录

Kafka入门

# docker中启动kafka

默认启动zookeeper

docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.30.131 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
1

# 创建topic

[root@nrStudy ~]# docker exec kafka \
> kafka-topics.sh \
> --create --topic hello \
> --partitions 1 \
> --zookeeper zookeeper:2181 \
> --replication-factor 1
Created topic hello.
1
2
3
4
5
6
7

查看topic

[root@nrStudy ~]# docker exec kafka kafka-topics.sh --list --zookeeper zookeeper:2181 hello
hello
1
2

# 发送消息

docker exec kafka kafka-console-producer.sh --broker-list localhost:9092 --topic hello
1

# 监听消息

docker exec kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning
1

# SpringBoot整合

项目地址:https://github.com/hedsay/Kafka_learning

# 生产者

@Component
public class KafkaProducer {

    private Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public static final String TOPIC_TEST = "hello";

    public static final String TOPIC_GROUP = "test-consumer-group";

    public void send(Object obj) {
        String obj2String = JSON.toJSONString(obj);
        logger.info("准备发送消息为:{}", obj2String);

        // 发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //发送失败的处理
                logger.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的处理
                logger.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 消费者

一个消费者能消费多个partition;

消费者组中消费者数量不能比一个topic中的partition数量多,否则多出来的消费者消费不到消息;

设置消费者组,消费指定topic:

@Component
public class KafkaConsumer {

    private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP)
    public void topicTest(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            logger.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();// 手动提交offset
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

设置消费者组,多topic,指定分区,指定偏移量消费 及 设置消费者个数;

@KafkaListener(groupId = "testGroup", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "100"))}
,concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费数,建议小于等于分区总数
public void listenGroup(ConsumerRecord<String, String> record,Acknowledgment ack) {
    String value = record.value();
    System.out.println(value);
    System.out.println(record);
    //手动提交offset
    ack.acknowledge();
}
1
2
3
4
5
6
7
8
9
10
11

poll过程:

消费者建立与broker之间长连接,开始poll消息;默认一次poll 500条消息,如果两次poll时间超过30s时间间隔,kafka会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。

如果每隔1s内没有poll到任何消息,则继续去poll消息,循环往复,直到poll到消息。如果超出了1s,则此次⻓轮询结束。

kafka会发送 心跳检测,如果超过 10 秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。

# 测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {

    private Logger logger = LoggerFactory.getLogger(KafkaProducerTest.class);

    @Resource
    private KafkaProducer kafkaProducer;

    @Test
    public void test_send() throws InterruptedException {
        // 循环发送消息
        while (true) {
            kafkaProducer.send("hello,Nreal");
            Thread.sleep(3500);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# kafka为什么不支持读写分离?

生产者写入消息、消费者读取消息的操作都是与Leader副本进行交互的,从而实现的是一种主写主读的生产消费模型;

主写从读2个缺点:

  1. 数据不一致;

  2. 延时问题

    Redis主从经历:网络-主节点内存-网络-从节点内存;

    Kafka主从经历:网络-主节点内存-主节点磁盘-网络-从节点内存-从节点磁盘;

# 消息回溯

如果消费者组中的某个消费者挂了,挂掉的消费者所消费的分区可能就由存活的消费者消费,存活的消费者需要知道挂掉的消费者消费到哪了;

Kafka用offset表示消费者的消费进度,之前版本这个offset由Zookeeper来管理,但是Zookeeper不适合大量的删改操作,于是把offset在broker以内部topic(_consumer_offsets)方式保存;

消费者消费的时候,都会提交这个offset;

# Zookeeper作用

  1. Broker注册;

  2. Topic注册;

  3. 负载均衡;

    尽可能将Partition分布到不同的Broker服务器上去;

Kafka 2.8之后,引入了基于Raft协议的KRaft模式,不再依赖Zookeeper;

# 消息队列的持久化

将partition以消息日志的方式存储起来,通过 顺序存储 和 缓存(等到一定时间)才真正把数据写到磁盘,提高速度;

# 消息有序性

  • 单个partition的写入是有顺序的,保证全局有序,只能写入一个partition中,如果消费有序,消费者也只能有一个;
  • 发送消息时,可以指定topic,partition,key,data 4个参数,同一个key的消息可以保证只发送到一个partition;

# 消息不丢失

  1. Producer重试机制;

    send方法是异步的,通过get()方法获取调用结果,如果放松失败就重试;建议写成回调函数形式;

  2. 消费者拿到消息后会提交offset;

    消费者提交offset后,宕机没有消费成功?

    关闭自动提交,每次真正消费完提交offset;但这又出现重复消费问题,消费完还没有提交offset宕机,再消费一次提交offset;消费者需要幂等性处理;

  3. 多副本机制;

    Kafka为partition引入多副本机制,如果leader副本所在的broker宕机,就要从follower副本中重新选一个leader,但是这个follower副本还没有同步完,造成消息丢失?

    设置acks=all,acks是Producer的一个参数;设置为all,表示所有副本全部收到生产者的消息时,生产者才会接收到来自服务器的响应;

    或者设置replication.factor>=3,保证每个分区至少3个副本同步完消息;

# 消费失败是否会影响其它消费者?

消费失败的消息会进retry的topic,不影响后面的消息消费;

# 延时队列

在订单创建成功后如果超过 30 分钟没有付款,则需要取消订单,此时可用延时队列来实现;

  • 创建多个topic,每个topic表示延时的间隔;

    topic_30m: 延时 30 分钟执行的队列

  • 消息发送者发送消息到相应的topic,并带上消息的发送时间;

  • 消费者订阅相应的topic,消费时轮询消费整个topic中的消息;

    1. 如果消息的发送时间,和消费的当前时间没有超过预设的值,则不消费当前的offset及之后的offset的所有消息都消费;
    2. 下次继续消费该offset处的消息,判断时间是否已满足预设值;
Mq选型
Kafka之SpringBoot整合

← Mq选型 Kafka之SpringBoot整合→

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式