-
生产者发送消息,多个消费者只能有一个消费者接收到消息
-
生产者发送消息,多个消费者都可以接收到消息
(1)创建kafka-demo项目,导入依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
(2)生产者发送消息
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * 生产者 */ public class ProducerQuickStart { public static void main(String[] args) { //1.kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.140.100:9092"); //发送失败,失败的重试次数 properties.put(ProducerConfig.RETRIES_CONFIG,5); //消息key的序列化器 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //消息value的序列化器 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //2.生产者对象 KafkaProducer
producer = new KafkaProducer (properties); //封装发送的消息 ProducerRecord record = new ProducerRecord ("green-topic","100001","hello kafka"); //3.发送消息 producer.send(record); //4.关闭消息通道,必须关闭,否则消息发送不成功 producer.close(); } } (3)消费者接收消息
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Quick-start consumer: subscribes to the {@code green-topic} topic as a member
 * of consumer group {@code group1} and prints the key and value of every record
 * it receives, forever.
 */
public class ConsumerQuickStart {

    public static void main(String[] args) {
        // 1. Kafka configuration
        Properties properties = new Properties();
        // Broker address to connect to
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.140.100:9092");
        // Consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // Deserializers for the record key and value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. Create the consumer. try-with-resources closes it if the poll loop
        //    is ever left through an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // 3. Subscribe to the topic
            consumer.subscribe(Collections.singletonList("green-topic"));

            // Keep the current thread listening; each poll is given a 1000 ms timeout
            while (true) {
                // 4. Fetch messages
                ConsumerRecords<String, String> consumerRecords =
                        consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key());
                    System.out.println(consumerRecord.value());
                }
            }
        }
    }
}
SpringBoot集成kafka
1.导入spring-kafka依赖信息
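<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>
2.在resources下创建文件application.yml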
server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.140.100:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.消息生产者
import com.alibaba.fastjson.JSON; import com.heima.kafka.pojo.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { @Autowired private KafkaTemplate
kafkaTemplate; @GetMapping("/hello") public String hello(){ kafkaTemplate.send("green-topic","hello kafka..."); return "ok"; } //传递的消息为对象 @GetMapping("/hello2") public String hello2(){ User user = new User(); user.setUsername("小明"); user.setAge(18); kafkaTemplate.send("green-topic2", JSON.toJSONString(user)); return "ok2"; } } 4.消息消费者
import com.alibaba.fastjson.JSON;
import com.heima.kafka.pojo.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Message consumer: listens on {@code green-topic} and {@code green-topic2}
 * and prints every non-empty message it receives.
 */
@Component
public class HelloListener {

    /**
     * Prints the raw text of each message received from {@code green-topic}.
     * Null or empty messages are ignored.
     *
     * @param message the message payload as a String
     */
    @KafkaListener(topics = "green-topic")
    public void onMessage(String message) {
        if (hasContent(message)) {
            System.out.println(message);
        }
    }

    /**
     * Parses each message received from {@code green-topic2} as a JSON-encoded
     * {@link User} and prints the resulting object. Null or empty messages are
     * ignored.
     *
     * @param message the message payload, expected to be a JSON string
     */
    @KafkaListener(topics = "green-topic2")
    public void onMessage2(String message) {
        if (hasContent(message)) {
            System.out.println(JSON.parseObject(message, User.class));
        }
    }

    /** Returns true when the message is neither null nor the empty string. */
    private static boolean hasContent(String message) {
        // hasLength is the non-deprecated equivalent of !StringUtils.isEmpty(message)
        return StringUtils.hasLength(message);
    }
}
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章