上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

【Kafka】Kafka Stream简单使用

guduadmin11天前

一、实时流式计算

1. 概念

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算

【Kafka】Kafka Stream简单使用,在这里插入图片描述,第1张

流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

2. 应用场景

  • 日志分析: 网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
  • 大屏看板统计: 可以实时的查看网站注册数量,订单数量,购买数量,金额等。
  • 公交实时数据: 可以随时更新公交车方位,计算多久到达站牌等
  • 实时文章分值计算

比如应用较广的 头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

3. Kafka Stream

近些年来,开源流处理领域涌现出了很多优秀框架。光是在 Apache 基金会孵化的项目,关于流处理的大数据框架就有十几个之多,比如早期的 Apache Samza、Apache Storm,以及这些年火爆的 Spark 以及 Flink 等。

3.1 Kafka Streams的特点
  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

    【Kafka】Kafka Stream简单使用,在这里插入图片描述,第2张

    3.2 关键概念

    一个最简单的Streaming的结构如下图所示:

    【Kafka】Kafka Stream简单使用,在这里插入图片描述,第3张

    从一个Topic中读取到数据,经过一些处理操作之后,写入到另一个Topic中,这就是一个最简单的Streaming流式计算。其中,Source Topic中的数据会源源不断的产生新数据。

    那么,我们再在上面的结构之上扩展一下,假设定义了多个Source Topic及Destination Topic,那就构成如下图所示的较为复杂的拓扑结构:

    【Kafka】Kafka Stream简单使用,在这里插入图片描述,第4张

    • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
    • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。

      【Kafka】Kafka Stream简单使用,在这里插入图片描述,第5张

      Kafka Streams被认为是开发实时应用程序的最简单方法。它是一个Kafka的客户端API库,编写简单的java就可以实现流式处理。

      3.3 KStream

      KStream:数据结构类似于map,如下图,key-value键值对

      【Kafka】Kafka Stream简单使用,在这里插入图片描述,第6张

      KStream数据流(data stream),是一段顺序的,可以无限长,不断更新的数据集。

      数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

      KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

      二、测试kafkaStream

      先看下简单的kafkaStream的KStream测试

      需求分析:求单词个数(word count)

      【Kafka】Kafka Stream简单使用,在这里插入图片描述,第7张

      1. pom.xml引入依赖:

             
              
                  org.springframework.kafka
                  spring-kafka
                  
                      
                          org.apache.kafka
                          kafka-clients
                      
                  
              
              
                  org.apache.kafka
                  kafka-clients
              
              
                  com.alibaba
                  fastjson
              
              
                  org.apache.kafka
                  kafka-streams
                  
                      
                          connect-json
                          org.apache.kafka
                      
                      
                          org.apache.kafka
                          kafka-clients
                      
                  
              
      

      2. 配置文件

      server:
        port: 9991
      spring:
        application:
          name: kafka-demo
        kafka:
          bootstrap-servers: 192.168.200.130:9092
          producer:
            retries: 10
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
            compression-type: lz4
          consumer:
            group-id: ${spring.application.name}-test
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      

      3. 编写生产者

      ProducerQuickStart.java

      package com.kafka.sample;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.kafka.clients.producer.*;
      import java.util.Properties;
      @Slf4j
      public class ProducerQuickStart {
          public static void main(String[] args) {
              //1. kafka的配置信息
              Properties prop = new Properties();
              //kafka的链接信息
              prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
              //配置重试次数
              prop.put(ProducerConfig.RETRIES_CONFIG, 5);
              //数据压缩
              prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
              //ack配置  消息确认机制   默认ack=1,即只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
      //        prop.put(ProducerConfig.ACKS_CONFIG,"all");
              消息key的序列化器
              prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
              //消息value的序列化器
              prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
              //2. 生产者对象
              KafkaProducer producer = new KafkaProducer(prop);
              //封装发送的消息
              ProducerRecord producerRecord = new ProducerRecord("itcast-topic-input", "key_001", "hello kafka");
              //3. 发送消息
              for (int i = 0; i < 5; i++) {
                  producer.send(producerRecord);
              }
              //4. 关闭消息通道  必须关闭,否则消息发不出去
              producer.close();
          }
      }
      

      4 编写kafkaStream流式处理

      KafkaStreamQuickStart.java

      package com.kafka.sample;
      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.KafkaStreams;
      import org.apache.kafka.streams.KeyValue;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.StreamsConfig;
      import org.apache.kafka.streams.kstream.KStream;
      import org.apache.kafka.streams.kstream.TimeWindows;
      import org.apache.kafka.streams.kstream.ValueMapper;
      import java.time.Duration;
      import java.util.Arrays;
      import java.util.Properties;
      /**
       * 流式处理
       */
      public class KafkaStreamQuickStart {
          public static void main(String[] args) {
              //kafka的配置信心
              Properties prop = new Properties();
              prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
              prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
              prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
              prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");
              //stream 构建器
              StreamsBuilder streamsBuilder = new StreamsBuilder();
              //流式计算
              streamProcessor(streamsBuilder);
              //创建kafkaStream对象
              KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
              //开启流式计算
              kafkaStreams.start();
          }
          /**
           * 流式计算
           * 消息的内容:hello kafka  hello itcast
           * @param streamsBuilder
           */
          private static void streamProcessor(StreamsBuilder streamsBuilder) {
              //创建kstream对象,同时指定从那个topic中接收消息
              KStream stream = streamsBuilder.stream("itcast-topic-input");
              /**
               * 处理消息的value
               */
              stream.flatMapValues(new ValueMapper>() {
                  @Override
                  public Iterable apply(String value) {
                      return Arrays.asList(value.split(" "));
                  }
              })
                      //按照value进行聚合处理
                      .groupBy((key,value)->value)
                      //时间窗口
                      .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                      //统计单词的个数
                      .count()
                      //转换为kStream
                      .toStream()
                      .map((key,value)->{
                          System.out.println("key:"+key+",vlaue:"+value);
                          return new KeyValue<>(key.key().toString(),value.toString());
                      })
                      //发送消息
                      .to("itcast-topic-out");
          }
      }
      

      5. 编写消费者

      ConsumerQuickStart.java

      package com.kafka.sample;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;
      public class ConsumerQuickStart {
          public static void main(String[] args) {
              //1. 添加kafka的配置信息
              Properties properties = new Properties();
              // 配置链接信息
              properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
              //配置消费者组
              properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-2");
              //配置消息的反序列化器
              properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              //2. 消费者对象
              KafkaConsumer consumer = new KafkaConsumer(properties);
              //3. 订阅主题
              consumer.subscribe(Collections.singletonList("itcast-topic-out"));
              //当前线程一直监听消息
              while(true){
                  //4. 消费者拉取消息: 每秒拉取一次
                  ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
                  for (ConsumerRecord record : records) {
                      System.out.println(record.key());
                      System.out.println(record.value());
                  }
              }
          }
      }
      

      启动项目:

      1. 在远端(192.168.200.130:9092)启动docker中的kafka容器
      2. 启动消费者ConsumerQuickStart的main函数
      3. 启动kafkastream的mian函数
      4. 启动生产者ProducerQuickStart的main函数

      5. 控制台打印结果:

      【Kafka】Kafka Stream简单使用,在这里插入图片描述,第8张

      【Kafka】Kafka Stream简单使用,在这里插入图片描述,第9张

      整个过程:

      生产者向kafka中发送了5条“hello kafka”消息,topic均为itcast-topic-input。kafkastream监听这个topic,每10秒进行一次流式处理,将“hello kakfa”字符串分割,并统计每个单词出现的次数。然后转为kstream,发送消息到kafka中的topic=itcast-topic-out”。消费者监听“itcast-topic-out”的topic,消费消息。

      三、Springboot整合kafkaStream

      1. 配置文件新增

      application.yml

      server:
        port: 9991
      spring:
        application:
          name: kafka-demo
        kafka:
          bootstrap-servers: 192.168.200.130:9092
          producer:
            retries: 10
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
            compression-type: lz4
          consumer:
            group-id: ${spring.application.name}-test
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # kafkaStream新增以下配置
      kafka:
        hosts: 192.168.200.130:9092
        group: ${spring.application.name}
      

      2. 在微服务中新增配置类

      KafkaStreamConfig.java

      package com.kafka.config;
      import lombok.Getter;
      import lombok.Setter;
      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.StreamsConfig;
      import org.springframework.boot.context.properties.ConfigurationProperties;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.kafka.annotation.EnableKafkaStreams;
      import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
      import org.springframework.kafka.config.KafkaStreamsConfiguration;
      import java.util.HashMap;
      import java.util.Map;
      /**
       * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
       */
      @Setter
      @Getter
      @Configuration
      @EnableKafkaStreams
      @ConfigurationProperties(prefix="kafka")
      public class KafkaStreamConfig {
          private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
          private String hosts;
          private String group;
          @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
          public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
              Map props = new HashMap<>();
              props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
              props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
              props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
              props.put(StreamsConfig.RETRIES_CONFIG, 10);
              props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
              props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
              return new KafkaStreamsConfiguration(props);
          }
      }
      

      3. 使用kafkaStream监听消息

      KafkaStreamHelloListener.java

      package com.kafka.stream;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.kafka.streams.KeyValue;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.kstream.KStream;
      import org.apache.kafka.streams.kstream.TimeWindows;
      import org.apache.kafka.streams.kstream.ValueMapper;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import java.time.Duration;
      import java.util.Arrays;
      @Configuration
      @Slf4j
      public class KafkaStreamHelloListener {
          @Bean
          public KStream kStream(StreamsBuilder streamsBuilder){
              //创建kstream对象,同时指定从那个topic中接收消息
              KStream stream = streamsBuilder.stream("itcast-topic-input");
              stream.flatMapValues(new ValueMapper>() {
                  @Override
                  public Iterable apply(String value) {
                      return Arrays.asList(value.split(" "));
                  }
              })
                      //根据value进行聚合分组
                      .groupBy((key,value)->value)
                      //聚合计算时间间隔
                      .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                      //求单词的个数
                      .count()
                      .toStream()
                      //处理后的结果转换为string字符串
                      .map((key,value)->{
                          System.out.println("key:"+key+",value:"+value);
                          return new KeyValue<>(key.key().toString(),value.toString());
                      })
                      //发送消息
                      .to("itcast-topic-out");
              return stream;
          }
      }
      

      测试:

      启动springboot应用程序,运行之前的ProducerQuickStart来生产消息,约10秒后,看到kafkaStream消息的处理结果

      【Kafka】Kafka Stream简单使用,在这里插入图片描述,第10张

      说明kafkaStream接收到消息并将多条消息进行了统一处理。

      参考(推荐阅读):

      1. https://cloud.tencent.com/developer/article/2100664
      2. https://www.cnblogs.com/tree1123/p/11457851.html

网友评论

搜索
最新文章
热门文章
热门标签