Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • Mq选型
  • Kafka

    • Kafka入门
    • Kafka之SpringBoot整合
      • 项目搭建
      • 获取发送结果
      • Kafka事务
      • 监听多个topic
      • 指定发送主题
      • 消息应答
      • 消息丢失
        • 偏移量
        • 模拟
        • 自动提交出现消息丢失的原因
        • 手动提交
      • 消息确认
      • 动态创建消费者
      • 动态删除Topic
      • 其它
    • Kafka数据模型
    • Kafka副本机制
  • RocketMq

  • 可靠消息方案架构
  • 消息中间件
  • Kafka
Nreal
2024-03-12
目录

Kafka之SpringBoot整合

# 项目搭建

依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
1
2
3
4

配置

spring:
  kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092
      producer:
            retries: 3 # 设置大于 0 的值,则客户端会将发送失败的记录重新发送
            batch-size: 16384
            buffer-memory: 33554432
            acks: 1
            # 指定消息key和消息体的编解码方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
            group-id: default-group
            enable-auto-commit: false
            auto-offset-reset: earliest
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            max-poll-records: 500
        listener:
        # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
        # RECORD
        # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
        # BATCH
        # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
        # TIME
        # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
        # COUNT
        # TIME | COUNT 有一个条件满足时提交
        # COUNT_TIME
        # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
        # MANUAL
        # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
        # MANUAL_IMMEDIATE
            ack-mode: MANUAL_IMMEDIATE
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

demo

@KafkaListener指定消费者ID以及监听的 topic

@Slf4j
@RestController
public class HelloController {

    private static final String topic = "test";

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    // 接收消息
    @KafkaListener(id = "helloGroup", topics = topic)
    public void listen(String msg) {
        log.info("hello receive value: {}" , msg);
    }

    @GetMapping("/hello")
    public String hello() {
        // 发送消息
        kafkaTemplate.send(topic, "hello kafka");
        return "hello";
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 获取发送结果

同步获取:

@GetMapping("/hello")
public String hello() {
    // 同步获取结果
    ListenableFuture<SendResult<Object,Object>> future = kafkaTemplate.send(topic,"hello");
    try {
        SendResult<Object,Object> result = future.get();
        log.info(result.getRecordMetadata().topic()); 
    }catch (Throwable e){
        e.printStackTrace();
    }

    return "hello";
}
1
2
3
4
5
6
7
8
9
10
11
12
13

异步获取:

@GetMapping("/hello")
public String hello() {
    // 发送消息 - 异步获取通知结果
    kafkaTemplate.send(topic, "hello").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
        @Override
        public void onFailure(Throwable throwable) {
            log.error("fail >>>>{}", throwable.getMessage());
        }

        @Override
        public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
            log.info("async success >>> {}", objectObjectSendResult.getRecordMetadata().topic());
        }
    });

    return "hello";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# Kafka事务

如果第一条消息发送成功,再发第二条消息的时候出现异常,那么就会抛出异常并回滚第一条消息;

默认情况,Spring-kafka自动生成的KafkaTemplate实例,是不具有事务消息发送能力的,需要添加transaction-id-prefix来激活它:

spring:
  kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092
      transaction-id-prefix: kafka_.
1
2
3
4
5

方法1:

@GetMapping("/hello")
public String hello() {
    kafkaTemplate.executeInTransaction(t -> {
        t.send("hello","msg1");
        if(true)
            throw new RuntimeException("failed");
        t.send("hello","msg2");
        return true;
    });

    return "hello";
}

// 接收消息
@KafkaListener(id = "helloGroup", topics = "hello")
public void listen(String msg) {
    log.info("receive value: {}" , msg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

方法2:

// 注解方式
@Transactional(rollbackFor = RuntimeException.class)
@GetMapping("/hello")
public String hello() {
    kafkaTemplate.send("hello","msg1");
    if(true)
        throw new RuntimeException("failed");
    kafkaTemplate.send("hello","msg2");
    return "hello";
}
1
2
3
4
5
6
7
8
9
10

# 监听多个topic

@KafkaListener 注解可以添加到类级别或方法级别:

  • 在类级别添加注解,将指定默认的 Topic和消费者组 ID;
  • 在方法级别添加注解,则可以使用不同的 Topic 和消费者组 ID。

监听多个topic:

@Slf4j
@RestController
public class ListenerController {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/hello")
    public String hello() {
        // 发送消息
        kafkaTemplate.send("topic1", "1");
        kafkaTemplate.send("topic2", "2");
        return "hello";
    }

    /**
     * 监听多个topic
     * @param message
     */
    @KafkaListener(topics = {"topic1", "topic2"}, groupId = "group1")
    public void listen(String message) {
        log.info("Received message >>> {}", message);
        // Received message >>> 1
        // Received message >>> 2
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

@KafkaListener 参数列表:

  • topics:指定要消费的 Topic 的名称;
  • groupId:指定消费者组 ID。消费者组是一组共享相同 Topic 的消费者的集合;
  • containerFactory:指定要使用的 KafkaListenerContainerFactory 实例的名称。如果没有指定,将使用默认的 KafkaListenerContainerFactory 实例;
  • concurrency:指定要创建的并发消费者的数量,默认值为 1;
  • autoStartup:指定容器是否在应用启动时自动启动,默认值为 true;
  • id:指定监听器的唯一标识符,默认值为 "";
  • errorHandler:指定在处理消息时出现异常时要使用的 ErrorHandler 实例;
  • properties:指定传递给消费者工厂的 Kafka 消费者配置属性的 Map;
  • partitionOffsets: 是一个 Map 类型的参数,该参数用于指定要从 Topic 的每个分区的哪个偏移量开始消费消息;

ContainerFactory:

Kafka 消费者可以使用不同的消息监听器容器,例如 ConcurrentKafkaListenerContainerFactory、KafkaMessageListenerContainer 等,每个容器都提供了不同的功能和配置选项,可以根据实际需求进行选择和配置;

可以通过SpringBoot配置文件或通过创建KafkaListenerContainerFactory bean实现自定义Kafka消费者的配置选项;

案例:

创建一个 ConcurrentKafkaListenerContainerFactory 的 bean,并配置了一些属性,例如:

  • 使用 DefaultKafkaConsumerFactory 作为消费者工厂;
  • 设置并发消费者数量为 3;
  • 设置轮询超时时间为 3000 毫秒。
@Transactional(rollbackFor = Exception.class)
@GetMapping("/hello")
public String hello() {
    // 发送消息
    kafkaTemplate.send("topic", "hello");
    return "hello";
}

@KafkaListener(topics = "topic", containerFactory = "kafkaListenerContainerFactory")
public void processMessage(String message) {
    // 处理消息
    log.info("hello1 >>>>> {}", message);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
@EnableKafka
public class KafkaConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
         //也可以设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        //factory.setBatchListener(true);

        return factory;
    }
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 指定发送主题

@SendTo:用于指定消息被发送到的目标 Topic。当消费者成功消费一个消息后,可以将结果发送到指定的目标 Topic,以供其他消费者进一步处理;

用于消费者上,指定消息的处理结果发送给哪个topic

@Slf4j
@RestController
public class SendToController {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    // 接收消息
    @Transactional(rollbackFor = Exception.class)
    @KafkaListener(id = "input", topics = "inputTopic")
    @SendTo("outputTopic")
    public String processMessage(String message) {
        // 处理消息并返回结果
        log.info("inputTopic >>>> {}", message); // inputTopic >>>> 1
        return "2";
    }

    @KafkaListener(id = "output", topics = "outputTopic")
    public String process1Message(String message) {
        // 处理消息并返回结果
        String result = "Processed message: " + message;
        log.info("outputTopic >>>> {}", result); // outputTopic >>>> Processed message: 2
        return result;
    }

    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/hello")
    public String hello() {
        // 发送消息
        kafkaTemplate.send("inputTopic", "1");
        return "hello";
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 消息应答

消费者消费完消息,需要给回应;

案例:ReplyingKafkaTemplate

@Slf4j
@RestController
public class ReceiveCustomerController {

    private static final String topic = "hello";
    private static final String topicCroup = "helloGroup";

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer(topic + "_replies");
        repliesContainer.getContainerProperties().setGroupId(topicGroup);
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> producerFactory, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate(producerFactory, repliesContainer);
    }

    @Bean
    public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate(producerFactory);
    }

    @Autowired
    private ReplyingKafkaTemplate kafkaReplyTemplate;

    @GetMapping("/send/{msg}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMsg(@PathVariable String msg) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
        RequestReplyFuture<String, String, String> replyFuture = kafkaReplyTemplate.sendAndReceive(record);
        ConsumerRecord<String, String> consumerRecord = replyFuture.get();
        log.info("customer reply >>>> {}: ", consumerRecord.value()); // customer reply >>>>listen: I do it >>> 1:
    }

    @KafkaListener(id = topicGroup, topics = topic)
    @SendTo
    public String listen(String msg) {
        log.info("listen receive msg >>>  {}", msg); // listen receive msg >>>  1
        return "listen: I do it >>> " + msg;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

# 消息丢失

默认情况,Kafka会使用 自动提交偏移量 的方式来 管理偏移量,每当消费者从Kafka拉取一批消息并消费完毕后,将自动提交偏移量,以便下次消费者拉取消息能够从上次提交的偏移量处开始消费消息;

# 偏移量

偏移量:Kafka中用于标识消息在分区中位置的一个数字,每个消息都有唯一的偏移量,可以用于回溯分区中的消息,也可用于跟踪已经消费的消息;

Kafka使用偏移量来保证消费者可以从上次离开的地方继续消费,从而保证消息的顺序性和可靠性;

实际应用:

  • 并发消费:多个消费者来并发地消费同一个主题的消息。在这种情况下,每个消费者都可以独立地管理自己的偏移量,并根据自己的需要进行提交,以确保每个消费者都能够独立地处理消息;
  • 重新消费:假设一个消费者在处理消息时发生了故障或错误,导致它无法处理后续的消息,可以将消费者的偏移量重置为较早的位置,以重新消费之前未能处理的消息;
  • 消费者组协调器:Kafka消费者API中的消费者组协调器负责管理消费者组中的偏移量。当消费者加入或离开消费者组时,协调器将重新分配偏移量,以确保消费者可以从正确的位置开始消费;

# 模拟

首先发送10条消息到topic1,消费成功后将结果发送到topic2;

@Slf4j
@RestController
public class OffsetController {
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @GetMapping("/hello")
    public String hello() throws Exception {
        // 发送消息
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            kafkaTemplate.send("topic1", message);
            log.info("Sent message: {}", message);
            Thread.sleep(1000);
        }
        return "hello";
    }

    @KafkaListener(topics = "topic1", id = "to1")
    public void listen1(String message) {
        log.info("listen1 Received message >>> {}", message);
        kafkaTemplate.send("topic2", message);
    }

    @KafkaListener(topics = "topic2", id = "to2")
    public void listen2(String message) {
        log.info("listen2 Received message >>> {}", message);

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

假设消费者to1在某些情况下发生了异常或者宕机了;

@KafkaListener(topics = "topic1", id = "to1")
public void listen1(String message) {
    if(message.contains("6"))
        throw new RuntimeException("系统异常");
    log.info("listen1 Received message >>> {}", message);
    kafkaTemplate.send("topic2", message);
}

@KafkaListener(topics = "topic2", id = "to2")
public void listen2(String message) {
    if(String.format("Message %d", 6 + 1).equals(message)) {
        log.error("消息丢失, 消息为 >>> {}", 6);
    }
    log.info("listen2 Received message >>> {}", message);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

出现消息丢失:当消费者to1消费到Message 6这条消息的时候报了异常导致消息没有被消费成功,但是还是正常提交了offset,接着继续消费,这就导致了消息的丢失。理论上讲没有消费成功的消息应当重新消费,然后提交offset;

# 自动提交出现消息丢失的原因

消费者poll到消息后,还未消费,就向broker的主题提交当前分区消费的偏移量;

poll过程:

  • 消费者建立与broker之间长连接,开始poll消息;默认一次poll 500条消息,如果两次poll时间超过30s时间间隔,kafka会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。
  • 如果每隔1s内没有poll到任何消息,则继续去poll消息,循环往复,直到poll到消息。如果超出了1s,则此次⻓轮询结束。
  • kafka会发送 心跳检测,如果超过 10 秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。

# 手动提交

解决方法:手动提交偏移量(消费者完成所有消息的消费后手动提交偏移量),即使在消费者消费消息的过程中出现异常或者消费者应用程序被关闭,也能够确保消息的完整性和可靠性。

配置类:

spring:
    kafka:
        bootstrap-servers: 192.168.30.131: 9093
        ...
        consumer:
            group-id: default-group
            enable-auto-commit: false
            auto-offset-reset: earliest
        ...
1
2
3
4
5
6
7
8
9

# 消息确认

消费者在成功消费一条消息后,向Kafka集群确认消息已经被成功消费:

  • acks=0:生产者发送消息后,不等待任何确认,直接发送下一条消息。
  • acks=1:生产者发送消息后,等待leader节点成功写入消息后返回确认,然后发送下一条消息。
  • acks=all:生产者发送消息后,等待所有的follower节点和leader节点都成功写入消息后返回确认,然后发送下一条消息。

# 动态创建消费者

  1. 创建消费者对象

    消费者工厂KafkaDynamicConsumerFactory ,创建消费者对象:

    @Component
    public class KafkaDynamicConsumerFactory {
    
    	@Autowired
    	private KafkaProperties kafkaProperties;
    
    	@Value("${spring.kafka.consumer.key-deserializer}")
    	private String keyDeSerializerClassName;
    
    	@Value("${spring.kafka.consumer.value-deserializer}")
    	private String valueDeSerializerClassName;
    
    	/**
    	 * 创建一个Kafka消费者
    	 * @param topic   消费者订阅的话题
    	 * @param groupId 消费者组名
    	 * @return 消费者对象
    	 */
    	public <K, V> KafkaConsumer<K, V> createConsumer(String topic, String groupId) throws ClassNotFoundException {
    		Properties consumerProperties = new Properties();
    		// 设定一些关于新的消费者的配置信息
    		consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    		// 设定新的消费者的组名
    		consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    		// 设定反序列化方式
    		consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Class.forName(keyDeSerializerClassName));
    		consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Class.forName(valueDeSerializerClassName));
    		// 设定信任所有类型以反序列化
    		consumerProperties.put("spring.json.trusted.packages", "*");
    		// 新建一个消费者
    		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerProperties);
    		// 使这个消费者订阅对应话题
    		consumer.subscribe(Collections.singleton(topic));
    		return consumer;
    	}
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
  2. 使用定时任务实现消费者实时订阅

    • 使用定时任务,在定时任务中使消费者不停地接收并处理消息
    • 将每个定时任务和消费者都存起来,后面在消费者不需要的时候可以移除它们并关闭定时任务

    上下文类KafkaConsumerContext ,用于存放所有的消费者和定时任务,并编写增加和移除定时任务的方法:

    public class KafkaConsumerContext {
    
    	/**
    	 * 存放所有自己创建的Kafka消费者任务
    	 * key: groupId
    	 * value: kafka消费者任务
    	 */
    	private static final Map<String, KafkaConsumer<?, ?>> consumerMap = new ConcurrentHashMap<>();
    
    	/**
    	 * 存放所有定时任务的哈希表
    	 * key: groupId
    	 * value: 定时任务对象,用于定时执行kafka消费者的消息消费任务
    	 */
    	private static final Map<String, ScheduledFuture<?>> scheduleMap = new ConcurrentHashMap<>();
    
    	/**
    	 * 任务调度器,用于定时任务
    	 */
    	private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(24);
    
    	/**
    	 * 添加一个Kafka消费者任务
    	 * @param groupId  消费者的组名
    	 * @param consumer 消费者对象
    	 * @param <K>      消息键类型
    	 * @param <V>      消息值类型
    	 */
    	public static <K, V> void addConsumerTask(String groupId, KafkaConsumer<K, V> consumer) {
    		// 先存入消费者以便于后续管理
    		consumerMap.put(groupId, consumer);
    		// 创建定时任务,每隔1s拉取消息并处理
    		ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
    			// 每次执行拉取消息之前,先检查订阅者是否已被取消(如果订阅者不存在于订阅者列表中说明被取消了)
    			// 因为Kafka消费者对象是非线程安全的,因此在这里把取消订阅的逻辑和拉取并处理消息的逻辑写在一起并放入定时器中,判断列表中是否存在消费者对象来确定是否取消任务
    			if (!consumerMap.containsKey(groupId)) {
    				// 取消订阅并关闭消费者
    				consumer.unsubscribe();
    				consumer.close();
    				// 关闭定时任务
    				scheduleMap.remove(groupId).cancel(true);
    				return;
    			}
    			// 拉取消息
    			ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
    			for (ConsumerRecord<K, V> record : records) {
    				// 自定义处理每次拉取的消息逻辑
    				System.out.println(record.value());
    			}
    		}, 0, 1, TimeUnit.SECONDS);
    		// 将任务存入对应的列表以后续管理
    		scheduleMap.put(groupId, future);
    	}
    
    	/**
    	 * 移除Kafka消费者定时任务并关闭消费者订阅
    	 * @param groupId 消费者的组名
    	 */
    	public static void removeConsumerTask(String groupId) {
    		if (!consumerMap.containsKey(groupId)) {
    			return;
    		}
    		// 从列表中移除消费者
    		consumerMap.remove(groupId);
    	}
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67

    在定时任务中,每隔一段时间就拉取一次消息并处理,就实现了消费者实时订阅消息的效果。

  3. 测试

    @RestController
    @RequestMapping("/api/kafka")
    public class KafkaTestAPI {
    
    	@Autowired
    	private KafkaTemplate<String, String> kafkaTemplate;
    
    	@Autowired
    	private KafkaDynamicConsumerFactory factory;
    
    	@GetMapping("/send")
    	public String send() {
    		kafkaTemplate.send("my-topic", "hello!");
    		return "发送完成!";
    	}
    
    	@GetMapping("/create/{groupId}")
    	public String create(@PathVariable String groupId) throws ClassNotFoundException {
    		// 这里统一使用一个topic
    		KafkaConsumer<String, String> consumer = factory.createConsumer("my-topic", groupId);
    		KafkaConsumerContext.addConsumerTask(groupId, consumer);
    		return "创建成功!";
    	}
    
    	@GetMapping("/remove/{groupId}")
    	public String remove(@PathVariable String groupId) {
    		KafkaConsumerContext.removeConsumerTask(groupId);
    		return "移除成功!";
    	}
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31

    依次访问/api/kafka/create/a和/api/kafka/create/b,就创建了两个消费者,然后访问/api/kafka/send发送消息;

# 动态删除Topic

在Spring Boot集成Kafka时,默认情况下向一个Topic发送了消息,若这个Topic不存在则会自动创建。不过如果创建的Topic多了,并且后续不再使用,那会占用服务器资源。

  1. 配置AdminClient的Bean

    @Configuration
    public class KafkaAdminConfig {
    
    	/**
    	 * 读取kafka地址配置
    	 */
    	@Value("${spring.kafka.bootstrap-servers}")
    	private String kafkaServerURL;
    
    	/**
    	 * 注入一个kafka管理实例
    	 * @return kafka管理对象
    	 */
    	@Bean
    	public AdminClient adminClient() {
    		Properties properties = new Properties();
    		properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerURL);
    		return AdminClient.create(properties);
    	}
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
  2. 使用AdminClient删除

    @RestController
    @RequestMapping("/api/kafka-topic")
    public class KafkaTopicAPI {
    
    	/**
    	 * 在需要删除Topic的地方自动装配AdminClient对象
    	 */
    	@Autowired
    	private AdminClient adminClient;
    
    	@GetMapping("/delete/{topicId}")
    	public String deleteTopic(@PathVariable String topicId) {
    		adminClient.deleteTopics(Collections.singleton(topicId));
    		return "删除Topic完成!";
    	}
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17

    AdminClient对象的deleteTopics方法,传参为Topic名称列表,只传入一个Topic名,因此使用Collections.singleton方法将这一个名称变成列表形式;

    如果某个Topic还正在被至少一个消费者订阅着,这个Topic将无法被删除! 所以要删除一个Topic之前请先确保其现在没有被任何消费者订阅;

# 其它

Springboot使用spring-kafka以及一些思考 (opens new window)

Kafka入门
Kafka数据模型

← Kafka入门 Kafka数据模型→

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式