上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Kafka事务

guduadmin11天前

Kafka事务是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事务。Kafka事务指的是生产者生产消息以及消费者提交offset的操作可以在一个原子操作中,要么都成功,要么都失败。尤其是在生产者、消费者并存时,事务的保障尤其重要。(consumer-transform-producer模式)

Kafka事务,第1张

事务操作API

Producer接口中定义了以下5个事务相关方法:

  • initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作
  • beginTransaction(开始事务):启动一个Kafka事务
  • sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交
  • commitTransaction(提交事务):提交事务
  • abortTransaction(放弃事务):取消事务

    事务相关属性配置 

    生产者

    // 配置事务的id,开启了事务会默认开启幂等性
    props.put("transactional.id", "first-transactional");

    消费者

    // 1. 消费者需要设置隔离级别
    props.put("isolation.level","read_committed");
    //  2. 关闭自动提交
     props.put("enable.auto.commit", "false");

    Kafka事务编程

    在Kafka的topic 「ods_user」中有一些用户数据,数据格式如下:

    姓名,性别,出生日期
    张三,1,1980-10-09
    李四,0,1985-11-01

    我们需要编写程序,将用户的性别转换为男、女(1-男,0-女),转换后将数据写入到topic 「dwd_user」中。要求使用事务保障,要么消费了数据同时写入数据到 topic,提交offset。要么全部失败。

    启动生产者控制台程序模拟数据

    # 创建名为ods_user和dwd_user的主题
    bin/kafka-topics.sh --create --bootstrap-server 192.168.2.3:9092 --topic ods_user
    bin/kafka-topics.sh --create --bootstrap-server 192.168.2.3:9092 --topic dwd_user
    # 生产数据到 ods_user
    bin/kafka-console-producer.sh --broker-list 192.168.2.3:9092 --topic ods_user
    # 从dwd_user消费数据
    bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.3:9092 --topic dwd_user --from-beginning  --isolation-level read_committed

    编写创建消费者代码

    编写一个方法 createConsumer,该方法中返回一个消费者,订阅「ods_user」主题。注意:需要配置事务隔离级别、关闭自动提交。

    实现步骤:

    创建Kafka消费者配置

     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "192.168.2.3:9092");
     props.setProperty("group.id", "ods_user");
     props.put("isolation.level","read_committed");
     props.setProperty("enable.auto.commit", "false");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    创建消费者,并订阅 ods_user 主题

      // 1. 创建消费者
        public static Consumer createConsumer() {
            // 1. 创建Kafka消费者配置
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "192.168.2.3:9092");
            props.setProperty("group.id", "ods_user");
            props.put("isolation.level","read_committed");
            props.setProperty("enable.auto.commit", "false");
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 2. 创建Kafka消费者
            KafkaConsumer consumer = new KafkaConsumer<>(props);
            // 3. 订阅要消费的主题
            consumer.subscribe(Arrays.asList("ods_user"));
            
            return consumer;
    }

    编写创建生产者代码

    编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。

    创建生产者配置

    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.2.3:9092");
    props.put("transactional.id", "dwd_user");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    创建生产者对象

    public static Producer createProduceer() {
            // 1. 创建生产者配置
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.2.3:9092");
            props.put("transactional.id", "dwd_user");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // 2. 创建生产者
            Producer producer = new KafkaProducer<>(props);
            return producer;
        }

    编写代码消费并生产数据

    实现步骤:

    1. 调用之前实现的方法,创建消费者、生产者对象
    2. 生产者调用initTransactions初始化事务
    3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic
      1. 生产者开启事务
      2. 消费者拉取消息
      3. 遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)
      4. 生产消息到dwd_user topic中
      5. 提交偏移量到事务中
      6. 提交事务
      7. 捕获异常,如果出现异常,则取消事务
    public static void main(String[] args) {
            Consumer consumer = createConsumer();
            Producer producer = createProducer();
            // 初始化事务
            producer.initTransactions();
            while(true) {
                try {
                    // 1. 开启事务
                    producer.beginTransaction();
                    // 2. 定义Map结构,用于保存分区对应的offset
                    Map offsetCommits = new HashMap<>();
                    // 2. 拉取消息
                    ConsumerRecords records = consumer.poll(Duration.ofSeconds(2));
                    for (ConsumerRecord record : records) {
                        // 3. 保存偏移量
                        offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                        // 4. 进行转换处理
                        String[] fields = record.value().split(",");
                        fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";
                        String message = fields[0] + "," + fields[1] + "," + fields[2];
                        // 5. 生产消息到dwd_user
                        producer.send(new ProducerRecord<>("dwd_user", message));
                    }
                    // 6. 提交偏移量到事务
                    producer.sendOffsetsToTransaction(offsetCommits, "ods_user");
                    // 7. 提交事务
                    producer.commitTransaction();
                } catch (Exception e) {
                    // 8. 放弃事务
                    producer.abortTransaction();
                }
            }
        }

    测试

    往之前启动的console-producer中写入消息进行测试,同时检查console-consumer是否能够接收到消息:

    Kafka事务,第2张

    逐个测试一下消息:

    张三,1,1980-10-09
    李四,0,1985-11-01

    模拟异常测试事务

    // 3. 保存偏移量
    offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
    // 4. 进行转换处理
    String[] fields = record.value().split(",");
    fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";
    String message = fields[0] + "," + fields[1] + "," + fields[2];
    // 模拟异常
    int i = 1/0;
    // 5. 生产消息到dwd_user
    producer.send(new ProducerRecord<>("dwd_user", message));

    启动程序一次,抛出异常。

    再启动程序一次,还是抛出异常。

    直到我们处理该异常为止。

    我们发现,可以消费到消息,但如果中间出现异常的话,offset是不会被提交的,除非消费、生产消息都成功,才会提交事务。

网友评论

搜索
最新文章
热门文章
热门标签