上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Kafka:主题创建、分区修改查看、生产者、消费者

guduadmin11天前

文章目录

  • Kafka后台操作
    • 1)主题
    • 2)分区
    • 3)生产者
    • 4)消费者组

      Kafka后台操作

      1)主题

      1.创建主题

      ./bin/kafka-topics.sh --create --bootstrap-server hadoop102:9092 --replication-factor 3 --partitions 1 --topic second
      

      2.查看所有主题

      ./bin/kafka-topics.sh --list --bootstrap-server hadoop102:9092
      

      3.查看详细主题

      ./bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic test
      

      Kafka:主题创建、分区修改查看、生产者、消费者,在这里插入图片描述,第1张

      序号从0开始计算

      Partition:分区数,该主题有3个分区

      Replica:副本数,该主题有3个副本

      Leader:副本数中的主的序号,生产消费的对象

      2)分区

      1.修改分区数

      ./bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --partitions 3 --topic test4
      

      修改的分区数量不可以小于或者等于当前主题分区的数量,否则会报错

      在根目录kafka-logs文件中存放了储消费者组的偏移量信息的文件。所有创建的主题都会存放在该文件夹中

      Kafka:主题创建、分区修改查看、生产者、消费者,[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1CMWEFhe-1681268256863)(C:\Users\HongYongX\Desktop\norebook\实习计划\kafka\image-20230412101450737.png)],第2张

      __consumer_offsets-xx:当中的xx是根据分区主题数量来确定的,里面存储了当前分区的消费者组的偏移量信息。例如个话题有5个分区,那么他就会有5个对应的文件。而-xx前面代表的是主题的名称。

      在创建Kafka集群时__consumer_offsets默认会创建50个分区,文件内的 __consumer_offsets-xx的数量应该大于50个

      选择一个文件进去查看

      Kafka:主题创建、分区修改查看、生产者、消费者,[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0Z1eqY2j-1681268256864)(C:\Users\HongYongX\Desktop\norebook\实习计划\kafka\image-20230412103712106.png)],第3张

      log: 分区的日志文件,用于存储消费者组的偏移量信息。每个日志文件由多个消息记录组成,其中包含了消费者组的偏移量提交记录。

      index:分区的索引文件,用于加速偏移量查询操作。索引文件保存了消息偏移量和日志文件中的物理偏移量之间的映射关系。

      timeindex:分区的时间索引文件,用于加速偏移量查询操作。时间索引文件保存了消息的时间戳和日志文件中的物理偏移量之间的映射关系。

      snapshot:分区的快照文件,用于记录偏移量提交的快照信息。快照文件包含了偏移量提交记录的摘要信息,以便在恢复时加快恢复速度。

      3)生产者

      1.启动生产者

      ./bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
      

      kafka-console-producer.sh 是 Kafka 提供的一个 shell 脚本,用于在命令行中启动一个控制台生产者,用于向 Kafka 集群发送消息

      --broker-list: 指定 Kafka brokers 的地址和端口号,用冒号分隔

      --topic: 指定要发送消息的主题名。

      4)消费者组

      1.查看消费者组

      ./bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --list
      

      2.创建消费者从头开始读并设置消费者组

      ./bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --from-beginning --group testGroup
      

      bootstrap-server :参数用于指定 Kafka 集群的地址

      topic :参数用于指定要消费的主题名称

      group :参数用于指定消费者组的名称

      from-beginning :读取最早的偏移量

      group :指定消费者组名称

      3.创建消费者时设置偏移量为最新

      ./bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --group testGroup --topic first --reset-offsets --to-latest --execute
      #启动
      ./bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first  --group testGroup
      

      reset-offsets:表示要重置偏移量

      to-latest:表示将偏移量移到最新位置

      execute:表示执行偏移量重置操作

      执行偏移量重置后,会移动到最新位置,如果需要修改,得重新设置,谨慎操作。

      Kafka:主题创建、分区修改查看、生产者、消费者,在这里插入图片描述,第4张

      NEW-OFF:表示设置的新的偏移量

      4.创建消费者时设置具体的偏移量数据(重置偏移量到6)

      ./bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --group testGroup --topic first --reset-offsets --to-offset 6 --execute
      #启动
      ./bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first  --group testGroup
      

      Kafka:主题创建、分区修改查看、生产者、消费者,在这里插入图片描述,第5张

      5.创建消费者时设置相对的偏移量数据(偏移量向前偏移2个位置)

      ./bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --group testGroup --topic first --reset-offsets --shift-by 2 --execute
      

      Kafka:主题创建、分区修改查看、生产者、消费者,[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3oN5RByd-1681268256866)(C:\Users\HongYongX\Desktop\norebook\实习计划\kafka\image-20230412092613689.png)],第6张

      reset-offsets:表示要重置偏移量

      to-offset:参数可以将偏移量重置到指定的偏移量位置

      shift-by:参数可以将偏移量进行相对偏移量重置

      6.查看消费者组主具体信息

      ./bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --describe --group mentugroup
      

      Kafka:主题创建、分区修改查看、生产者、消费者,在这里插入图片描述,第7张

      TOPIC:消费主题名称

      PARTITION:分区Id

      CURRENT-OFFSET: 当前消费偏移量,即消费者当前消费到的消息的偏移量

      LOG-END-OFFSET: 分区的最新偏移量,即分区中最新消息的偏移量

      LAG:消费者落后于最新偏移量的消息数量,即LAG = LOG-END-OFFSET - CURRENT-OFFSET(最新-当前)

      CONSUMER-ID:消费者ID

      HOST:消费者所在主机

      CLIENT-ID:消费者客户端ID

      7.删除消费者组

      ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testGroupNew1 --delete
      

网友评论

搜索
最新文章
热门文章
热门标签