kafka 配置类
用途:定义使用的基本 kafka 配置,以及定义Bean
下面文件是读取本地 spring 的标准配置文件的类,用于一般属性获取等操作
import lombok.Data; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; @Component @ConfigurationProperties(prefix = "my.kafka") @Data public class MyTaskKafkaProperties { /**r * kafka地址 */ private String serverUrl; /** * groupId */ private String groupId; /** * topic */ private String topic; private boolean enableAutoCommit; private String autoOffsetReset; @Bean KafkaListenerContainerFactory> kafkaTwoContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(6); factory.getContainerProperties().setPollTimeout(6000); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } private ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } private Map consumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return props; } }
@Data 为其他用于控制get set 方法的,与 此处配置不是强关联,可以没有
实际 kafka 监听消费
import com.dtdream.dthink.dtalent.dmall.openplat.service.opendata.OpenDataService; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.util.Optional; @Slf4j @ConditionalOnProperty(name = "my.kafka.enable", havingValue = "true") @Component public class MyTaskConsumer { @Autowired private XxxxxService xxxxxService; @KafkaListener(topics = "${my.kafka.topic}", groupId = "${my.kafka.groupId}", containerFactory = "kafkaTwoContainerFactory") public void dxpTaskEnd(ConsumerRecordrecord, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { consume(record, ack, topic, msg -> xxxxxService.xxxxxxx(msg)); } private void consume(ConsumerRecord record, Acknowledgment ack, String topic, java.util.function.Consumer consumer) { Optional optional = Optional.ofNullable(record.value()); if (!optional.isPresent()) { log.warn("kafka收到消息 但为空,record:{}", record); return; } String msg = optional.get(); log.info("kafka收到消息 开始消费 topic:{},msg:{}", topic, msg); try { consumer.accept(msg); // 上面方法执行成功后手动提交 ack.acknowledge(); log.info("kafka收到消息消费成功 topic:{},msg:{}", topic, msg); } catch (Exception e) { log.error("kafka消费消息失败 topic:{},msg:{}", topic, msg, e); } } }
@ConditionalOnProperty spring boot 用于判断当前类是否加载的条件
XxxxxService: 为我们的业务服务层,用于消费消息
猜你喜欢
- 3天前(安徽民航君澜大饭店装饰设计招标)集东方文化气息,品徽派隽美风韵----安徽民航君澜大饭店静待绽放
- 3天前(四川推进世界重要旅游目的地建设工作)四川推进世界重要旅游目的地建设
- 3天前(甘肃文旅项目)甘肃省文旅产业链招商引资推介会在天水成功举办
- 3天前(札幌小樽市)2024年暑期飞往北海道避暑吧!札幌小樽city walk路线推荐
- 3天前(纳米比亚旅游报价)纳米比亚旅游局2024年中国推介会圆满落幕
- 3天前(天津四季酒店开业时间)天津四季酒店邀你开启灿烂暑假
- 3天前(大黄山景区高质量发展联盟成立多少年)大黄山景区高质量发展联盟成立
- 3天前(内蒙古交通旅游图)内蒙古着力提升交通与旅游服务水平
- 3天前(福州“一县一桌菜”“两马乡宴”品鉴会圆满举办,马尾美食共叙血脉亲情)福州“一县一桌菜”“两马乡宴”品鉴会圆满举办,马尾美食共叙血脉亲情
- 3天前(筑格集团有限公司)洲际酒店集团旗下筑格酒店品牌正式亮相大中华区
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章