说明
本示例只配置了 Consumer,没有配置 Producer;如需配置 Producer,可参考"配置文件_1"中被注释掉的代码。
1.引入依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2.yml配置
# Two independent Kafka consumer configurations, bound by hand via @Value
# in the Java configuration classes (keys spring.kafka.one.* and spring.kafka.two.*).
spring:
  kafka:
    one:
      # Test environment
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic
      # SASL/SCRAM authentication settings for cluster "one"
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: SCRAM-SHA-512
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
      consumer:
        # Default consumer group id. Consumers in the same group never receive
        # the same record; group.id is what names the group.
        group-id: defaultName
        # Disable auto commit (offsets are committed manually by the listener)
        enable-auto-commit: false
        # Where to start when the group has no committed offset.
        # Valid values for the current consumer are earliest / latest / none
        # (smallest / largest were the names used by the legacy 0.8 consumer).
        # earliest = read from the beginning, latest = read only new records.
        auto-offset-reset: latest
        # Key / value deserializers
        # NOTE(review): the Java configuration classes in this article hard-code
        # StringDeserializer and do not appear to read these two keys.
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5
    two:
      # Test environment
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic_two
      consumer:
        # Default consumer group id. Consumers in the same group never receive
        # the same record; group.id is what names the group.
        group-id: defaultName_two
        # Disable auto commit (offsets are committed manually by the listener)
        enable-auto-commit: false
        # Where to start when the group has no committed offset.
        # Valid values for the current consumer are earliest / latest / none
        # (smallest / largest were the names used by the legacy 0.8 consumer).
        auto-offset-reset: latest
        # Key / value deserializers
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5
3.新建配置文件
3.1配置文件_1
@Configuration @EnableKafka public class K1kafkaConfiguration { @Value("${spring.kafka.one.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.one.consumer.group-id}") private String groupId; @Value("${spring.kafka.one.consumer.enable-auto-commit}") private String enableAutoCommit; @Value("${spring.kafka.one.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${spring.kafka.one.consumer.max-poll-records}") private String maxPollRecords; @Value("${spring.kafka.one.properties.security.protocol}") private String securityprotocol; @Value("${spring.kafka.one.properties.sasl.mechanism}") private String mechanism; @Value("${spring.kafka.one.properties.sasl.jaas.config}") private String jaasconfig; //@Value("${spring.kafka.one.producer.linger-ms}") //private Integer lingerMs; //@Value("${spring.kafka.one.producer.max-request-size}") //private Integer maxRequestSize; //@Value("${spring.kafka.one.producer.batch-size}") //private Integer batchSize; //@Value("${spring.kafka.one.producer.buffer-memory}") //private Integer bufferMemory; //@Bean //public KafkaTemplatekafkaOneTemplate() { // return new KafkaTemplate<>(producerFactory()); //} @Bean @Primary //理解为默认优先选择当前容器下的消费者工厂 KafkaListenerContainerFactory > kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); //并发数量 factory.setConcurrency(1); //开启批量监听 //factory.setBatchListener(type); // 被过滤的消息将被丢弃 // factory.setAckDiscarded(true); factory.getContainerProperties().setPollTimeout(3000); //设置手动提交ackMode factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.getContainerProperties().setMissingTopicsFatal(false); // 设置记录筛选策略 //factory.setRecordFilterStrategy(new RecordFilterStrategy() { // @Override // public boolean filter(ConsumerRecord consumerRecord) { // String msg = consumerRecord.value().toString(); // 
if(Integer.parseInt(msg.substring(msg.length() - 1)) % 2 == 0){ // return false; // } // // 返回true消息将会被丢弃 // return true; // } //}); return factory; } //private ProducerFactory producerFactory() { // return new DefaultKafkaProducerFactory<>(producerConfigs()); //} @Bean//第一个消费者工厂的bean public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } //private Map producerConfigs() { // Map props = new HashMap<>(); // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs); // props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize); // props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize); // props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory); // props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // return props; //} @Bean public Map consumerConfigs() { Map props = new HashMap<>(); props.put("security.protocol",securityprotocol); props.put("sasl.mechanism",mechanism); props.put("sasl.jaas.config",jaasconfig); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,autoOffsetReset); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; }
3.2配置文件_2
@Configuration @EnableKafka public class K2kafkaConfiguration { @Value("${spring.kafka.two.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.two.consumer.group-id}") private String groupId; @Value("${spring.kafka.two.consumer.enable-auto-commit}") private String enableAutoCommit; @Value("${spring.kafka.two.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${spring.kafka.two.consumer.max-poll-records}") private String maxPollRecords; @Bean KafkaListenerContainerFactory> kafkaTwoContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); //并发数量 factory.setConcurrency(1); //开启批量监听 //factory.setBatchListener(type); // 被过滤的消息将被丢弃 // factory.setAckDiscarded(true); factory.getContainerProperties().setPollTimeout(3000); //设置手动提交ackMode factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.getContainerProperties().setMissingTopicsFatal(false); return factory; } public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } private Map consumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,autoOffsetReset); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; }
4.设置消费
4.1 设置消费_1
@Component @Slf4j(topic = "KAFKALOG") public class Consumer { @KafkaListener(topics = "#{'${spring.kafka.one.topic}'}", groupId = "defaultName",containerFactory = "kafkaListenerContainerFactory") public void listenGroup(ConsumerRecordrecord, Acknowledgment ack) { log.info("[Consumer] 接收到kafka消息:{}",record.value()); System.out.println(record); System.out.println(record.value()); //手动提交offset //ack.acknowledge(); }
4.2 设置消费_2
@Component @Slf4j(topic = "KAFKALOG") public class Consumer2 { @KafkaListener(topics = "#{'${spring.kafka.two.topic}'}", groupId = "defaultName_two",containerFactory = "kafkaTwoContainerFactory") public void listenGroup(ConsumerRecordrecord, Acknowledgment ack) { log.info("[Consumer2 ] 接收到kafka消息:{}",record.value()); System.out.println(record); System.out.println(record.value()); //手动提交offset //ack.acknowledge(); }
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章