docker-compose 部署 kafka
- 镜像地址
kafka官网
kafka镜像
zookeeper镜像
- Kafka 4.0 将移除zookeeper,仅支持KRaft
所以我们使用KRaft模式,这也是kafka:3.4的默认模式.
- 由于这是一个非 root 的容器,挂载的文件和目录必须具有 UID 1001 的适当权限
sudo chown -R 1001:1001 ./kafka_data
- 创建kafka容器 docker compose -f docker-compose.yml up -d
version: "3" services: kafka: image: bitnami/kafka:3.4.1 ports: - "9092:9092" volumes: - "./kafka_data:/bitnami" environment: # KRaft settings - KAFKA_CFG_NODE_ID=0 - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 # Listeners - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
- 集群方式可参考链接
在 Kubernetes 上部署 Kafka 集群
- yaml示例使用的是KRaft模式
- 创建无头Service kafka-headless,用于kafka间相互通信
- 创建Service kafka, 用于外部访问kafka
- 启动脚本写入ConfigMap, 只是在默认的启动脚本前增加了环境变量KAFKA_CFG_NODE_ID赋值:
- 集群中每个副本都需要设置KAFKA_CFG_NODE_ID,且必须为整数
- StatefulSet中将副本名称metadata.name赋值给环境变量MY_POD_NAME参考文档。(也可以直接使用环境变量HOSTNAME,副本名称是默认的主机名)
- 截取环境变量MY_POD_NAME最后的序号,赋值给环境变量KAFKA_CFG_NODE_ID,比如:MY_POD_NAME=kafka-0,那么 KAFKA_CFG_NODE_ID=0
- 环境变量KAFKA_CFG_CONTROLLER_QUORUM_VOTERS也可以在此脚本中自动生成,可参考。yaml示例中直接设置成了3个。
- 非root容器需要设置securityContext
- 使用模板volumeClaimTemplates动态创建存储,每个副本挂载单独的存储。例子里使用的是华为云现有的storageClassName: csi-disk,如果没有声明StorageClass,可以参考文档提前创建。
apiVersion: v1 kind: Service metadata: name: kafka-headless labels: app: kafka spec: type: ClusterIP clusterIP: None ports: - name: kafka-client port: 9092 targetPort: kafka-client - name: controller port: 9093 targetPort: controller selector: app: kafka --- #部署 Service,用于外部访问 Kafka apiVersion: v1 kind: Service metadata: name: kafka labels: app: kafka spec: type: ClusterIP ports: - name: kafka-client port: 9092 targetPort: kafka-client selector: app: kafka --- # 分别在 StatefulSet 中的每个 Pod 中获取相应的序号作为 KAFKA_CFG_NODE_ID(只能是整数),然后再执行启动脚本 apiVersion: v1 kind: ConfigMap metadata: name: ldc-kafka-scripts data: setup.sh: |- #!/bin/bash export KAFKA_CFG_NODE_ID=${MY_POD_NAME##*-} exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh --- apiVersion: apps/v1 kind: StatefulSet metadata: name: kafka labels: app: kafka spec: selector: matchLabels: app: kafka serviceName: kafka-headless podManagementPolicy: Parallel replicas: 3 # 部署完成后,将会创建 3 个 Kafka 副本 updateStrategy: type: RollingUpdate template: metadata: labels: app: kafka spec: affinity: podAntiAffinity: # 工作负载反亲和 preferredDuringSchedulingIgnoredDuringExecution: # 尽量满足如下条件 - weight: 1 podAffinityTerm: labelSelector: # 选择Pod的标签,与工作负载本身反亲和 matchExpressions: - key: "app" operator: In values: - kafka topologyKey: "kubernetes.io/hostname" # 在节点上起作用 containers: - name: kafka image: bitnami/kafka:3.4.1 imagePullPolicy: "IfNotPresent" command: - /opt/leaderchain/setup.sh env: - name: BITNAMI_DEBUG value: "true" # true 详细日志 # KRaft settings - name: MY_POD_NAME # 用于生成 KAFKA_CFG_NODE_ID valueFrom: fieldRef: fieldPath: metadata.name - name: KAFKA_CFG_PROCESS_ROLES value: "controller,broker" - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS value: "0@kafka-0.kafka-headless:9093,1@kafka-1.kafka-headless:9093,2@kafka-2.kafka-headless:9093" - name: KAFKA_KRAFT_CLUSTER_ID value: "Jc7hwCMorEyPprSI1Iw4sW" # Listeners - name: KAFKA_CFG_LISTENERS value: "PLAINTEXT://:9092,CONTROLLER://:9093" - name: KAFKA_CFG_ADVERTISED_LISTENERS value: "PLAINTEXT://:9092" - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES value: "CONTROLLER" - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME value: "PLAINTEXT" ports: - containerPort: 9092 name: kafka-client - containerPort: 9093 name: controller protocol: TCP volumeMounts: - mountPath: /bitnami/kafka name: data - mountPath: /opt/leaderchain/setup.sh name: scripts subPath: setup.sh readOnly: true securityContext: fsGroup: 1001 runAsUser: 1001 volumes: - configMap: defaultMode: 493 name: ldc-kafka-scripts name: scripts volumeClaimTemplates: - apiVersion: v1 kind: PersistentVolumeClaim metadata: name: data annotations: everest.io/disk-volume-type: SAS labels: failure-domain.beta.kubernetes.io/region: cn-south-1 failure-domain.beta.kubernetes.io/zone: cn-south-2b spec: accessModes: [ "ReadWriteOnce" ] storageClassName: csi-disk resources: requests: storage: 10Gi
主题Topic
- 查看帮助(容器中kafka的脚本目录为:/opt/bitnami/kafka/bin)
sh kafka-topics.sh --help
- 获取所有的主题
sh kafka-topics.sh --bootstrap-server localhost:9092 --list
- 创建一个Topic
–partitions(分区数量)
–topic(主题名)
–replication-factor(副本数量,不能大于broker的数量)
sh kafka-topics.sh --create --topic myTopic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092
- 查询 Topic 的详细信息
sh kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic myTopic
- 删除 Topic (Topic 中所有的消息数据都将被永久删除,且无法恢复)
sh kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic myTopic
- 增加主题分区数量 (如果要减少分区的数量,只能删除Topic,然后重新创建)
sh kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --alter --partitions 3
- 修改数据过期时间 (kafka默认的只保存7天的数据,retention.ms=-1表示不过期)
sh kafka-topics.sh --bootstrap-server localhost:9092 -topic myTopic --alter --config retention.ms=259200000
- 修改多字段
sh kafka-topics.sh --bootstrap-server localhost:9092 -topic myTopic --alter --config retention.ms=259200000 max.message.bytes=128000
- 修改 Topic 副本数
- 编写分配脚本get-reassign-tpl.json
echo '{"topics":[{"topic":"myTopic"}],"version": 1}' > get-reassign-tpl.json
- 执行分配计划,用于生成json格式的文件
sh kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file get-reassign-tpl.json --broker-list "0,1,2" --generate
- 复制上一步返回结果中的json,修改副本字段replicas,填写broker.id,生成reassign.json文件
echo '{"version":1,"partitions":[{"topic":"myTopic","partition":0,"replicas":[1,2]}]}' > reassign.json
- 利用上一步生成的reassign.json,进行topic的重新分配
sh kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --execute
- 查看分配的进度
sh kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --verify
- 分配完成,再次查询详情
sh kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic myTopic
- 编写分配脚本get-reassign-tpl.json
测试发送消息和接收消息
- 开启一个 Producer(生产者)窗口,然后生产几条信息
sh kafka-console-producer.sh --broker-list localhost:9092 --topic myTopic >hello >world
- 创建一个 Consumer(消费者)窗口:
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --consumer-property group.id=myGroup --from-beginning
–from-beginning 如果消费者尚未建立消费偏移量(offset),那么就从Topic的第一条消息开始消费
–consumer-property group.id=myGroup 消费者的group.id,不设置会自动生成
如果存在group.id相同的多个消费者窗口,只会有其中一个消费者收到消息
- 列出所有主题中的所有用户组
sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
- 查询消费者组详情(数据积压情况)
sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup
LogEndOffset:下一条将要被加入到日志的消息的位移
CurrentOffset:当前消费的位移
LAG:消息堆积量:消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量也称之为消费滞后量
- 更多操作,可以参考官方文档
Open-Source Web UI for Apache Kafka
KRaft模式的kafka没有zookeeper,图形客户端工具offsetexplorer无法连接,找到一套开源的 Web UI,Docker、Helm下的安装方式可参考官方文档。
使用kubectl apply命令安装provectuslabs/kafka-ui:
- kubectl apply -f k8s.kafka-ui.yaml
- k8s.kafka-ui.yaml示例如下:
apiVersion: v1 kind: Service metadata: name: kafka-ui labels: app: kafka-ui spec: type: NodePort ports: - name: web port: 8080 targetPort: web nodePort: 0 selector: app: kafka-ui --- apiVersion: apps/v1 kind: Deployment metadata: name: kafka-ui labels: app: kafka-ui spec: selector: matchLabels: app: kafka-ui replicas: 1 template: metadata: labels: app: kafka-ui spec: containers: - name: kafka-ui image: provectuslabs/kafka-ui:latest # imagePullPolicy: "IfNotPresent" env: - name: KAFKA_CLUSTERS_0_NAME value: "kafka-c0" - name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS value: "kafka-0.kafka-headless:9092" - name: DYNAMIC_CONFIG_ENABLED value: "true" - name: AUTH_TYPE # https://docs.kafka-ui.provectus.io/configuration/authentication/basic-authentication value: "LOGIN_FORM" - name: SPRING_SECURITY_USER_NAME value: "name_admin" - name: SPRING_SECURITY_USER_PASSWORD value: "password_123456" ports: - name: web containerPort: 8080
- 集群方式可参考链接
猜你喜欢
- 9小时前【Apache-StreamPark】Flink 开发利器 StreamPark 的介绍、安装、使用
- 9小时前hadoop报错ERROR: Cannot set priority of namenode process
- 9小时前【flink番外篇】13、Broadcast State 模式示例(完整版)
- 9小时前Prpmetheus监控rabbitmq
- 9小时前Linux环境如何彻底卸载感干净RabbitMQ
- 8小时前嵩山少林寺在哪(嵩山少林寺地址在哪里)
- 6小时前全聚德烤鸭(全聚德烤鸭店王府井店)
- 4小时前水泥和沙子的比例是多少(耐火水泥和沙子的比例是多少)
- 2小时前戒指尺寸表(测量手指戒指尺寸表)
- 32分钟前桑蚕丝(桑蚕丝会缩水吗 用不用买宽松点的呢)
网友评论
- 搜索
- 最新文章
- 热门文章