上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Kubernetes 部署 Kafka 集群

guduadmin19小时前

docker-compose 部署 kafka

  • 镜像地址

    kafka官网

    kafka镜像

    zookeeper镜像

  • Kafka 4.0 将移除zookeeper,仅支持KRaft

    所以我们使用KRaft模式,这也是kafka:3.4的默认模式.

  • 由于这是一个非 root 的容器,挂载的文件和目录必须具有 UID 1001 的适当权限

    sudo chown -R 1001:1001 ./kafka_data

  • 创建kafka容器 docker compose -f docker-compose.yml up -d
    version: "3"
    services:
      kafka:
        image: bitnami/kafka:3.4.1
        ports:
          - "9092:9092"
        volumes:
          - "./kafka_data:/bitnami"
        environment:
          # KRaft settings
          - KAFKA_CFG_NODE_ID=0
          - KAFKA_CFG_PROCESS_ROLES=controller,broker
          - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
          # Listeners
          - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
          - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
          - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
          - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
          - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
    
    • 集群方式可参考链接

      在 Kubernetes 上部署 Kafka 集群

      1. yaml示例使用的是KRaft模式
      2. 创建无头Service kafka-headless,用于kafka间相互通信
      3. 创建Service kafka, 用于外部访问kafka
      4. 启动脚本写入ConfigMap, 只是在默认的启动脚本前增加了环境变量KAFKA_CFG_NODE_ID赋值:
        1. 集群中每个副本都需要设置KAFKA_CFG_NODE_ID,且必须为整数
        2. StatefulSet中将副本名称metadata.name赋值给环境变量MY_POD_NAME参考文档。(也可以直接使用环境变量HOSTNAME,副本名称是默认的主机名)
        3. 截取环境变量MY_POD_NAME最后的序号,赋值给环境变量KAFKA_CFG_NODE_ID,比如:MY_POD_NAME=kafka-0,那么 KAFKA_CFG_NODE_ID=0
        4. 环境变量KAFKA_CFG_CONTROLLER_QUORUM_VOTERS也可以在此脚本中自动生成,可参考。yaml示例中直接设置成了3个。
      5. 非root容器需要设置securityContext
      6. 使用模板volumeClaimTemplates动态创建存储,每个副本挂载单独的存储。例子里使用的是华为云现有的storageClassName: csi-disk,如果没有声明StorageClass,可以参考文档提前创建。
      apiVersion: v1
      kind: Service
      metadata:
        name: kafka-headless
        labels:
          app: kafka
      spec:
        type: ClusterIP
        clusterIP: None
        ports:
        - name: kafka-client
          port: 9092
          targetPort: kafka-client
        - name: controller
          port: 9093
          targetPort: controller   
        selector:
          app: kafka
      ---
      #部署 Service,用于外部访问 Kafka
      apiVersion: v1
      kind: Service
      metadata:
        name: kafka
        labels:
          app: kafka
      spec:
        type: ClusterIP
        ports:
        - name: kafka-client
          port: 9092
          targetPort: kafka-client
        selector:
          app: kafka
      ---
      # 分别在 StatefulSet 中的每个 Pod 中获取相应的序号作为 KAFKA_CFG_NODE_ID(只能是整数),然后再执行启动脚本
      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: ldc-kafka-scripts
      data:
        setup.sh: |-
          #!/bin/bash
          export KAFKA_CFG_NODE_ID=${MY_POD_NAME##*-} 
          exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
      ---
      apiVersion: apps/v1
      kind: StatefulSet
      metadata:
        name: kafka
        labels:
          app: kafka
      spec:
        selector:
          matchLabels:
            app: kafka
        serviceName: kafka-headless
        podManagementPolicy: Parallel
        replicas: 3 # 部署完成后,将会创建 3 个 Kafka 副本
        updateStrategy:
          type: RollingUpdate
        template:
          metadata:
            labels:
              app: kafka
          spec:
            affinity:
              podAntiAffinity: # 工作负载反亲和
                preferredDuringSchedulingIgnoredDuringExecution: # 尽量满足如下条件
                - weight: 1
                  podAffinityTerm:
                    labelSelector: # 选择Pod的标签,与工作负载本身反亲和
                      matchExpressions:
                        - key: "app"
                          operator: In
                          values:
                            - kafka
                    topologyKey: "kubernetes.io/hostname"  # 在节点上起作用    
            containers:
            - name: kafka
              image: bitnami/kafka:3.4.1
              imagePullPolicy: "IfNotPresent"
              command:
              - /opt/leaderchain/setup.sh
              env:
              - name: BITNAMI_DEBUG
                value: "true" # true 详细日志
              # KRaft settings 
              - name: MY_POD_NAME # 用于生成 KAFKA_CFG_NODE_ID
                valueFrom:
                  fieldRef:
                    fieldPath: metadata.name            
              - name: KAFKA_CFG_PROCESS_ROLES
                value: "controller,broker"
              - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
                value: "0@kafka-0.kafka-headless:9093,1@kafka-1.kafka-headless:9093,2@kafka-2.kafka-headless:9093"
              - name: KAFKA_KRAFT_CLUSTER_ID
                value: "Jc7hwCMorEyPprSI1Iw4sW"  
              # Listeners            
              - name: KAFKA_CFG_LISTENERS
                value: "PLAINTEXT://:9092,CONTROLLER://:9093"
              - name: KAFKA_CFG_ADVERTISED_LISTENERS
                value: "PLAINTEXT://:9092"
              - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
                value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
              - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
                value: "CONTROLLER"
              - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
                value: "PLAINTEXT"
        
              ports:
              - containerPort: 9092
                name: kafka-client                  
              - containerPort: 9093
                name: controller
                protocol: TCP                     
              volumeMounts:
              - mountPath: /bitnami/kafka
                name: data
              - mountPath: /opt/leaderchain/setup.sh
                name: scripts
                subPath: setup.sh
                readOnly: true      
            securityContext:
              fsGroup: 1001
              runAsUser: 1001
            volumes:    
            - configMap:
                defaultMode: 493
                name: ldc-kafka-scripts
              name: scripts                   
        volumeClaimTemplates:
        - apiVersion: v1
          kind: PersistentVolumeClaim
          metadata:
            name: data
            annotations:
              everest.io/disk-volume-type: SAS
            labels:
              failure-domain.beta.kubernetes.io/region: cn-south-1
              failure-domain.beta.kubernetes.io/zone: cn-south-2b      
          spec:
            accessModes: [ "ReadWriteOnce" ] 
            storageClassName: csi-disk
            resources:
              requests:
                storage: 10Gi
      

      主题Topic

      1. 查看帮助(容器中kafka的脚本目录为:/opt/bitnami/kafka/bin)

        sh kafka-topics.sh --help

      2. 获取所有的主题

        sh kafka-topics.sh --bootstrap-server localhost:9092 --list

      3. 创建一个Topic

        –partitions(分区数量)

        –topic(主题名)

        –replication-factor(副本数量,不能大于broker的数量)

        sh kafka-topics.sh --create --topic myTopic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092

      4. 查询 Topic 的详细信息

        sh kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic myTopic

      5. 删除 Topic (Topic 中所有的消息数据都将被永久删除,且无法恢复)

        sh kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic myTopic

      6. 增加主题分区数量 (如果要减少分区的数量,只能删除Topic,然后重新创建)

        sh kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --alter --partitions 3

      7. 修改数据过期时间 (kafka默认的只保存7天的数据,retention.ms=-1表示不过期)

        sh kafka-topics.sh --bootstrap-server localhost:9092 -topic myTopic --alter --config retention.ms=259200000

      8. 修改多字段

        sh kafka-topics.sh --bootstrap-server localhost:9092 -topic myTopic --alter --config retention.ms=259200000 max.message.bytes=128000

      9. 修改 Topic 副本数
        1. 编写分配脚本get-reassign-tpl.json

          echo '{"topics":[{"topic":"myTopic"}],"version": 1}' > get-reassign-tpl.json

        2. 执行分配计划,用于生成json格式的文件

          sh kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file get-reassign-tpl.json --broker-list "0,1,2" --generate

        3. 复制上一步返回结果中的json,修改副本字段replicas,填写broker.id,生成reassign.json文件

          echo '{"version":1,"partitions":[{"topic":"myTopic","partition":0,"replicas":[1,2]}]}' > reassign.json

        4. 利用上一步生成的reassign.json,进行topic的重新分配

          sh kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --execute

        5. 查看分配的进度

          sh kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --verify

        6. 分配完成,再次查询详情

          sh kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic myTopic

      测试发送消息和接收消息

      1. 开启一个 Producer(生产者)窗口,然后生产几条信息
      sh kafka-console-producer.sh --broker-list localhost:9092 --topic myTopic
      >hello
      >world
      
      1. 创建一个 Consumer(消费者)窗口:

        sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --consumer-property group.id=myGroup --from-beginning

        –from-beginning 如果消费者尚未建立消费偏移量(offset),那么就从Topic的第一条消息开始消费

        –consumer-property group.id=myGroup 消费者的group.id,不设置会自动生成

        如果存在group.id相同的多个消费者窗口,只会有其中一个消费者收到消息

      2. 列出所有主题中的所有用户组

        sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

      3. 查询消费者组详情(数据积压情况)

        sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup

        LogEndOffset:下一条将要被加入到日志的消息的位移

        CurrentOffset:当前消费的位移

        LAG:消息堆积量:消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量也称之为消费滞后量

      4. 更多操作,可以参考官方文档

      Open-Source Web UI for Apache Kafka

      KRaft模式的kafka没有zookeeper,图形客户端工具offsetexplorer无法连接,找到一套开源的 Web UI,Docker、Helm下的安装方式可参考官方文档。

      使用kubectl apply命令安装provectuslabs/kafka-ui:

      1. kubectl apply -f k8s.kafka-ui.yaml
      2. k8s.kafka-ui.yaml示例如下:
      apiVersion: v1
      kind: Service
      metadata:
        name: kafka-ui
        labels:
          app: kafka-ui
      spec:
        type: NodePort
        ports:
        - name: web
          port: 8080
          targetPort: web
          nodePort: 0
        selector:
          app: kafka-ui
      ---
      apiVersion: apps/v1
      kind: Deployment
      metadata:
        name: kafka-ui
        labels:
          app: kafka-ui
      spec:
        selector:
          matchLabels:
            app: kafka-ui
        replicas: 1
        template:
          metadata:
            labels:
              app: kafka-ui
          spec:
            containers:
            - name: kafka-ui
              image: provectuslabs/kafka-ui:latest
              # imagePullPolicy: "IfNotPresent"
              env:
              - name: KAFKA_CLUSTERS_0_NAME
                value: "kafka-c0"
              - name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
                value: "kafka-0.kafka-headless:9092"              
              - name: DYNAMIC_CONFIG_ENABLED
                value: "true"
              - name: AUTH_TYPE # https://docs.kafka-ui.provectus.io/configuration/authentication/basic-authentication
                value: "LOGIN_FORM"
              - name: SPRING_SECURITY_USER_NAME
                value: "name_admin"    
              - name: SPRING_SECURITY_USER_PASSWORD
                value: "password_123456"                                   
              ports:
              - name: web
                containerPort: 8080
      

网友评论

搜索
最新文章
热门文章
热门标签