上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

kafka入门

guduadmin86天前

kafka入门,第1张

  • 生产者发送消息,多个消费者订阅同一主题且属于同一个消费者组时,只能有一个消费者接收到消息

  • 生产者发送消息,多个消费者订阅同一主题但分属不同的消费者组时,每个消费者都可以接收到消息

     (1)创建kafka-demo项目,导入依赖

    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    

    (2)生产者发送消息

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    /**
     * Minimal Kafka producer example.
     *
     * <p>Builds a producer with String key/value serializers, sends a single
     * record to the {@code green-topic} topic and then closes the producer.
     */
    public class ProducerQuickStart {
        /**
         * Sends one record with key {@code "100001"} and value {@code "hello kafka"}
         * to {@code green-topic}.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            // 1. Producer configuration.
            Properties properties = new Properties();
            // Address of the Kafka broker to bootstrap from.
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.140.100:9092");
            // Number of retries when a send fails.
            properties.put(ProducerConfig.RETRIES_CONFIG, 5);
            // Serializer for the record key.
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            // Serializer for the record value.
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");

            // 2. Create the producer. try-with-resources guarantees close() runs even
            //    if send() throws. close() matters because send() is asynchronous:
            //    closing waits for buffered records to be sent before the JVM exits.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
                // Wrap topic, key and value into a record.
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("green-topic", "100001", "hello kafka");
                // 3. Hand the record to the producer.
                producer.send(record);
            }
        }
    }

    (3)消费者接收消息

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    /**
     * Minimal Kafka consumer example.
     *
     * <p>Joins consumer group {@code group1}, subscribes to {@code green-topic}
     * and prints the key and value of every record it receives, forever.
     */
    public class ConsumerQuickStart {
        /**
         * Runs the polling loop; this method does not return normally.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            // 1. Consumer configuration.
            Properties properties = new Properties();
            // Address of the Kafka broker to bootstrap from.
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.140.100:9092");
            // Consumer group this consumer belongs to.
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            // Deserializers for the record key and value.
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            // 2. Create the consumer. try-with-resources closes it if the loop
            //    below is left through an exception.
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
                // 3. Subscribe to the topic.
                consumer.subscribe(Collections.singletonList("green-topic"));
                // The current thread keeps listening; each poll waits up to one
                // second for records before returning.
                while (true) {
                    // 4. Fetch the next batch of records. The type parameters are
                    //    required: iterating a raw ConsumerRecords yields Object.
                    ConsumerRecords<String, String> consumerRecords =
                            consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        System.out.println(consumerRecord.key());
                        System.out.println(consumerRecord.value());
                    }
                }
            }
        }
    }

    SpringBoot集成kafka

    1.导入spring-kafka依赖信息

    
        
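    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
    </dependencies>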
        
    

    2.在resources下创建文件application.yml

    # Spring Boot configuration for the kafka-demo application.
    server:
      port: 9991
    spring:
      application:
        name: kafka-demo
      kafka:
        # Same broker address as used by the plain-client quick start examples.
        bootstrap-servers: 192.168.140.100:9092
        producer:
          # Number of retries when a send fails.
          retries: 10
          # Keys and values are sent as Strings.
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          # Placeholder expands to the application name above, i.e. "kafka-demo-test".
          group-id: ${spring.application.name}-test
          # Keys and values are read back as Strings.
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    3.消息生产者

    import com.alibaba.fastjson.JSON;
    import com.heima.kafka.pojo.User;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    /**
     * REST controller that publishes messages to Kafka when its endpoints are called.
     */
    @RestController
    public class HelloController {
        // Parameterized as <String, String> because both key and value are sent as Strings.
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        /**
         * Sends the plain text {@code "hello kafka..."} to {@code green-topic}.
         *
         * @return the literal {@code "ok"}
         */
        @GetMapping("/hello")
        public String hello() {
            kafkaTemplate.send("green-topic", "hello kafka...");
            return "ok";
        }

        /**
         * Sends an object as the message: a {@link User} is serialized to a JSON
         * string and sent to {@code green-topic2}.
         *
         * @return the literal {@code "ok2"}
         */
        @GetMapping("/hello2")
        public String hello2() {
            User user = new User();
            user.setUsername("小明");
            user.setAge(18);
            // The template carries String values, so the object is sent as JSON text.
            kafkaTemplate.send("green-topic2", JSON.toJSONString(user));
            return "ok2";
        }
    }

    4.消息消费者

    import com.alibaba.fastjson.JSON;
    import com.heima.kafka.pojo.User;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    /**
     * Kafka listeners that print the messages received on {@code green-topic}
     * and {@code green-topic2}.
     */
    @Component
    public class HelloListener {
        /**
         * Prints each non-empty message received on {@code green-topic}.
         *
         * @param message the record value as a String
         */
        @KafkaListener(topics = "green-topic")
        public void onMessage(String message) {
            // hasLength replaces the deprecated !StringUtils.isEmpty(...) check.
            if (StringUtils.hasLength(message)) {
                System.out.println(message);
            }
        }

        /**
         * Parses each non-empty message received on {@code green-topic2} as a JSON
         * {@link User} and prints the resulting object.
         *
         * @param message the record value as a JSON String
         */
        @KafkaListener(topics = "green-topic2")
        public void onMessage2(String message) {
            if (StringUtils.hasLength(message)) {
                System.out.println(JSON.parseObject(message, User.class));
            }
        }
    }

网友评论

搜索
最新文章
热门文章
热门标签