Table of Contents
1. SSL Certificates
2. Global Configuration
3. Producer Configuration
4. Consumer Configuration
5. Runtime Exceptions
1. SSL Certificates

The Kafka cluster can only be accessed with the designated SSL certificates. This applies both to Kafka used to relay data between projects and to Kafka used within a single project.

The two certificate files are client.keystore.jks and client.truststore.jks. The configuration below loads them from the classpath, so place them under src/main/resources.
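If a connection fails with certificate errors, it can help to first confirm that the two JKS files and the password load at all, independent of Kafka. A minimal sketch, assuming the files sit on the classpath and share the password from demo.kafka.password (this check class is my addition, not part of the original setup):

```java
import java.io.InputStream;
import java.security.KeyStore;

public class KeystoreCheck {

    public static void main(String[] args) throws Exception {
        // Same value as demo.kafka.password (placeholder here)
        char[] password = "xxxxxxxx".toCharArray();

        for (String name : new String[]{"client.keystore.jks", "client.truststore.jks"}) {
            try (InputStream in = KeystoreCheck.class.getClassLoader().getResourceAsStream(name)) {
                if (in == null) {
                    throw new IllegalStateException(name + " not found on the classpath");
                }
                // load() throws if the file is corrupt or the password is wrong
                KeyStore store = KeyStore.getInstance("JKS");
                store.load(in, password);
                System.out.println(name + " loaded, " + store.size() + " entries");
            }
        }
    }
}
```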
2. Global Configuration
```yaml
demo:
  kafka:
    address: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092
    password: xxxxxxxx
    group-id: xxxx
```
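These three values are injected with @Value in the configuration classes below. If you would rather keep them in one typed object, a @ConfigurationProperties binding is an alternative; a small sketch (this class is my addition, not part of the original setup):

```java
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Optional: binds the demo.kafka.* block into one object instead of three @Value fields
@Component
@ConfigurationProperties(prefix = "demo.kafka")
public class DemoKafkaProperties {

    private String address;   // comma-separated broker list
    private String password;  // keystore/truststore password
    private String groupId;   // "group-id" maps to groupId via relaxed binding

    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }

    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }

    public String getGroupId() { return groupId; }
    public void setGroupId(String groupId) { this.groupId = groupId; }
}
```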
3. Producer Configuration

KafkaTemplate configuration
```java
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.ResourceUtils;

import lombok.extern.slf4j.Slf4j;

// Not named ProducerConfig, to avoid clashing with org.apache.kafka.clients.producer.ProducerConfig used below
@Configuration
@EnableKafka
@Slf4j
public class KafkaProducerConfig {

    @Value("${demo.kafka.address}")
    private String addressStr;

    @Value("${demo.kafka.password}")
    private String password;

    // Expose the KafkaTemplate as a bean
    @Bean
    KafkaTemplate<String, String> myKafkaTemplate() throws FileNotFoundException {
        DefaultKafkaProducerFactory<String, String> producerFactory =
                new DefaultKafkaProducerFactory<>(producerProperties(addressStr));
        return new KafkaTemplate<>(producerFactory);
    }

    // Kafka producer properties: broker addresses, serializers, and SSL settings
    private Map<String, Object> producerProperties(String addressStr) throws FileNotFoundException {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, addressStr);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // SSL encryption and authentication
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
        // client.keystore.jks from the resources directory
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
                ResourceUtils.getFile("classpath:client.keystore.jks").getPath());
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
        // Empty string disables server hostname verification
        properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password);
        // client.truststore.jks from the resources directory
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                ResourceUtils.getFile("classpath:client.truststore.jks").getPath());
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);
        return properties;
    }
}
```
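One caveat about the ResourceUtils.getFile("classpath:...") calls: they only work while the keystore files exist as real files on disk (running from an IDE or an exploded classpath). Packaged inside an executable JAR they typically fail with FileNotFoundException, because the Kafka client needs a plain filesystem path. A possible workaround, sketched below with a hypothetical CertFiles helper that is not part of the original code, is to copy each resource to a temporary file first:

```java
import java.io.File;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

import org.springframework.core.io.ClassPathResource;

public final class CertFiles {

    private CertFiles() {
    }

    // Copies a classpath resource to a temporary file and returns its absolute path,
    // so the Kafka client always gets a real filesystem path
    public static String toTempFile(String resourceName) {
        try (InputStream in = new ClassPathResource(resourceName).getInputStream()) {
            File tmp = File.createTempFile(resourceName, ".jks");
            tmp.deleteOnExit();
            Files.copy(in, tmp.toPath(), StandardCopyOption.REPLACE_EXISTING);
            return tmp.getAbsolutePath();
        } catch (Exception e) {
            throw new IllegalStateException("Cannot extract " + resourceName, e);
        }
    }
}
```

With such a helper, the two SSL_*_LOCATION_CONFIG entries would point at CertFiles.toTempFile("client.keystore.jks") and CertFiles.toTempFile("client.truststore.jks") instead.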
Sending to the target Kafka cluster
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class ProductService {

    // The KafkaTemplate configured above
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Send a message to the target Kafka cluster
    public void sendXxxxx() throws Exception {
        ListenableFuture<SendResult<String, String>> demo_topic =
                kafkaTemplate.send("demo_topic", "message");
        demo_topic.addCallback(success -> {
            log.info("Send succeeded!!!");
        }, error -> {
            log.error("Send failed!!!");
        });
    }
}
```
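The callback above is fire-and-forget: the method returns before the broker has confirmed anything. If the caller needs that confirmation, the same future can be awaited. A small sketch of an extra method on ProductService, assuming spring-kafka 2.x where send() returns a ListenableFuture (the 10-second timeout is an arbitrary choice):

```java
import java.util.concurrent.TimeUnit;

import org.springframework.kafka.support.SendResult;

// Blocks until the broker acknowledges the record or the wait times out
public void sendAndWait(String message) throws Exception {
    SendResult<String, String> result =
            kafkaTemplate.send("demo_topic", message).get(10, TimeUnit.SECONDS);
    log.info("Sent to partition {} at offset {}",
            result.getRecordMetadata().partition(),
            result.getRecordMetadata().offset());
}
```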
4. Consumer Configuration

KafkaListenerContainerFactory configuration
```java
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.util.ResourceUtils;

import lombok.extern.slf4j.Slf4j;

// Consumer-side configuration; named to avoid clashing with org.apache.kafka.clients.consumer.ConsumerConfig used below
@Configuration
@EnableKafka
@Slf4j
public class KafkaConsumerConfig {

    @Value("${demo.kafka.address}")
    private String addressStr;

    @Value("${demo.kafka.password}")
    private String password;

    @Value("${demo.kafka.group-id}")
    private String groupId;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaFactory()
            throws FileNotFoundException {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Consumer factory backing the listener containers
        ConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProperties(addressStr, groupId));
        factory.setConsumerFactory(consumerFactory);
        // Manual offset commits
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    // Kafka consumer properties: broker addresses, group id, deserializers, and SSL settings
    private Map<String, Object> consumerProperties(String addressStr, String groupId) throws FileNotFoundException {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, addressStr);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // ignored while auto commit is disabled
        properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // SSL encryption and authentication
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
        // client.keystore.jks from the resources directory
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
                ResourceUtils.getFile("classpath:client.keystore.jks").getPath());
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
        // Empty string disables server hostname verification
        properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password);
        // client.truststore.jks from the resources directory
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                ResourceUtils.getFile("classpath:client.truststore.jks").getPath());
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);
        return properties;
    }
}
```
Listening on the topic
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class DemoListener {

    // Listen on demo_topic using the container factory defined above
    @KafkaListener(topics = "demo_topic", containerFactory = "kafkaFactory")
    public void demo(ConsumerRecord<String, String> record, Acknowledgment ack) throws Exception {
        try {
            log.info(record.value());
            // Business logic here
            ack.acknowledge();
        } catch (Exception e) {
            throw new Exception("Processing failed");
        }
    }
}
```
5. Runtime Exceptions
Error connecting to node host:9092 (id: 1003 rack: null)
Add the broker's IP-to-hostname mapping to /etc/hosts on the machine running the client.

When the application is deployed with Docker, docker-compose, or similar, the mapping must be configured inside the container.
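With docker-compose, for example, extra_hosts adds the entry to the container's /etc/hosts; a minimal sketch (service name, hostname, and IP are placeholders):

```yaml
services:
  demo-app:
    # Writes "xx.xx.xx.xx host" into /etc/hosts inside the container
    extra_hosts:
      - "host:xx.xx.xx.xx"
```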
Other issues are still to be discovered; I don't remember them all 😂