Table of Contents
1. Code
2. Results
1. Code
```java
package com.zsh.kafkatest.topic;

import com.zsh.kafkatest.connect.KafkaConnection;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * @Author ZhaoShuHao
 * @Date 2023/7/21 16:58
 */
public class GetTopicAboutDatasource {

    public static void main(String[] args) {
        String kafka = "192.168.140.65:9092";
        // The address string may hold several brokers separated by ";"
        String[] kafkas = kafka.split(";");
        // Print each configured broker address (the original loop body was lost; this is a minimal stand-in)
        for (int i = 0; i < kafkas.length; i++) {
            System.out.println("Kafka address: " + kafkas[i]);
        }

        List<Map<String, String>> tVos = new ArrayList<>();
        List<String> list = new ArrayList<>();
        AdminClient adminiClient = KafkaConnection.kafkaTestConnection(kafka);
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true);   // include internal topics such as __consumer_offsets
        ListTopicsResult topicsResult = adminiClient.listTopics(options);
        try {
            Set<String> topicNames = topicsResult.names().get();
            Iterator<String> it = topicNames.iterator();
            while (it.hasNext()) {
                Map<String, String> map = new HashMap<>();
                String topicName = it.next();
                Map<String, Object> topicInfo = getTopicInfo(kafka, topicName);
                map.put("tableName", topicName);
                map.put("issame", "0");
                map.put("Partitions", String.valueOf(topicInfo.get("Partitions")));
                map.put("PartitionSize", String.valueOf(topicInfo.get("PartitionSize")));
                map.put("ReplicationFactor", String.valueOf(topicInfo.get("ReplicationFactor")));
                tVos.add(map);
            }
        } catch (Exception e) {
            System.out.println("Failed to fetch topics");
        } finally {
            KafkaConnection.close(adminiClient);
        }

        System.out.println("All topic info fetched, tVos: " + tVos);
        tVos.stream().forEach(maptopic -> {
            System.out.println("————————————————————————————————————");
            System.out.println("Topic name: " + maptopic.get("tableName"));
            System.out.println("Topic partition info: " + maptopic.get("Partitions"));
            System.out.println("Topic partition count: " + maptopic.get("PartitionSize"));
            System.out.println("Topic replication factor: " + maptopic.get("ReplicationFactor"));
            System.out.println("————————————————————————————————————");
        });
    }

    // Fetch detailed information (partitions and replication factor) for one topic
    public static Map<String, Object> getTopicInfo(String ipAndPort, String topic) {
        Map<String, Object> map = new HashMap<>();
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ipAndPort);
        AdminClient adminClient = AdminClient.create(props);
        String topicName = topic;
        DescribeTopicsOptions describeTopicsOptions = new DescribeTopicsOptions().timeoutMs(5000);
        DescribeTopicsResult describeTopicsResult =
                adminClient.describeTopics(Arrays.asList(topicName), describeTopicsOptions);
        KafkaFuture<TopicDescription> topicDescriptionFuture = describeTopicsResult.values().get(topicName);
        try {
            TopicDescription topicDescription = topicDescriptionFuture.get();
            List<TopicPartitionInfo> partitions = topicDescription.partitions();
            // Replication factor = number of replicas of any one partition
            int replicationFactor = partitions.get(0).replicas().size();
            map.put("Partitions", partitions);
            map.put("PartitionSize", partitions.size());
            map.put("ReplicationFactor", replicationFactor);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            adminClient.close();   // close the per-topic client to avoid leaking connections
        }
        return map;
    }
}
```
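The listing depends on a project-local KafkaConnection helper that is not shown in the post. Below is a minimal sketch of what such a helper might look like, assuming it only wraps AdminClient creation and closing; only the class name and the methods kafkaTestConnection and close come from the listing above, everything else is an assumption:

```java
package com.zsh.kafkatest.connect;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

// Hypothetical sketch: the real KafkaConnection class is not shown in the post.
public class KafkaConnection {

    // Build an AdminClient for the given "host:port" bootstrap address.
    public static AdminClient kafkaTestConnection(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
        return AdminClient.create(props);
    }

    // Close the client if it was created successfully.
    public static void close(AdminClient adminClient) {
        if (adminClient != null) {
            adminClient.close();
        }
    }
}
```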
2. Results
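Running main against a reachable broker prints one block per topic. Based on the println calls in the listing, the console output takes roughly the following shape; the topic name and counts below are placeholders for illustration, not actual results from the cluster:

```
All topic info fetched, tVos: [{tableName=test-topic, issame=0, ...}]
————————————————————————————————————
Topic name: test-topic
Topic partition info: [(partition=0, leader=..., replicas=..., isr=...)]
Topic partition count: 1
Topic replication factor: 1
————————————————————————————————————
```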