方式1
只用spring-kafka依赖就行
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>
注入KafkaTemplate模板
@Configuration @EnableKafka public class KafkaConfig { private final static String CONSUMER_GROUP_ID="yd-group"; public final static String TOPIC_NAME="yd-kf-topic"; @Bean public ConcurrentKafkaListenerContainerFactorykafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } /** * 消费工厂 * @return */ @Bean public ConsumerFactory consumerFactory() { Map props = new HashMap<>(8); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://192.168.81.200:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } /** * 生产工厂 * @return */ @Bean public ProducerFactory producerFactory() { Map props = new HashMap<>(8); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://192.168.81.200:9092"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(props); } /** * kafka模板 * @return */ @Bean("kafkaTemplate") public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
消息发送
@Slf4j @Service public class KafkaProducer { @Autowired private KafkaTemplatekafkaTemplate; public String sendSyncMessage(String key,String msg){ String s; try { ListenableFuture > tagA = kafkaTemplate.send(KafkaConfig.TOPIC_NAME, key, msg); s = tagA.get().toString(); log.info("生产kafka消息 {}",s); return s; } catch (InterruptedException|ExecutionException e) { e.printStackTrace(); s=e.getMessage(); log.error("sendSyncMessage-->发送消息异常{}",e.getMessage()); } return s; } }
监听消息消费
/**
 * Annotation-driven consumer for {@link KafkaConfig#TOPIC_NAME}.
 */
@Slf4j
@Component
public class CustomKafkaListener {

    /**
     * Logs every message received from the topic.
     *
     * <p>{@code idIsGroup = false} is set because, by default, a listener {@code id}
     * replaces the {@code group.id} configured on the consumer factory. Without it the
     * consumer would join a group named after the topic rather than the configured group.
     *
     * @param msg the record value
     */
    @KafkaListener(topics = {KafkaConfig.TOPIC_NAME}, id = KafkaConfig.TOPIC_NAME, idIsGroup = false)
    public void onMessage1(String msg) {
        log.info("onMessage1消费kafka消息 {} ", msg);
    }
}
测试发送
/**
 * HTTP endpoint for manually triggering a Kafka send.
 */
@RestController
public class KafkaSendController {

    @Autowired
    private KafkaProducer kafkaProducer;

    /**
     * Forwards the request parameters to {@link KafkaProducer#sendSyncMessage(String, String)}.
     *
     * @param key record key
     * @param msg record value
     * @return whatever the producer returns for this send
     */
    @GetMapping("/kafka/sendMsg")
    public String sendMsg(String key, String msg) {
        final String sendOutcome = kafkaProducer.sendSyncMessage(key, msg);
        return sendOutcome;
    }
}
方式2:
spring-kafka和kafka-clients结合使用(推荐)
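<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>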
消费者组件
import com.alibaba.fastjson.JSONObject; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.ClassUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Map; /** * Consumer config of SSL Kafka */ @Configuration public class TestKafkaConsumerConfig { private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumerConfig.class); //读取kafka相关配置 @Value("#{configproperties_leconf['outer.kafka.bootstrap.servers']}") private String bootStrapServers; @Value("#{configproperties_leconf['outer.kafka.bootstrap.bootStrapServersNoJks']}") private String bootStrapServersNoJks; @Value("#{configproperties_leconf['outer.kafka.enable.auto.commit']}") private String enableAutoCommit; @Value("#{configproperties_leconf['outer.kafka.auto.commit.interval']}") private String autoCommitInterval; @Value("#{configproperties_leconf['outer.kafka.session.timeout']}") private String sessionTimeout; @Value("#{configproperties_leconf['outer.kafka.auto.offset.reset']}") private String autoOffsetReset; @Value("#{configproperties_leconf['outer.kafka.sasl.mechanism']}") private String saslMechanism; @Value("#{configproperties_leconf['outer.kafka.sasl.jaas.config']}") private String saslJaasConfig; @Value("#{configproperties_leconf['outer.kafka.ssl.password']}") private String sslPassword; @Value("#{configproperties_leconf['outer.kafka.sasl.truststore']}") private String kafkaJks; @Value("#{configproperties_leconf['outer.kafka.kafkaUrlType']}") private Integer kafkaUrlType; @Value("#{configproperties_leconf['outer.kafka.consumer.service.product.consumer.group']}") private String groupId; 
@Value("#{configproperties_leconf['outer.kafka.consumer.service.product.topic']}") private String serviceProductTopic; //消费ssl kafka 消息给price kafka ,消费组sslkafkaToPriceKafkaGroup @Value("#{configproperties_leconf['sslkafkaToPriceKafka.group']}") private String sslkafkaToPriceKafkaGroup; //ssl kafka topic和price kafka topic映射关系 @Value("#{configproperties_leconf['sslkafka2PriceKafkaTopics']}") private String sslkafka2PriceKafkaTopics; //这里构造配置,根据自己的kafka是否使用ssl加密&是否使用jks,没有的去掉if-else即可 public MapkafkaConfigs() { //使用不带有jks的kafka配置 Map configMap = new HashMap<>(); if(kafkaUrlType.equals(1)){ configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServersNoJks); configMap.put("security.protocol", "SASL_PLAINTEXT"); }else { //使用jks配置的kafka configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); configMap.put("security.protocol", "SASL_SSL"); configMap.put("ssl.endpoint.identification.algorithm", ""); //jks文件是放在项目resource下 configMap.put("ssl.truststore.location", ClassUtils.getDefaultClassLoader().getResource("").getPath() + kafkaJks); configMap.put("ssl.truststore.password", sslPassword); } configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); configMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); configMap.put("sasl.mechanism", saslMechanism); configMap.put("sasl.jaas.config", saslJaasConfig); configMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return configMap; } @Bean public KafkaConsumer testKafkaConsumer() { KafkaConsumer consumer = new KafkaConsumer<>(kafkaConfigs()); 
consumer.subscribe(Arrays.asList(serviceProductTopic)); return consumer; } }
生产者组件
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.util.ClassUtils; import java.util.HashMap; import java.util.Map; /** * outer kafka config */ @Configuration public class TestKafkaProducerConfig { @Value("#{configproperties_leconf['outer.kafka.bootstrap.servers']}") private String bootStrapServers; @Value("#{configproperties_leconf['outer.kafka.bootstrap.bootStrapServersNoJks']}") private String bootStrapServersNoJks; @Value("#{configproperties_leconf['outer.kafka.request.timeout']}") private String requestTimeout; @Value("${searchengine.MQ.kafka.key.serializer:org.apache.kafka.common.serialization.StringSerializer}") private String keySerializer; @Value("${searchengine.MQ.kafka.value.serializer:org.apache.kafka.common.serialization.StringSerializer}") private String valueSerializer; @Value("#{configproperties_leconf['outer.kafka.kafkaUrlType']}") private Integer kafkaUrlType; @Value("#{configproperties_leconf['outer.kafka.sasl.mechanism']}") private String saslMechanism; @Value("#{configproperties_leconf['outer.kafka.sasl.jaas.config']}") private String saslJaasConfig; @Value("#{configproperties_leconf['outer.kafka.ssl.password']}") private String sslPassword; @Value("#{configproperties_leconf['outer.kafka.sasl.truststore']}") private String kafkaJks; @Bean public ProducerFactorysslProducerFactory() { Map properties = new HashMap<>(); if(kafkaUrlType.equals(1)){ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServersNoJks); //使用不带有jks的kafka配置 properties.put("security.protocol", "SASL_PLAINTEXT"); }else { 
//使用jks配置的kafka properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); properties.put("security.protocol", "SASL_SSL"); properties.put("ssl.endpoint.identification.algorithm", ""); properties.put("ssl.truststore.location", ClassUtils.getDefaultClassLoader().getResource("").getPath() + kafkaJks); properties.put("ssl.truststore.password", sslPassword); } properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout); properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000); properties.put(ProducerConfig.ACKS_CONFIG, "1"); //生产者重试次数 properties.put(ProducerConfig.RETRIES_CONFIG, 3); //指定ProducerBatch(消息累加器中BufferPool中的)可复用大小 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //生产者会在ProducerBatch被填满或者等待超过LINGER_MS_CONFIG时发送 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); //消息缓存 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); //数据压缩 properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); //设置每条消息的最大值10M properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760); properties.put("sasl.mechanism", saslMechanism); properties.put("sasl.jaas.config", saslJaasConfig); return new DefaultKafkaProducerFactory<>(properties); } @Bean public KafkaTemplate testKafkaTemplate(ProducerFactory sslProducerFactory) { return new KafkaTemplate<>(sslProducerFactory); } }
生产消息和消费消息
@Service public class KafkaServiceImpl implements KafkaService, ApplicationListener{ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServiceImpl.class); @Autowired @Qualifier("sslKafkaTemplate") private KafkaTemplate sslKafkaTemplate; @Autowired @Qualifier("testKafkaConsumer") private KafkaConsumer testKafkaConsumer; /** * 执行 原生 kafka consumer api 监听topic,并存储数据到数据库和redis。 * Handle an application event. */ @Override public void onApplicationEvent(ContextRefreshedEvent event) { new Thread(()->{ //拉去消息 while (true) { ConsumerRecords records = null; try { records = testKafkaConsumer.poll(5000); testKafkaConsumer.commitSync(); } catch (ConcurrentModificationException e) { //e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); LOGGER.error("消费ssl kafka 消息失败。"); LOGGER.error(e.getMessage()); } if (Objects.isNull(records)) { continue; } final ConsumerRecords consumerRecords = records; //数据太多 这里用线程池处理 threadPoolTaskExecutor.submit(() -> { for (ConsumerRecord record : consumerRecords) { if (Objects.isNull(record)) { continue; } try { LOGGER.info("records.count={},record.length={} partition= {}, offset = {}", consumerRecords.count(), record.value().length(), record.partition(), record.offset()); //拿到消息 这里处理业务 System.out.println(record.value()); //这里模拟业务场景1,直接将消息消息发到另一个topic doSend(sslKafkaTemplate,"test_topic", null,record.value()); //这里模拟业务场景2,用于设置自定义的消息头消息 List headers = new ArrayList (); if(record.headers() != null){ record.headers().forEach(header -> { if(header.key().equals("region")){ headers.add(new RecordHeader("region",header.value())); } }); } doSend(sslKafkaTemplate, "test_topic2", record.value(),headers); } catch (Exception e) { LOGGER.error("Kafka error: ", e); } } }); } },"ssl Kafka ").start(); } private ListenableFuture > doSend(KafkaTemplate kafkaTemplate, String topic, String key, String message) { try { ListenableFuture > future = kafkaTemplate.send(topic, key, message); future.get(1000 * 15, TimeUnit.MILLISECONDS); //TODO 异常处理 
future.addCallback(result -> LOGGER.info("send success,topic:" + topic), throwable -> LOGGER.info("send error:\n" + throwable)); } catch (Exception e) { LOGGER.error(ExceptionUtil.getExceptionStackTrace(e)); LOGGER.error("+++++++++++++++++++++++: ", e); LOGGER.error("not_send_topic:={}", topic); LOGGER.error("not_send_message:={}", message); } return null; } private ListenableFuture > doSend(KafkaTemplate kafkaTemplate, String topic, String message,List headers){ try { ProducerRecord record = new ProducerRecord (topic,null, null,message,headers); ListenableFuture > future = kafkaTemplate.send(record); //future.addCallback(result -> LOGGER.info("send success,ssl topic:" + topic), throwable -> LOGGER.info("send ssl error:\n" + throwable)); }catch (Exception e){ LOGGER.error(ExceptionUtil.getExceptionStackTrace(e)); LOGGER.error("ssl exception+++++++++++++++++++++++: ", e); LOGGER.error("not_send_ssl_topic:={}", topic); LOGGER.error("not_send_ssl_message:={}", message); } return null; } }
注:
这里记录一下生产发生的问题
关于 max.poll.interval.ms 配置的问题:请根据自己业务的处理耗时,合理配置两次 poll 拉取之间允许的最大间隔时间。
我们用 Kafka 一次性拉取多条数据,然后循环提交给线程池处理。线程池的线程数量有限,而我们配置的拒绝策略是 CallerRunsPolicy:当线程池满时,任务会由当前线程(即执行 poll 的线程)来处理,导致下一次 poll 方法无法及时执行。当两次 poll 的间隔超过 max.poll.interval.ms 时,服务端会认为当前消费者已经失效,将其踢出消费组,导致后续不再消费消息。
关于kafka配置内容参考文章
https://blog.csdn.net/qq_34491508/article/details/126029810
springboot整合参考地址
https://blog.csdn.net/qq_34491508/article/details/120831627
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章