上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

SpringBoot 集成 Kafka (SSL证书)

guduadmin17小时前

目录

1.SSL证书

2.全局配置

3.Producer配置

4.Consumer配置

 5.运行异常汇总


1.SSL证书

使用特定的SSL证书才可以进行访问,可用于多个项目间中转或项目中使用到的Kafka。

client.keystore.jks、client.truststore.jks

2.全局配置

demo:
    kafka:
        address: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092
        password: xxxxxxxx
        group-id: xxxx

3.Producer配置

 KafkaTemplate配置

@Configuration
@EnableKafka
@Slf4j
public class ProducerConfig {
    @Value("${demo.kafka.address}")
    private String addressStr;
    @Value("${demo.kafka.password}")
    private String password;
    //注入kafkaTemplate
    @Bean
    KafkaTemplate myKafkaTemplate() throws FileNotFoundException {
        DefaultKafkaProducerFactory producerFactory =
           new DefaultKafkaProducerFactory<>(producerProperties(addressStr));
        return new KafkaTemplate<>(producerFactory);
    }
    //kafka的配置
    private Map producerProperties(String addressStr) throws FileNotFoundException {
        // kafka的相关参数 比如ip地址和分组这些参数
        Map properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, addressStr);
             properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //ssl加密和认证配置
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"JKS");
        properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"JKS");
        //获取Resources配置文件中client.keystore.jks
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ResourceUtils.getFile("classpath:client.keystore.jks").getPath());
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
        //设置为空字符串来禁用服务器主机名验证
        properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG ,"");
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password);
        //获取Resources配置文件中client.truststore.jks
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ResourceUtils.getFile("classpath:client.truststore.jks").getPath());
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);
        return properties;
    }
}

发送至对应Kafka服务器

@Service
public class ProductService {
    
    //配置的KafkaTemplate
    @Autowired
    private KafkaTemplate kafkaTemplate;
    //发送至对于Kafka
    public void sendXxxxx() throws Exception {
        ListenableFuture> demo_topic = kafkaTemplate.send("demo_topic", "message");
        demo_topic.addCallback(success -> {
            log.info("推送成功!!!");
        },error -> {
            log.info("推送失败!!!");
        });
    }
}

4.Consumer配置

KafkaListenerContainerFactory配置

@Configuration
@EnableKafka
@Slf4j
public class ProducerConfig {
    @Value("${demo.kafka.address}")
    private String addressStr;
    @Value("${demo.kafka.password}")
    private String password;
    @Value("${demo.kafka.group-id}")
    private String groupId;
    @Bean
    KafkaListenerContainerFactory> kafkaFactory() throws FileNotFoundException {
        ConcurrentKafkaListenerContainerFactory factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // 创建消费的kafka工厂对象
        ConsumerFactory consumerFactory = new
                DefaultKafkaConsumerFactory<>(consumerProperties(dataCenterKafkaIps,groupId));
        factory.setConsumerFactory(consumerFactory);
        //手动提交
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
    // kafka的配置
    private Map consumerProperties(String addressStr,String groupId) throws FileNotFoundException {
        // kafka的相关参数 比如ip地址和分组这些参数
        Map properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, addressStr);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //ssl加密和认证配置
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        properties.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,"JKS");
        properties.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,"JKS");
        //获取Resources配置文件中client.keystore.jks
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ResourceUtils.getFile("classpath:client.keystore.jks").getPath());
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
        //设置为空字符串来禁用服务器主机名验证
        properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG ,"");
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, password);
        //获取Resources配置文件中client.truststore.jks
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ResourceUtils.getFile("classpath:client.truststore.jks").getPath());
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);
        return properties;
    }
}

监听Topic

@Component
@Slf4j
public class DemoListener {
    @KafkaListener(topics = "demo_topic",containerFactory = "kafkaFactory")
    public void demo(ConsumerRecord record, Acknowledgment ack) throws Exception {
        try{
        
        log.info(record.value());
        
        //业务逻辑
        ack.acknowledge();
        }catch(Exception e){
        
        throw new Exception("程序异常");
        }
    }
}

 5.运行异常汇总

Error connecting to node host:9092 (id: 1003 rack: null)

需在服务器/etc/hosts中配置【ip:host】对应关系

使用docker、docker-compose等部署须在容器内部配置

其他问题还有待发现,不太记得了😂 

网友评论

搜索
最新文章
热门文章
热门标签