在开发过程中使用Kafka时,经常会遇到topic太多、手动逐个创建太费劲的问题,所以希望一次配置、终身使用:让应用自己去创建topic,就像flyway自动建表一样。今天总结一下配置方式。
1.application.yml配置
kafka:
  # Topics to create automatically
  topics:
    # topic
    - name: import_vulnera_topic
      # number of partitions
      num-partitions: 6
      # number of replicas
      replication-factor: 2
    # topic
    - name: import_vulnerability_topic
      num-partitions: 6
      replication-factor: 2
    # topic
    - name: import_vulnerability_result_topic
      num-partitions: 6
      replication-factor: 2
2.配置Configuration
import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Configuration; import org.springframework.web.context.support.GenericWebApplicationContext; import javax.annotation.PostConstruct; import java.util.List; /** * 自动创建topic */ @Configuration @SuppressWarnings("all") public class TopicAdministrator { private final TopicConfigurations configurations; private final GenericWebApplicationContext context; public TopicAdministrator(TopicConfigurations configurations, GenericWebApplicationContext genericContext) { this.configurations = configurations; this.context = genericContext; } @PostConstruct public void init() { initializeBeans(configurations.getTopics()); } private void initializeBeans(Listtopics) { topics.forEach(t -> context.registerBean(t.name, NewTopic.class, t::toNewTopic)); } }
import lombok.Data; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import java.util.List; @Configuration @ConfigurationProperties(prefix = "kafka") @Data public class TopicConfigurations { private Listtopics; @Data static class Topic { String name; Integer numPartitions = 3; Short replicationFactor = 1; NewTopic toNewTopic() { return new NewTopic(this.name, this.numPartitions, this.replicationFactor); } } }
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章