springboot项目配置多个kafka的示例代码

其他教程   发布日期:2025年04月22日   浏览次数:226

这篇文章主要介绍如何在 Spring Boot 项目中同时配置多个 Kafka(为每个 Kafka 分别定义消费者工厂和监听容器工厂),并给出完整的示例代码。内容详细易懂,操作简单快捷,具有一定的借鉴价值,相信大家阅读完这篇文章都会有所收获,下面我们一起来看看吧。

1.引入 spring-kafka 依赖

  <!-- Spring for Apache Kafka (spring-kafka) -->
  <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>1.3.5.RELEASE</version>
  </dependency>

2.配置文件相关信息

  # ---- First Kafka cluster ----
  kafka.bootstrap-servers=localhost:9092
  kafka.consumer.group.id=20230321
  # Number of concurrent consumer threads (usually equal to the number of partitions)
  kafka.consumer.concurrency=10
  kafka.consumer.enable.auto.commit=false

  # ---- Second Kafka cluster (keys carry the ".pic" suffix) ----
  kafka.bootstrap-servers.pic=localhost:29092
  kafka.consumer.group.id.pic=20230322_pic
  kafka.consumer.concurrency.pic=10
  kafka.consumer.enable.auto.commit.pic=false

3.kafka配置类

  1. @Configuration
  2. @EnableKafka
  3. public class KafkaConsumerConfig {
  4. @Value("${kafka.consumer.group.id}")
  5. private String groupId;
  6. @Value("${kafka.consumer.concurrency}")
  7. private int concurrency;
  8. @Value("${kafka.consumer.enable.auto.commit}")
  9. private String autoCommit;
  10. @Value("${kafka.bootstrap-servers}")
  11. private String bootstrapServer;
  12. @Value("${kafka.consumer.group.id.pic}")
  13. private String groupIdPic;
  14. @Value("${kafka.consumer.concurrency.pic}")
  15. private int concurrencyPic;
  16. @Value("${kafka.consumer.enable.auto.commit.pic}")
  17. private String autoCommitPic;
  18. @Value("${kafka.bootstrap-servers.pic}")
  19. private String bootstrapServerPic;
  20. @Bean
  21. public ConsumerFactory<String, String> consumerFactory() {
  22. String bootstrapServers = bootstrapServer;
  23. Map<String, Object> configProps = new HashMap<>(16);
  24. configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  25. configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  26. configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  27. configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  28. configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
  29. return new DefaultKafkaConsumerFactory<>(configProps);
  30. }
  31. @Bean
  32. public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  33. ConcurrentKafkaListenerContainerFactory<String, String> factory =
  34. new ConcurrentKafkaListenerContainerFactory<>();
  35. factory.setConsumerFactory(consumerFactory());
  36. factory.setConcurrency(concurrency);
  37. factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
  38. return factory;
  39. }
  40. @Bean
  41. public ConsumerFactory<String, String> consumerFactoryPic() {
  42. String bootstrapServers = bootstrapServerPic;
  43. Map<String, Object> configProps = new HashMap<>(16);
  44. configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  45. configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  46. configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  47. configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic);
  48. configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic);
  49. return new DefaultKafkaConsumerFactory<>(configProps);
  50. }
  51. @Bean
  52. public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() {
  53. ConcurrentKafkaListenerContainerFactory<String, String> factory =
  54. new ConcurrentKafkaListenerContainerFactory<>();
  55. factory.setConsumerFactory(consumerFactoryPic());
  56. factory.setConcurrency(concurrencyPic);
  57. factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
  58. return factory;
  59. }
  60. }

4.消费主题消息

  1. @KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic")
  2. public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
  3. try {
  4. String jsonString = message.value();
  5. if (StringUtils.isNoneBlank(jsonString)) {
  6. log.info("消费:{}",jsonString);
  7. //TODO ....
  8. }
  9. } catch (Exception e) {
  10. log.error(" receive topic error ", e);
  11. } finally {
  12. ack.acknowledge();
  13. }
  14. }
  15. @KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory")
  16. public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
  17. try {
  18. if (StringUtils.isNoneBlank(message.value())) {
  19. //TODO ....
  20. }
  21. } catch (Exception e) {
  22. logger.error(" receive topic error ", e);
  23. } finally {
  24. ack.acknowledge();
  25. }
  26. }

以上就是springboot项目配置多个kafka的示例代码的详细内容,更多关于springboot项目配置多个kafka的示例代码的资料请关注九品源码其它相关文章!