-
生产者发送消息,多个消费者只能有一个消费者接收到消息
-
生产者发送消息,多个消费者都可以接收到消息
(1)创建kafka-demo项目,导入依赖
org.apache.kafka kafka-clients3.4.0 (2)生产者发送消息
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * 生产者 */ public class ProducerQuickStart { public static void main(String[] args) { //1.kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.140.100:9092"); //发送失败,失败的重试次数 properties.put(ProducerConfig.RETRIES_CONFIG,5); //消息key的序列化器 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //消息value的序列化器 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //2.生产者对象 KafkaProducer
producer = new KafkaProducer (properties); //封装发送的消息 ProducerRecord record = new ProducerRecord ("green-topic","100001","hello kafka"); //3.发送消息 producer.send(record); //4.关闭消息通道,必须关闭,否则消息发送不成功 producer.close(); } } (3)消费者接收消息
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; /** * 消费者 */ public class ConsumerQuickStart { public static void main(String[] args) { //1.添加kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.140.100:9092"); //消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); //消息的反序列化器 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //2.消费者对象 KafkaConsumer
consumer = new KafkaConsumer (properties); //3.订阅主题 consumer.subscribe(Collections.singletonList("green-topic")); //当前线程一直处于监听状态 每一秒拉取一次 while (true) { //4.获取消息 ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord consumerRecord : consumerRecords) { System.out.println(consumerRecord.key()); System.out.println(consumerRecord.value()); } } } } SpringBoot集成kafka
1.导入spring-kafka依赖信息
org.springframework.boot spring-boot-starter-weborg.springframework.kafka spring-kafkaorg.apache.kafka kafka-clientsorg.apache.kafka kafka-clientscom.alibaba fastjson2.在resources下创建文件application.yml
server: port: 9991 spring: application: name: kafka-demo kafka: bootstrap-servers: 192.168.140.100:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.消息生产者
import com.alibaba.fastjson.JSON; import com.heima.kafka.pojo.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { @Autowired private KafkaTemplate
kafkaTemplate; @GetMapping("/hello") public String hello(){ kafkaTemplate.send("green-topic","hello kafka..."); return "ok"; } //传递的消息为对象 @GetMapping("/hello2") public String hello2(){ User user = new User(); user.setUsername("小明"); user.setAge(18); kafkaTemplate.send("green-topic2", JSON.toJSONString(user)); return "ok2"; } } 4.消息消费者
import com.alibaba.fastjson.JSON; import com.heima.kafka.pojo.User; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @Component public class HelloListener { @KafkaListener(topics = "green-topic") public void onMessage(String message){ if(!StringUtils.isEmpty(message)){ System.out.println(message); } } @KafkaListener(topics = "green-topic2") public void onMessage2(String message){ if(!StringUtils.isEmpty(message)){ System.out.println(JSON.parseObject(message, User.class)); } } }
猜你喜欢
- 6天前(a级景区评定机构)全国A级旅游景区创建与提升培训班在敦煌市举办
- 6天前(瑞虎7plus2021款)重塑10万级SUV价值标杆,全新一代瑞虎7PLUS冠军版给你惊喜
- 6天前(屿见不一样是哪个酒店)屿见白纱,遇见自己 “佳能PhotoGirls屿见白纱”摄影派对玩转海岛
- 6天前(东北地区全域旅游)东北三省一区宣传贯彻研学旅游行业标准
- 6天前(曼谷丽思卡尔顿公寓价格)在曼谷丽思卡尔顿酒店CALEŌ 邂逅鸡尾酒的浪漫艺术
- 6天前(071 圣安东尼奥)秋季 圣安东尼奥交出了私藏活动清单
- 6天前(当科学邂逅喜剧:科技馆喜剧嘉年华背后的"文旅破壁者")当科学邂逅喜剧:科技馆喜剧嘉年华背后的"文旅破壁者"
- 6天前(安岚度假村及酒店推出"山海之约"目的地婚礼计划)安岚度假村及酒店推出"山海之约"目的地婚礼计划
- 6天前(福州“一县一桌菜”“两马乡宴”品鉴会圆满举办,马尾美食共叙血脉亲情)福州“一县一桌菜”“两马乡宴”品鉴会圆满举办,马尾美食共叙血脉亲情
- 6天前(我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章