kafka 配置类
用途:定义使用的基本 kafka 配置,以及定义Bean
下面文件是读取本地 spring 的标准配置文件的类,用于一般属性获取等操作
import lombok.Data; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; @Component @ConfigurationProperties(prefix = "my.kafka") @Data public class MyTaskKafkaProperties { /**r * kafka地址 */ private String serverUrl; /** * groupId */ private String groupId; /** * topic */ private String topic; private boolean enableAutoCommit; private String autoOffsetReset; @Bean KafkaListenerContainerFactory> kafkaTwoContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(6); factory.getContainerProperties().setPollTimeout(6000); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } private ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } private Map consumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return props; } }
@Data 为其他用于控制get set 方法的,与 此处配置不是强关联,可以没有
实际 kafka 监听消费
import com.dtdream.dthink.dtalent.dmall.openplat.service.opendata.OpenDataService; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.util.Optional; @Slf4j @ConditionalOnProperty(name = "my.kafka.enable", havingValue = "true") @Component public class MyTaskConsumer { @Autowired private XxxxxService xxxxxService; @KafkaListener(topics = "${my.kafka.topic}", groupId = "${my.kafka.groupId}", containerFactory = "kafkaTwoContainerFactory") public void dxpTaskEnd(ConsumerRecordrecord, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { consume(record, ack, topic, msg -> xxxxxService.xxxxxxx(msg)); } private void consume(ConsumerRecord record, Acknowledgment ack, String topic, java.util.function.Consumer consumer) { Optional optional = Optional.ofNullable(record.value()); if (!optional.isPresent()) { log.warn("kafka收到消息 但为空,record:{}", record); return; } String msg = optional.get(); log.info("kafka收到消息 开始消费 topic:{},msg:{}", topic, msg); try { consumer.accept(msg); // 上面方法执行成功后手动提交 ack.acknowledge(); log.info("kafka收到消息消费成功 topic:{},msg:{}", topic, msg); } catch (Exception e) { log.error("kafka消费消息失败 topic:{},msg:{}", topic, msg, e); } } }
@ConditionalOnProperty spring boot 用于判断当前类是否加载的条件
XxxxxService: 为我们的业务服务层,用于消费消息
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章