一、实时流式计算
1. 概念
一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算
流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。
2. 应用场景
- 日志分析: 网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
- 大屏看板统计: 可以实时的查看网站注册数量,订单数量,购买数量,金额等。
- 公交实时数据: 可以随时更新公交车方位,计算多久到达站牌等
- 实时文章分值计算
比如应用较广的 头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。
3. Kafka Stream
近些年来,开源流处理领域涌现出了很多优秀框架。光是在 Apache 基金会孵化的项目,关于流处理的大数据框架就有十几个之多,比如早期的 Apache Samza、Apache Storm,以及这些年火爆的 Spark 以及 Flink 等。
3.1 Kafka Streams的特点
- Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
- 除了Kafka外,无任何外部依赖
- 充分利用Kafka分区机制实现水平扩展和顺序性保证
- 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
- 支持正好一次处理语义
- 提供记录级的处理能力,从而实现毫秒级的低延迟
- 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
- 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)
3.2 关键概念
一个最简单的Streaming的结构如下图所示:
从一个Topic中读取到数据,经过一些处理操作之后,写入到另一个Topic中,这就是一个最简单的Streaming流式计算。其中,Source Topic中的数据会源源不断的产生新数据。
那么,我们再在上面的结构之上扩展一下,假设定义了多个Source Topic及Destination Topic,那就构成如下图所示的较为复杂的拓扑结构:
- 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
- Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。
Kafka Streams被认为是开发实时应用程序的最简单方法。它是一个Kafka的客户端API库,编写简单的java就可以实现流式处理。
3.3 KStream
KStream:数据结构类似于map,如下图,key-value键值对
KStream数据流(data stream),是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。
KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。
二、测试kafkaStream
先看下简单的kafkaStream的KStream测试
需求分析:求单词个数(word count)
1. pom.xml引入依赖:
org.springframework.kafka spring-kafka org.apache.kafka kafka-clients org.apache.kafka kafka-clients com.alibaba fastjson org.apache.kafka kafka-streams connect-json org.apache.kafka org.apache.kafka kafka-clients 2. 配置文件
server: port: 9991 spring: application: name: kafka-demo kafka: bootstrap-servers: 192.168.200.130:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer compression-type: lz4 consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3. 编写生产者
ProducerQuickStart.java
package com.kafka.sample; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.*; import java.util.Properties; @Slf4j public class ProducerQuickStart { public static void main(String[] args) { //1. kafka的配置信息 Properties prop = new Properties(); //kafka的链接信息 prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092"); //配置重试次数 prop.put(ProducerConfig.RETRIES_CONFIG, 5); //数据压缩 prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4"); //ack配置 消息确认机制 默认ack=1,即只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应 // prop.put(ProducerConfig.ACKS_CONFIG,"all"); 消息key的序列化器 prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //消息value的序列化器 prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //2. 生产者对象 KafkaProducer
producer = new KafkaProducer (prop); //封装发送的消息 ProducerRecord producerRecord = new ProducerRecord ("itcast-topic-input", "key_001", "hello kafka"); //3. 发送消息 for (int i = 0; i < 5; i++) { producer.send(producerRecord); } //4. 关闭消息通道 必须关闭,否则消息发不出去 producer.close(); } } 4 编写kafkaStream流式处理
KafkaStreamQuickStart.java
package com.kafka.sample; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; import java.time.Duration; import java.util.Arrays; import java.util.Properties; /** * 流式处理 */ public class KafkaStreamQuickStart { public static void main(String[] args) { //kafka的配置信心 Properties prop = new Properties(); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092"); prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart"); //stream 构建器 StreamsBuilder streamsBuilder = new StreamsBuilder(); //流式计算 streamProcessor(streamsBuilder); //创建kafkaStream对象 KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop); //开启流式计算 kafkaStreams.start(); } /** * 流式计算 * 消息的内容:hello kafka hello itcast * @param streamsBuilder */ private static void streamProcessor(StreamsBuilder streamsBuilder) { //创建kstream对象,同时指定从那个topic中接收消息 KStream
stream = streamsBuilder.stream("itcast-topic-input"); /** * 处理消息的value */ stream.flatMapValues(new ValueMapper >() { @Override public Iterable apply(String value) { return Arrays.asList(value.split(" ")); } }) //按照value进行聚合处理 .groupBy((key,value)->value) //时间窗口 .windowedBy(TimeWindows.of(Duration.ofSeconds(10))) //统计单词的个数 .count() //转换为kStream .toStream() .map((key,value)->{ System.out.println("key:"+key+",vlaue:"+value); return new KeyValue<>(key.key().toString(),value.toString()); }) //发送消息 .to("itcast-topic-out"); } } 5. 编写消费者
ConsumerQuickStart.java
package com.kafka.sample; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class ConsumerQuickStart { public static void main(String[] args) { //1. 添加kafka的配置信息 Properties properties = new Properties(); // 配置链接信息 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092"); //配置消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-2"); //配置消息的反序列化器 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //2. 消费者对象 KafkaConsumer
consumer = new KafkaConsumer (properties); //3. 订阅主题 consumer.subscribe(Collections.singletonList("itcast-topic-out")); //当前线程一直监听消息 while(true){ //4. 消费者拉取消息: 每秒拉取一次 ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { System.out.println(record.key()); System.out.println(record.value()); } } } } 启动项目:
- 在远端(192.168.200.130:9092)启动docker中的kafka容器
- 启动消费者ConsumerQuickStart的main函数
- 启动kafkastream的mian函数
- 启动生产者ProducerQuickStart的main函数
5. 控制台打印结果:
整个过程:
生产者向kafka中发送了5条“hello kafka”消息,topic均为itcast-topic-input。kafkastream监听这个topic,每10秒进行一次流式处理,将“hello kakfa”字符串分割,并统计每个单词出现的次数。然后转为kstream,发送消息到kafka中的topic=itcast-topic-out”。消费者监听“itcast-topic-out”的topic,消费消息。
三、Springboot整合kafkaStream
1. 配置文件新增
application.yml
server: port: 9991 spring: application: name: kafka-demo kafka: bootstrap-servers: 192.168.200.130:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer compression-type: lz4 consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # kafkaStream新增以下配置 kafka: hosts: 192.168.200.130:9092 group: ${spring.application.name}
2. 在微服务中新增配置类
KafkaStreamConfig.java
package com.kafka.config; import lombok.Getter; import lombok.Setter; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import java.util.HashMap; import java.util.Map; /** * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数 */ @Setter @Getter @Configuration @EnableKafkaStreams @ConfigurationProperties(prefix="kafka") public class KafkaStreamConfig { private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024; private String hosts; private String group; @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) public KafkaStreamsConfiguration defaultKafkaStreamsConfig() { Map
props = new HashMap<>(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts); props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid"); props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid"); props.put(StreamsConfig.RETRIES_CONFIG, 10); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return new KafkaStreamsConfiguration(props); } } 3. 使用kafkaStream监听消息
KafkaStreamHelloListener.java
package com.kafka.stream; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.time.Duration; import java.util.Arrays; @Configuration @Slf4j public class KafkaStreamHelloListener { @Bean public KStream
kStream(StreamsBuilder streamsBuilder){ //创建kstream对象,同时指定从那个topic中接收消息 KStream stream = streamsBuilder.stream("itcast-topic-input"); stream.flatMapValues(new ValueMapper >() { @Override public Iterable apply(String value) { return Arrays.asList(value.split(" ")); } }) //根据value进行聚合分组 .groupBy((key,value)->value) //聚合计算时间间隔 .windowedBy(TimeWindows.of(Duration.ofSeconds(10))) //求单词的个数 .count() .toStream() //处理后的结果转换为string字符串 .map((key,value)->{ System.out.println("key:"+key+",value:"+value); return new KeyValue<>(key.key().toString(),value.toString()); }) //发送消息 .to("itcast-topic-out"); return stream; } } 测试:
启动springboot应用程序,运行之前的ProducerQuickStart来生产消息,约10秒后,看到kafkaStream消息的处理结果
说明kafkaStream接收到消息并将多条消息进行了统一处理。
参考(推荐阅读):
- https://cloud.tencent.com/developer/article/2100664
- https://www.cnblogs.com/tree1123/p/11457851.html
猜你喜欢
- 13小时前部署YUM仓库及NFS共享存储
- 13小时前[Flink] Flink On Yarn(yarn-session.sh)启动错误
- 13小时前kafka服务器连接出现:[NetworkClient.java:935] [Producer clientId=producer-1] Node -1 disconnected原因分析
- 13小时前数据湖架构Hudi(二)Hudi版本0.12源码编译、Hudi集成spark、使用IDEA与spark对hudi表增删改查
- 13小时前Log4j2 配置日志记录发送到 kafka 中
- 13小时前[Halcon&3D] 3D手眼标定理论与示例解析
- 10小时前柠檬英语(柠檬英语复数怎么读)
- 7小时前互联网理财(互联网理财平台排名)
- 4小时前沈阳辉山乳业(沈阳辉山乳业是国企吗)
- 3小时前查征信去哪个银行(查征信去哪个银行位置)
网友评论
- 搜索
- 最新文章
- 热门文章