上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

《Kafka系列》Kafka常见问题处理记录

guduadmin11天前

文章目录

  • Kafka常见问题处理记录
  • 一、kafka创建topic失败, Replication factor: 1 larger than available brokers: 0
  • 二、服务器Cannot allocate memory
  • 三、Offset Explorer连接Kafka问题集合,(Timeout expired while fetching topic metadata),(Uable to find any brokers)
    • 1. Timeout expired while fetching topic metadata
    • 2.Uable to find any brokers
    • 四、kafka数据到hudi丢失数据问题
      • 1.报错问题
      • 2.根据提示添加配置文件 -> option("failOnDataLoss","false")

        Kafka常见问题处理记录

        一、kafka创建topic失败, Replication factor: 1 larger than available brokers: 0

        1.创建语句如下所示,按照习惯在添加zookeeper参数的时候,指定了zxy:2181/kafka,但是却创建失败,Error while executing topic command : Replication factor: 1 larger than available brokers: 0.

        [root@zxy bin]# kafka-topics.sh --create --topic tp1 --zookeeper zxy:2181/kafka --partitions 3 --replication-factor 1
        Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
        [2023-03-27 17:15:46,605] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0.
         (kafka.admin.TopicCommand$)
        

        2.检查各个broker的server.properties文件

        发现在配置参数的时候,zookeeper.connect指定的是zxy:2181,zxy:2182,zxy:2183

        [root@zxy config]# cat server01.properties
        broker.id=1
        port=9091
        listeners=PLAINTEXT://localhost:9091
        log.dirs=/zxy/apps/kafkaCluster/kafkaLog01
        zookeeper.connect=zxy:2181,zxy:2182,zxy:2183
        

        3.指定zookeeper参数为zxy:2181,创建成功

        [root@zxy bin]# kafka-topics.sh --create --topic tp1 --zookeeper zxy:2181 --partitions 3 --replication-factor 1
        Created topic "tp1".
        

        二、服务器Cannot allocate memory

        问题:在使用服务器中遇到内存无法分配到问题导致启动不了Kafka

        思路一:根据查找方法,最后将vm.overcommit_memory设为1,但是对我这个问题没有太大帮助

        管理员下三种方案

        1.编辑/etc/sysctl.conf ,改vm.overcommit_memory=1,然后sysctl -p使配置文件生效

        2.sysctl vm.overcommit_memory=1

        3.echo 1 > /proc/sys/vm/overcommit_memory,然后sysctl –p永久生效

        思路二:然后通过修改为kafka分配的内存,还是没有解决问题

        if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
            export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
        fi
        

        修改为

        if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
            export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
        fi
        

        思路三:最后想到zookeeper已经启动了,是不是因为它分配的内存太多了,减少为zookeeper分配的内存

        if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
            export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
        fi
        

        三、Offset Explorer连接Kafka问题集合,(Timeout expired while fetching topic metadata),(Uable to find any brokers)

        1. Timeout expired while fetching topic metadata

        1.Offset Explorer配置好zookeeper的连接地址后

        《Kafka系列》Kafka常见问题处理记录,在这里插入图片描述,第1张

        2.在查看Topics的时候,报错Timeout expired while fetching topic metadata

        《Kafka系列》Kafka常见问题处理记录,在这里插入图片描述,第2张

        3.排查发现应该是kafka的server.properties文件中的advertised.listeners问题

        • 修改前是

          advertised.listeners=PLAINTEXT://localhost:9091
          
        • 修改后

          advertised.listeners=PLAINTEXT://:9091
          

          4.修改好配置文件后,重启Kafka即可

          2.Uable to find any brokers

          1.重新连接后,又遇到Uable to find any brokers问题,访问不到的原因是,Offset Explorer访问Kafka的时候,是因为Kafka tool是通过主机名访问的,所以要在windows的hosts文件中配置上Kafka服务器的IP映射,配置目录在C:\Windows\System32\drivers\etc

          《Kafka系列》Kafka常见问题处理记录,在这里插入图片描述,第3张

          2.打开C:\Windows\System32\drivers\etc,修改配置文件hosts,添加Kafka主机的映射关系

          C:\Windows\System32\drivers\etc\hosts
          125.262.96.387	zxy
          

          3.断开连接后,在Advanced的Bootstrap servers处配置好Kafka服务,重新连接即可

          《Kafka系列》Kafka常见问题处理记录,在这里插入图片描述,第4张

          4.重新连接后,即可以看到已经可以正常看到Topics了

          《Kafka系列》Kafka常见问题处理记录,在这里插入图片描述,第5张

          四、kafka数据到hudi丢失数据问题

          1.报错问题

          Caused by: java.lang.IllegalStateException: Cannot fetch offset 196 (GroupId: spark-kafka-source-6f1df211-fdcb-4bcc-813d-55c4f9661c9d-1732697149-executor, TopicPartition: news-0).
          Some data may have been lost because they are not available in Kafka any more; either the
           data was aged out by Kafka or the topic may have been deleted before all the data in the
           topic was processed. If you don't want your streaming query to fail on such cases, set the
           source option "failOnDataLoss" to "false".
                  at org.apache.spark.sql.kafka010.InternalKafkaConsumer$.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$reportDataLoss0(KafkaDataConsumer.scala:642)
                  at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$reportDataLoss(KafkaDataConsumer.scala:448)
                  at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get.apply(KafkaDataConsumer.scala:269)
                  at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get.apply(KafkaDataConsumer.scala:234)
                  at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
                  at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:209)
                  at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:234)
          
          • 翻译结果

            最终应用程序状态:失败,exitCode:15,(原因:用户类引发异常:org.apache.spark.sql.streaming.StreamingQueryException:由于阶段失败而中止作业:阶段2.0中的任务0失败4次,最近的失败:阶段2.0中的任务0.3丢失(TID 5,hadoop,executor 1):java.lang.IllegalStateException:无法获取偏移量196(GroupId:spark-kafka-source-e2868915-6d7a-4aef-99a8-3d1c5ef45147-1732697149-executor,主题分区:news-0)。

            一些数据可能已经丢失,因为它们在卡夫卡不再可用;要么是数据被卡夫卡过时了,要么是主题在处理完主题中的所有数据之前被删除了。如果您不希望流式查询在这种情况下失败,请将源选项“failOnDataLoss”设置为“false”。

            2.根据提示添加配置文件 -> option(“failOnDataLoss”,“false”)

            //5.读取Kafka源数据
                    val df: DataFrame = spark.readStream.format("kafka")
                        .option("kafka.bootstrap.servers", params.brokerList)
                        .option("subscribe", params.topic)
                        .option("startingOffsets", "latest")
                        .option("kafka.consumer.commit.groupid", "action-log-group01")
                        .option("failOnDataLoss","false")
                        .load()
            

            tips:认为添加这个配置不太妥当,但尚未找到适宜的方法

            哪位博主知道的,希望可以指点指点

网友评论

搜索
最新文章
热门文章
热门标签