上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Kafka-Topic&Partition

guduadmin18小时前

Kafka主题与分区

主题与分区

topic & partition,是Kafka两个核心的概念,也是Kafka的基本组织单元。 主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。 分区的划分为kafka提供了可伸缩性、水平扩展性、容错性等优势。 分区可以有一个至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等

主题的管理

主题的管理

  • 创建主题

  • 查看主题信息

  • 修改主题

  • 删除主题

    上述操作可以采用Kafka提供的kafka-topics.sh脚本来完成,也可以采用Kafka提供的AdminClient来完成。 该脚本位于¥KAFKA_HOME/bin目录下 Kafka-Topic&Partition,image,第1张

    创建主题

    创建主题的命令格式如下:

    kafka-topics.sh --bootstrap-server  \
        --create --topic  \
        --partitions  \
        --replication-factor 
    

    创建一个分区数为4、副本因子为2的主题

    kafka-topics.sh --bootstrap-server localhost:9092 \
        --create --topic topic-create \
        --partitions 4 \
        --replication-factor 2
    

    创建一个分区数为4、副本因子为2的主题,并且指定主题的配置信息

    kafka-topics.sh --bootstrap-server localhost:9092 \
        --create --topic topic-create \
        --partitions 4 \
        --replication-factor 2 \
        --config max.message.bytes=128000
    

    通过describe指令来查看分区副本的分配细节

    kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create
    

    使用replica-assignment参数手动指定分区副本的分配方案

    使用这种方式根据分区号的数值大小按照从小到大的顺序进行排列

    例如:0:1:2,0:1:2,0:1:2,0:1:2

    • 分区与分区之间用逗号分隔

    • 分区与副本之间用冒号分隔

      kafka-topics.sh --bootstrap-server localhost:9092 \
          --create --topic topic-create-same \
          --replica-assignment 0:1:2,0:1:2,0:1:2,0:1:2
      

      注意:

      • 同一个分区内的副本不能有重复,比如0:0,1:1这样,就会报出AdminCommandFailedException异常

      • 分区之间所指定的副本数不同,比如0:0,1:1这样,就会报出AdminOperationException异常

        主题命名规范

        • 主题名称只能包含ASCII字母、数字、点、减号和下划线

        • 主题名称长度不能超过249个字符

        • 主题名称不能以点开头

        • 不能以__开头,这是Kafka内部使用的主题前缀

        • 不能包含空格、单引号、双引号、逗号、分号、冒号和NULL字符

        • 主题名称应该全部小写,因为Kafka在区分主题名称时是不区分大小写的

        • 主题名称不能与Kafka保留的名称冲突,比如__consumer_offsets

        • 主题名称不能与已经存在的消费者组名称冲突

        • 主题名称不能与已经存在的主题名称冲突

          查看主题信息

          通过list指令来查看当前Kafka集群中所有可用的主题

          kafka-topics.sh --bootstrap-server localhost:9092 --list
          

          Kafka-Topic&Partition,image,第2张

          通过describe指令来查看主题的详细信息

          kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create
          

          Kafka-Topic&Partition,image,第3张

          修改主题

          当主题被创建之后,依然允许我们对其做一定的修改,比如修改分区数、修改副本因子、修改配置等。 通过alter指令来修改主题的配置信息

          # 修改主题的最大消息字节数,配置值从10000修改为20000
          kafka-topics.sh --bootstrap-server localhost:9092 \
              --alter --topic topic-config \
              --config max.message.bytes=20000
          

          通过alter指令来修改主题的分区数

          kafka-topics.sh --bootstrap-server localhost:9092 \
              --alter --topic topic-create \
              --partitions 6
          

          删除主题

          通过delete指令来删除主题

          kafka-topics.sh --bootstrap-server localhost:9092 \
              --delete --topic topic-delete
          

          通过delete-config参数来删除之前设置的配置信息

          kafka-topics.sh --bootstrap-server localhost:9092 \
              --alter --topic topic-config \
              --delete-config max.message.bytes
          

          手动删除主题

          • 主题中的元数据存储在Zookeeper中的/brokers/topics和/config/topics路径下

          • 主题中的消息数据存储在log.dir或log.dirs配置的路径下,只需要手动删除这些地方的数据即可。

            配置管理

            kafka-configs.sh脚本用于管理Kafka的配置信息,该脚本位于$KAFKA_HOME/bin目录下 主要包含变更配置alter和查看配置describe两个指令

            # 变更主题的配置信息
            kafka-configs.sh --bootstrap-server localhost:9092 \
                --alter --entity-type topics --entity-name topic-config \
                --add-config max.message.bytes=128000
            # 添加主题的配置信息
            kafka-configs.sh --bootstrap-server localhost:9092 \
                --alter --entity-type topics --entity-name topic-config \
                --add-config max.message.bytes=128000
            # 查看主题的配置信息
            kafka-configs.sh --bootstrap-server localhost:9092 \
                --describe --entity-type topics --entity-name topic-config    
            

            KafkaAdminClient

            KafkaAdminClient是Kafka提供的一个管理客户端,用于管理Kafka集群中的资源,比如主题、分区、消费者组等。

            TopicCommand基本使用

            使用KafkaAdminClient来完成TopicCommand的基本操作

            查看主题信息

            public class demo{
                public static void describeTopic(){
                    String[ ] options = new String[ ]{
                            "--bootstrap-server localhost:9092",
                            "--describe",
                            "--topic", "topic-create"
                    };
                    kafka.admin.TopicCommand.main(options);
                }
            }
            

            创建主题

            public class demo{
                public static void createTopic(){
                    String[ ] options = new String[ ]{
                            "--bootstrap-server localhost:9092",
                            "--create",
                            "--replication-factor", "1",
                            "--partitions", "1",
                            "--topic", "topic-create-api"
                    };
                    kafka.admin.TopicCommand.main(options);
                }
            }
            

            查看所有可用主题

            public class demo{
                public static void listTopic(){
                    String[ ] options = new String[ ]{
                            "--bootstrap-server localhost:9092",
                            "--list"
                    };
                    kafka.admin.TopicCommand.main(options);
                }
            }
            

            KafkaAdminClient基本使用

            KafkaAdminClient可以用来管理broker、配置和ACL(Access Control List),以及管理主题、分区和消费者组等。 KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient,提供了一系列的API来管理Kafka集群中的资源。

            AdminClient常见的方法

            • createTopics:创建主题

              • CreateTopicsResult createTopics(Collection newTopics)
              • deleteTopics:删除主题

                • DeleteTopicsResult deleteTopics(Collection topics)
                • listTopics:列出所有可用的主题

                  • ListTopicsResult listTopics()
                  • describeTopics:查看主题的详细信息

                    • DescribeTopicsResult describeTopics(Collection topicNames)
                    • describeCluster:查看集群的详细信息

                      • DescribeClusterResult describeCluster()
                      • describeConfigs:查看配置的详细信息

                        • DescribeConfigsResult describeConfigs(Collection resources)
                        • alterConfigs:修改配置信息

                          • AlterConfigsResult alterConfigs(Map configs)
                          • describeConsumerGroups:查看消费者组的详细信息

                            • DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds)
                            • listConsumerGroups:列出所有可用的消费者组

                              • ListConsumerGroupsResult listConsumerGroups()
                              • createPartitions:创建分区

                                • CreatePartitionsResult createPartitions(Map newPartitions)
                                  使用KafkaAdminClient创建主题
                                  public class KafkaAdminClientCreateTopic {
                                      /**
                                       * 使用AdminClient创建Topic
                                       *
                                       * 创建完成之后使用如下脚本进行检查
                                       * 进入KAFKA_HOME/bin
                                       * 执行 ./kafka-topics.sh --bootstrap-server localhost:9092 --list
                                       */
                                      public static void createTopic(){
                                          Properties props = new Properties();
                                          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                                          AdminClient adminClient = AdminClient.create(props);
                                          NewTopic newTopic = new NewTopic("topic-create-api", 1, (short) 1);
                                          // 创建主题的方法内部是通过发送CreateTopicRequest请求来完成的
                                          CreateTopicsResult result = adminClient.createTopics(Arrays.asList(newTopic));
                                          try {
                                              result.all().get();
                                          } catch (InterruptedException e) {
                                              e.printStackTrace();
                                          } catch (ExecutionException e) {
                                              e.printStackTrace();
                                          }
                                          // 使用完之后需要关闭AdminClient,释放资源
                                          adminClient.close();
                                      }
                                      public static void main(String[ ] args) {
                                          createTopic();
                                      }
                                  }
                                  
                                  使用KafkaAdminClient查看主题信息
                                  public class KafkaAdminClientDescribeTopic {
                                      /**
                                       * 使用AdminClient查看Topic信息
                                       */
                                      public static void describeTopic(){
                                          Properties props = new Properties();
                                          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                                          AdminClient adminClient = AdminClient.create(props);
                                          DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("topic-create-api"));
                                          try {
                                              Map map = result.all().get();
                                              for (Map.Entry entry : map.entrySet()) {
                                                  System.out.println(entry.getKey() + " : " + entry.getValue());
                                              }
                                          } catch (InterruptedException e) {
                                              e.printStackTrace();
                                          } catch (ExecutionException e) {
                                              e.printStackTrace();
                                          }
                                          // 使用完之后需要关闭AdminClient,释放资源
                                          adminClient.close();
                                      }
                                      public static void main(String[ ] args) {
                                          describeTopic();
                                      }
                                  }
                                  
                                  使用KafkaAdminClient查看所有可用的主题
                                  public class KafkaAdminClientListTopic {
                                      /**
                                       * 使用AdminClient查看所有可用的Topic
                                       */
                                      public static void listTopic(){
                                          Properties props = new Properties();
                                          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                                          AdminClient adminClient = AdminClient.create(props);
                                          ListTopicsResult result = adminClient.listTopics();
                                          try {
                                              Set set = result.names().get();
                                              for (String s : set) {
                                                  System.out.println(s);
                                              }
                                          } catch (InterruptedException e) {
                                              e.printStackTrace();
                                          } catch (ExecutionException e) {
                                              e.printStackTrace();
                                          }
                                          // 使用完之后需要关闭AdminClient,释放资源
                                          adminClient.close();
                                      }
                                      public static void main(String[ ] args) {
                                          listTopic();
                                      }
                                  }
                                  
                                  使用KafkaAdminClient创建分区
                                  public class KafkaAdminClientCreatePartition {
                                      /**
                                       * 使用AdminClient创建分区
                                       */
                                      public static void createPartition(){
                                          Properties props = new Properties();
                                          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                                          AdminClient adminClient = AdminClient.create(props);
                                          Map map = new HashMap<>();
                                          NewPartitions newPartitions = NewPartitions.increaseTo(2);
                                          map.put("topic-create-api", newPartitions);
                                          CreatePartitionsResult result = adminClient.createPartitions(map);
                                          try {
                                              result.all().get();
                                          } catch (InterruptedException e) {
                                              e.printStackTrace();
                                          } catch (ExecutionException e) {
                                              e.printStackTrace();
                                          }
                                          // 使用完之后需要关闭AdminClient,释放资源
                                          adminClient.close();
                                      }
                                      public static void main(String[ ] args) {
                                          createPartition();
                                      }
                                  }
                                  
                                  使用KafkaAdminClient删除主题
                                  public class KafkaAdminClientDeleteTopic {
                                      /**
                                       * 使用AdminClient删除Topic
                                       */
                                      public static void deleteTopic(){
                                          Properties props = new Properties();
                                          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                                          AdminClient adminClient = AdminClient.create(props);
                                          DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList("topic-create-api"));
                                          try {
                                              result.all().get();
                                          } catch (InterruptedException e) {
                                              e.printStackTrace();
                                          } catch (ExecutionException e) {
                                              e.printStackTrace();
                                          }
                                          // 使用完之后需要关闭AdminClient,释放资源
                                          adminClient.close();
                                      }
                                      public static void main(String[ ] args) {
                                          deleteTopic();
                                      }
                                  }
                                  
                                  使用KafkaAdminClient修改主题配置
                                  public class KafkaAdminClientAlterTopic {
                                      /**
                                       * 使用AdminClient修改Topic配置
                                       */
                                      public static void alterTopic(){
                                          Properties props = new Properties();
                                          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                                          AdminClient adminClient = AdminClient.create(props);
                                          ConfigEntry configEntry = new ConfigEntry("max.message.bytes", "128000");
                                          Config config = new Config(Arrays.asList(configEntry));
                                          Map map = new HashMap<>();
                                          ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");
                                          map.put(configResource, config);
                                          AlterConfigsResult result = adminClient.alterConfigs(map);
                                          try {
                                              result.all().get();
                                          } catch (InterruptedException e) {
                                              e.printStackTrace();
                                          } catch (ExecutionException e) {
                                              e.printStackTrace();
                                          }
                                          // 使用完之后需要关闭AdminClient,释放资源
                                          adminClient.close();
                                      }
                                      public static void main(String[ ] args) {
                                          alterTopic();
                                      }
                                  }
                                  
                                  使用KafkaAdminClient查看主题配置
                                  public class KafkaAdminClientDescribeTopicConfig {
                                      /**
                                       * 使用AdminClient查看Topic配置
                                       */
                                      public static void describeTopicConfig(){
                                          Properties props = new Properties();
                                          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                                          AdminClient adminClient = AdminClient.create(props);
                                          ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");
                                          DescribeConfigsResult result = adminClient.describeConfigs(Arrays.asList(configResource));
                                          try {
                                              Map map = result.all().get();
                                              for (Map.Entry entry : map.entrySet()) {
                                                  System.out.println(entry.getKey() + " : " + entry.getValue());
                                              }
                                          } catch (InterruptedException e) {
                                              e.printStackTrace();
                                          } catch (ExecutionException e) {
                                              e.printStackTrace();
                                          }
                                          // 使用完之后需要关闭AdminClient,释放资源
                                          adminClient.close();
                                      }
                                      public static void main(String[ ] args) {
                                          describeTopicConfig();
                                      }
                                  }
                                  

网友评论

搜索
最新文章
热门文章
热门标签