上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

【超级详细】熟悉Kafka的基本使用方法的实验【Windows】

guduadmin11天前

文章目录

  • 前言
  • 一、实验平台
  • 二、实验内容
    • 一、Kafka与MySQL的组合使用
    • 1.实验要求
    • 2.在MySQL中操作
    • 3.安装Kafka
    • 4.使用Kafka
    • 5.在PyCharm中操作
    • 二、消费者手动提交
    • 1.实验要求
    • 2.在PyCharm中操作
    • 三、Kafka消费者订阅分区
    • 1.实验要求
    • 2.在终端操作
    • 3.在PyCharm中操作
    • 三、实验小bug
    • 总结

      前言

      Kafka 是由 Apache 软件基金会开发的一个开源消息队列平台,它是一种高性能、可扩展、分布式的发布-订阅消息系统。Kafka 的架构被设计为高效、低延迟,并具有高吞吐量、持久性和可靠性。

      在 Kafka 中,生产者将消息发布到主题(topic)中,消费者则从主题中消费消息,使用者可以将其看作一个 highly scalable 分布式 commit log 或者消息系统 (Messaging system),每个消息包含一个 key,一个 value 和一个额外的 timestamp。消息保留时间通过配置进行控制,当时间或空间满了的时候就根据策略来清除老数据,默认情况下老数据只保存 7 天。

      特点:

      1.高吞吐量:Kafka 在发布-订阅消息方面具有非常高的性能。它可以几乎实时地处理高速流入的大量数据。

      实时处理:Kafka 能够处理高达数以百万计的消息,并准确地将消息排序和在群组内进行调度。

      2.持久性和可靠性:与传统的消息系统不同,Kafka 具有持久性和可靠性。客户端自己提交当前偏移量,避免了可能出现的重复读取问题。

      3.可扩展性:Kafka 可以在不繁琐的配置或修改信息格式等环节就能进行扩展。

      4.多样化数据类型和来源:通过使用支持多种编程语言和操作系统的 API,Kafka 可以连接到许多各种来源的应用程序。

      总之,Kafka 具有高性能、低时延,适合处理大规模物联网设备、日志、报警信息、传感器数据、消息等。

      所以今天就来写一份关于熟悉Kafka的基本使用方法的实验,希望可以与小伙伴们一起探讨~~😉😉


      一、实验平台

      (1)操作系统:Windows7及以上(我用的是Windows 11)

      (2)Kafka版本:kafka_2.12-2.4.0

      (3)MySQL版本:8.0

      二、实验内容

      一、Kafka与MySQL的组合使用

      1.实验要求

      假设有一个学生表student,如下表所示,编写Python程序完成如下操作。

      (1)读取student表的数据内容,将其转换为JSON格式,发送给Kafka

      (2)从Kafka中获取JSON格式数据,打印出来

      snosnamessexsage
      95001JohnM23
      95002TomM23

      2.在MySQL中操作

      (1)打开MySQL

      方式一:

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第1张

      方式二:

      可以通过 DOS 命令启动 MySQL 服务,windows+R,在搜索框中输入cmd,进去之后再输入services.msc,就进去服务系统里了,再启动就行

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第2张

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第3张

      进去以后输入密码就可以开始执行mysql语句了

      (2)创建数据库

      create database school001;
      

      (3)查看数据库

      show databases;
      

      发现数据库已经被创建完成

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第4张

      (4)使用该数据库

      use school001;
      

      (5)在该数据库中创建student表

      create table student(sno varchar(10),sname varchar(20),ssex char(2),sage int(5));
      

      (6)查询该数据库中的student表

      show tables;
      

      (7)向student表中插入值

      insert into student values("95001","John","M",23);
      
      insert into student values("95002","Tom","M",23);
      

      (8)查询student表中的数据

      select * from student;
      

      查询结果:

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第5张

      (到这里我们的student表就创建成功了!)😊😊

      3.安装Kafka

      简单介绍:

      Kafka 的运行需要 Java 环境的支持,因此,安装 Kafka 前需要在 Windows 操作系统中安装 JDK

      访问 Kafka 官网,下载 Kafka 2.4.0的安装文件 kafka 2.12-2.4.0.1gz,解压缩到" C : \ "目录下(也可以放到D盘,不过最好放在D盘根目录下,不然后续代码容易报错,我试过)

      因为 Katka 的运行依赖于 Zookeeper ,因此,还需要下载并安装 Zookeeper 。当然, Kafka 也内置了 Zookeeper 服务,因此,也可以不额外安装 Zookeeper ,直接使用内置的Zookeeper 服务。为简单起见,这里直接使用内置的Zookeeper 服务。

      win+r—>输入cmd然后回车

      输入命令pip install kafka-python安装python-kafka模块

      查看我们安装的模块的版本信息(出现kafka-python2.0.2表示我们安装模块成功)

      具体怎么安装可参考:kafka安装部署

      4.使用Kafka

      在实验中要用到Kafka就要先启动它的Zookeeper服务和Kafka,且在实验过程中,千万不可以将其关闭,一旦关闭,服务就会停止😡😡

      在 Windows 操作系统中

      打开第1个 cmd 命令行窗口,启动 Zookeeper 服务:

      cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
      
      .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties
      

      注意,执行上面的命令以后, cmd 命令行窗口中会返回一堆信息,然后停住不动,没有回到命令提示符状态。这时,不要误以为是死机,这表示 Zookeeper 服务器已经启动,正处于服务状态。所以,不要关闭这个 cmd 命令行窗口,一旦关闭, Zookeeper 服务就会停止

      如图:

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第6张

      打开第2个 cmd 命令行窗口,然后输入如下命令启动 Kafka 服务:

      cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
      
      .\bin\windows\kafka-server-start.bat .\config\server.Properties
      

      同样地,执行上面的命令以后, cmd 命令行窗口中会返回一堆信息,然后停住不动,没有回到命令提示符状态。这时,不要误以为是死机,这表示 Kafka 服务器已经启动,正处于服务状态。所以,不要关闭这个 cmd 命令行窗口,一旦关闭, Kafka 服务就会停止

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第7张

      若执行上面的命令以后,如果启动失败,并且出现提示信息"此时不应有\QuickTime\QTSstem\QTJava.zip ",则需要把环境变量 CLASSPATH 的相关信息删除。具体方法是,

      右键单击"计算机",再单击"属性"一"高级系统设置"一"环境变量",然后,找到变量 CLASSPATH ,把类似下面的信息删除:

      C : Program Files (x86) QuickTime\QTSystem QTJava . zip

      然后重新启动计算机,让配置修改生效。重新启动计算机以后,再次按照上面的方法启动Zookeeper和Kafka

      为了测试 Kafka ,这里创建一个主题,名称为" topic_test ",其包含一个分区,只有一个副本。在第3个 cmd 命令行窗口中执行如下命令:

      cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
      
      .\bin\windows\kafka-topics.bat -- create -- zookeeper localhost:2181-- replication -
       factor 1-- partitions 1-- topic topic_test 
      

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第8张

      可以继续执行如下命令,查看 topic _ test 是否创建成功:

      .\bin\windows\kafka-topics.bat -- list -- zookeeper localhost:2181
      

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第9张

      如果创建成功,就可以在执行结果中看到 topic _ test

      继续在第3个 cmd 命令行窗口中执行如下命令,创建一个生产者来产生消息:

      .\bin\windows\kafka-console-producer.bat -- broker-list localhost :9092 -topic topic_test 
      

      该命令执行以后,屏幕上的光标会持续闪烁,这时,可以用键盘输入一些内容,例如:

      I love Kafka

      Kafka is good

      新建第4个 cmd 命令行窗口,执行如下命令来消费消息:

      cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
      
      .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic_test --from-beginning 
      

      该命令执行以后,屏幕上显示刚才输入的语句" I love Kafka “和” Kafka is good "

      5.在PyCharm中操作

      1. 创建一个.py文件,写入以下代码,用于实现读取student表的数据内容,将其转换为JSON格式,发送给Kafka的功能
      # 运行前先在win上启动zookeap和kafka
      # 导入相关模块
      from kafka import KafkaProducer
      import json
      # 连接kafka  json.dumps(v).encode('utf-8')将json格式的数抠转挨为字节类型,然后使用ut了-8进行编码
      producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
      # 定义一个json格式的数第,json格式以键值对形式保存数掂,每个键值对之间使用逗号隔开
      data = {
          'sno': '95001',
          'sname': 'John',
          'ssex': 'M',
          'sage': 23
      }
      # 发送数据
      producer.send('test001', data)
      # 关闭资源
      producer.close()
      

      运行结果如下图所示:

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第10张

      1. 创建一个.py文件,写入以下代码,用于实现从Kafka中获取JSON格式数据,打印出来的功能
      # 运行前先在win上启动mysql
      # 导入消费模块
      import json
      # 导入kafka的消费模块
      from kafka import KafkaConsumer
      import json
      import pymysql.cursors
      # 连接kafka
      consumer = KafkaConsumer('test001', bootstrap_servers='localhost:9092', group_id=None, auto_offset_reset='earliest')
      # 对获取的数据进行解析
      for msg in consumer:
          # 转换为字符串类型
          msg1 = str(msg.value, encoding=('utf-8'))
          # 将字符串的数据加载为字典
          dict = json.loads(msg1)
          # 连接数据库
          connect = pymysql.Connect(
              host='localhost',
              port=3306,
              user='root',
              passwd='xxxxxxxx',#这是你MySQL数据库的密码
              db='school001',
              charset='utf8'
          )
          # 获取操作数抠库的对象<游标>
          cursor = connect.cursor()
          # 将数抠织存到mysqL(插入数掷)
          # 定义sql语句
          sql = "select * from student;"
          # 将数据作为参数传速给sqL,保存到hrgsql
          cursor.execute(sql)
          # 提交
          connect.commit()
          for row in cursor.fetchall():
              print("sno:%s\tsname:%s\tssex:%s\tsage:%d" % row)
          print("共查询出", cursor.rowcount, '条数据')
          connect.close()
      

      运行结果如下图所示:

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第11张

      二、消费者手动提交

      1.实验要求

      生成一个data.json文件,内容如下:

      data = [

      {“name”: “Tony”, “age”: 21, “hobbies”: [“basketball”, “tennis”]},

      {“name”: “Lisa”, “age”: 20, “hobbies”: [“sing”, “dance”]},

      ]

      根据上面给出的data.json文件,执行如下操作。

      (1)编写生产者程序,将JSON文件数据发送给Kafka。

      (2)编写消费者程序,读取Kafka的JSON格式数据,并手动提交偏移量。

      2.在PyCharm中操作

      1. 创建一个Test写入以下代码,来实现生成data.json文件的功能:
      import json
      data = [
          {"name": "Tony", "age": 21, "hobbies": ["basketball", "tennis"]},
          {"name": "Lisa", "age": 20, "hobbies": ["sing", "dance"]},
      ]
      with open('../../data.json', 'w') as f:
          json.dump(data, f)
      
      1. 创建一个.py文件,编写生产者程序,来实现将JSON文件数据发送给Kafka的功能
      # 可以使用 Python 的 json 模块读取 data.json 文件,并将数据转换为字符串后发送给 Kafka
      from kafka import KafkaProducer
      import json
      data = [
          {
              "name": "Tony",
              "age": 21,
              "hobbies": ["basketball", "tennis"]
          },
          {
              "name": "Lisa",
              "age": 20,
              "hobbies": ["sing", "dance"]
          }
      ]
      producer = KafkaProducer(bootstrap_servers='localhost:9092')
      for item in data:
          # 将数据转换为字符串格式并发送给 Kafka 主题 test
          message = json.dumps(item).encode('utf-8')
          producer.send('test', value=message)
      producer.close()
      

      运行结果如下图所示:

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第12张

      1. 创建一个.py文件,编写消费者程序,来实现读取Kafka的JSON格式数据,并手动提交偏移量的功能
      # 我们可以使用 Kafka 消费者 API 进行数据消费,并在处理完每个消息后手动提交偏移量。
      from kafka import KafkaConsumer
      import json
      # 配置 Kafka 消费者,指定主题和分组等信息
      consumer = KafkaConsumer(
          'test',
          bootstrap_servers=['localhost:9092'],
          auto_offset_reset='earliest',
          enable_auto_commit=False,  # 禁止自动提交偏移量
          group_id='my-group')
      # 循环消费 Kafka 消息
      for message in consumer:
          # 将传入的二进制消息内容解码为 JSON 格式的字符串
          item = json.loads(message.value.decode('utf-8'))
          print(item)
          # 手动提交偏移量,确保下次消费时从正确的位置开始
          consumer.commit()
      

      运行结果如下图所示:

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第13张

      三、Kafka消费者订阅分区

      1.实验要求

      在命令行窗口中启动Kafka后,手动创建主题 “assign_topic” ,分区数量为2。具体命令如下:

      .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic
      

      根据上面给出的主题,完成如下操作。

      (1)编写生产者程序,以通过唯一标识符UUID作为消息,发送给主题 “ assign_topic” 。

      (2)编写消费者程序1,订阅主题的分区0,只消费分区0数据。

      (3)编写消费者程序2,订阅主题的分区1,只消费分区1数据。

      2.在终端操作

      首先要完成主题以及分区的创建才能编写程序,不然程序会报错

      步骤:

      1. 使用windows+r,在弹窗中输入cmd打开终端
      2. 在终端中输入命令,创建主题和分区:
      cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
      
      .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic
      

      结果如下图(这是我之前已经创建好的结果图):【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入代码片,第14张

      3.在PyCharm中操作

      1. 创建一个.py文件,写入以下代码,用于实现编写生产者程序,以通过唯一标识符UUID作为消息,发送给主题 “ assign_topic的功能:
      from kafka import KafkaProducer
      import uuid
      producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
      for i in range(5):
          message = str(uuid.uuid4()).encode('utf-8')
          producer.send('assign_topic', value=message)
      producer.close()
      

      运行结果如下图所示:

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第15张

      1. 创建一个.py文件,写入以下代码,用于实现订阅主题的分区0,只消费分区0数据的功能:
      from kafka import KafkaConsumer, TopicPartition
      consumer = KafkaConsumer(
          bootstrap_servers=['localhost:9092'],
          auto_offset_reset='earliest',
          enable_auto_commit=False,
          consumer_timeout_ms=1000
      )
      consumer.assign([TopicPartition('assign_topic', 0)])
      for message in consumer:
          print("Partition 0 - Message value: {}".format(message.value))
      consumer.close()
      

      运行结果如下图所示:

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第16张

      1. 创建一个.py文件,写入以下代码,用于实现订阅主题的分区1,只消费分区1数据的功能:
      from kafka import KafkaConsumer, TopicPartition
      consumer = KafkaConsumer(
          bootstrap_servers=['localhost:9092'],
          auto_offset_reset='earliest',
          enable_auto_commit=False,
          consumer_timeout_ms=1000
      )
      consumer.assign([TopicPartition('assign_topic', 1)])
      for message in consumer:
          print("Partition 1 - Message value: {}".format(message.value))
      consumer.close()
      

      运行结果如下图所示:

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第17张

      三、实验小bug

      1. Kafka连接报错:kafka.errors.NoBrokersAvailable: NoBrokersAvailable 是什么原因:?

      答:是因为程序运行了多次的原因

      把tmp文件和logs文件里面的东西都删掉,就可以解决了

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第18张

      【超级详细】熟悉Kafka的基本使用方法的实验【Windows】,在这里插入图片描述,第19张

      2. 为什么消费者程序1中有东西输出而消费者程序2中什么却什么也没输出?

      消费者程序1和消费者程序2是对同一个主题的两个消费者应用程序。可以针对以下情况进行分析。

      在主题 assign_topic 中,Kafka有多个分区,可用于并行处理消息。在这里,被消费的消息都来自此主题的第一个分区(即分区 0)。

      消费者程序1使用了 .subscribe() 方法来订阅主题,这将导致消费者加入到消费组中,然后通过负载均衡策略从所有分区接收消息。因此,消费者程序1输出打印了分区 0 中的消息。

      消费者程序2使用了 .assign() 方法手动分配消费者处理的分区,而且只分配了主题 assign_topic 的第一个分区(即分区 0)。但是,由于该程序没有运行足够长的时间,并且没有消费到任何未提交的偏移量,所以当应用程序终止时不会向Kafka服务器发送任何提交请求,这就可能导致在下一次启动时重复消费确认过的消息。因此,在生产环境中,请务必根据具体情况定期地提交所消费的分区的偏移量。


      总结

      以上就是对Kafka的基本使用方法的实验啦,有不明白的地方可以留言哦,希望能共同进步~~😀😀😀😀😀😀

网友评论

搜索
最新文章
热门文章
热门标签