上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Kafka+redis分布式锁结合使用心得总结

guduadmin44小时前

#kafka部分

@KafkaListener(topics = "#{'${vsmart_alert_detection_tms_send_message_topic}'.split(',')}", groupId = "${vsmart.alert.detection.consumer.group}")

public void vsmartAlertDetectionTmsSendMessage(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

    doSendMessage(record,ack);

}

private void doSendMessage(ConsumerRecord record, Acknowledgment ack) {

    Optional message = Optional.ofNullable(record.value());

    String key = record.topic() + "-" + record.partition() + "-offset:" + record.offset();

    if (RedisUtils.isExistsKey(key)) {

        ack.acknowledge();

        return;

    }

    try {

        if (message.isPresent() && (record.timestamp() > (System.currentTimeMillis() - kafkaConsumerDelayTime))) {

            JSONObject msg = JSONObject.parseObject(record.value().toString());

            msg.put(VSMART_KAFKA_MSG_POSITION_INFO, key);

            //具体操作

        }

    }catch (Exception e){

        

    }finally {

        ack.acknowledge();

    }

}    

#redis部分 

public Boolean handler(JSONObject msg) {

    //解析

    Boolean isOk = jsonToDetectionInfos(msg);

    if (!isOk) {

        return false;

    }

    //加锁 associatedKey()

    

    String lockKey = associatedKey();

    if (StrUtil.isEmpty(lockKey)) {

        return false;

    }

    RLock lock = SpringUtils.getBean(RedissonClient.class).getLock(lockKey);

    //锁的时间 根据业务需要进行调整

    try {

        boolean flag_2 = lock.tryLock(10, 300, TimeUnit.SECONDS);

        if (flag_2) {        

            //加锁后执行前判断是否已经处理过kafka中相同位置的信息了

            if (ObjectUtil.isNotNull(msg) &&

                    ObjectUtil.isNotNull(msg.get(VSMART_KAFKA_MSG_POSITION_INFO)) &&

                    RedisUtils.isExistsKey(msg.getString(VSMART_KAFKA_MSG_POSITION_INFO))) {            

                return false;

            }

            //具体业务操作

            //...

                

            return true;

        } else {

            detectionRuleBo.getLogText().append(StrUtil.format("{}-获取锁失败;", detectionRuleBo.getName())).append("
");

            return false;

        }

    } catch (Exception e) {

        

    } finally {

        ///释放锁

        if (null != lock && lock.isHeldByCurrentThread()) {        

            if (ObjectUtil.isNotNull(msg) &&

                    ObjectUtil.isNotNull(msg.get(VSMART_KAFKA_MSG_POSITION_INFO))) {

                RedisUtils.setCacheStrExpire(msg.getString(VSMART_KAFKA_MSG_POSITION_INFO), msg.getString(VSMART_KAFKA_MSG_POSITION_INFO), 60 * 60);

            }

            //解锁

            lock.unlock();

        }

        return true;

    }

}

网友评论

搜索
最新文章
热门文章
热门标签