一、配置文件
application.properties配置文件如下
#kafka多数据源配置 #kafka数据源一,日志审计推送 spring.kafka.one.bootstrap-servers=172.19.12.109:32182 spring.kafka.one.producer.retries=0 spring.kafka.one.producer.properties.max.block.ms=5000 #kafka数据源二,动环数据消费 spring.kafka.two.bootstrap-servers=172.19.12.109:32182 spring.kafka.two.producer.retries=0 spring.kafka.two.producer.properties.max.block.ms=5000 spring.kafka.two.consumer.group-id=bw-convert-data spring.kafka.two.consumer.enable-auto-commit=true
二、pom依赖
org.springframework.kafka spring-kafka
三、生产者、消费者配置
1.第一个kakfa
package com.gstanzer.convert.config; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.*; import java.util.HashMap; import java.util.Map; @EnableKafka @Configuration public class KafkaOneConfig { @Value("${spring.kafka.one.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.one.producer.retries}") private String retries; @Value("${spring.kafka.one.producer.properties.max.block.ms}") private String maxBlockMs; @Bean public KafkaTemplatekafkaOneTemplate() { return new KafkaTemplate<>(producerFactory()); } private ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } private Map producerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } }
2.第二个kakfa
package com.gstanzer.convert.config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaTwoConfig { @Value("${spring.kafka.two.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.two.producer.retries}") private String retries; @Value("${spring.kafka.two.producer.properties.max.block.ms}") private String maxBlockMs; @Value("${spring.kafka.two.consumer.group-id}") private String groupId; @Value("${spring.kafka.two.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Bean public KafkaTemplatekafkaTwoTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean KafkaListenerContainerFactory > kafkaTwoContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } private ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } private Map producerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } private Map consumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } }
四.生产者
@Controller public class TestController { @Autowired private KafkaTemplate kafkaOneTemplate; @Autowired private KafkaTemplate kafkaTwoTemplate; @RequestMapping("/send") @ResponseBody public String send() { final String TOPIC = "TOPIC_1"; kafkaOneTemplate.send(TOPIC, "kafka one"); kafkaTwoTemplate.send(TOPIC, "kafka two"); return "success"; } }
五.消费者
@Component public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); final String TOPIC = "TOPIC_1"; // containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同 @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory") public void listenerOne(ConsumerRecord, ?> record) { LOGGER.info(" kafka one 接收到消息:{}", record.value()); } @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory") public void listenerTwo(ConsumerRecord, ?> record) { LOGGER.info(" kafka two 接收到消息:{}", record.value()); } }
备注:
生产者消费者代码参考链接,开发同学需要以实际情况按要求自己变更下代码即可:
Spring Boot 集成多个 Kafka_springboot集成多个kafka_//承续缘_纪录片的博客-CSDN博客
猜你喜欢
- 18天前(万达酒店及度假村连续五年荣获“中国饭店集团60强”)万达酒店及度假村连续五年荣获“中国饭店集团60强”
- 18天前(夏日旅行海报)夏日旅行|精简行囊 向快乐进发
- 18天前(瑞虎7plus2021款)重塑10万级SUV价值标杆,全新一代瑞虎7PLUS冠军版给你惊喜
- 18天前(“为人民绽放——国家艺术基金优秀剧目展演”在合肥开幕)“为人民绽放——国家艺术基金优秀剧目展演”在合肥开幕
- 18天前(安岚度假村及酒店推出"山海之约"目的地婚礼计划)安岚度假村及酒店推出"山海之约"目的地婚礼计划
- 18天前(“百场黄梅唱响百家景区”示范演出活动在黄山风景区举行)“百场黄梅唱响百家景区”示范演出活动在黄山风景区举行
- 18天前(殷建祥简历)全国十大牛商解码:殷建祥如何用178天技术突围打造星空梦星空房
- 18天前(携程租车加盟合作)携程租车加盟优势全解析:开启旅游出行市场新篇章
- 18天前(曹妃甸美仑华府哪个楼层好)曹妃甸新城教育经济新引擎启动—美仑国际酒店盛大开业
- 18天前(阿斯塔纳航空属于哪个联盟)阿斯塔纳航空荣获Skytrax世界航空公司大奖,将继续助力中哈交流往来
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章