#kafka部分
@KafkaListener(topics = "#{'${vsmart_alert_detection_tms_send_message_topic}'.split(',')}", groupId = "${vsmart.alert.detection.consumer.group}")
public void vsmartAlertDetectionTmsSendMessage(ConsumerRecord, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
doSendMessage(record,ack);
}
private void doSendMessage(ConsumerRecord, ?> record, Acknowledgment ack) {
Optional message = Optional.ofNullable(record.value());
String key = record.topic() + "-" + record.partition() + "-offset:" + record.offset();
if (RedisUtils.isExistsKey(key)) {
ack.acknowledge();
return;
}
try {
if (message.isPresent() && (record.timestamp() > (System.currentTimeMillis() - kafkaConsumerDelayTime))) {
JSONObject msg = JSONObject.parseObject(record.value().toString());
msg.put(VSMART_KAFKA_MSG_POSITION_INFO, key);
//具体操作
}
}catch (Exception e){
}finally {
ack.acknowledge();
}
}
#redis部分
public Boolean handler(JSONObject msg) {
//解析
Boolean isOk = jsonToDetectionInfos(msg);
if (!isOk) {
return false;
}
//加锁 associatedKey()
String lockKey = associatedKey();
if (StrUtil.isEmpty(lockKey)) {
return false;
}
RLock lock = SpringUtils.getBean(RedissonClient.class).getLock(lockKey);
//锁的时间 根据业务需要进行调整
try {
boolean flag_2 = lock.tryLock(10, 300, TimeUnit.SECONDS);
if (flag_2) {
//加锁后执行前判断是否已经处理过kafka中相同位置的信息了
if (ObjectUtil.isNotNull(msg) &&
ObjectUtil.isNotNull(msg.get(VSMART_KAFKA_MSG_POSITION_INFO)) &&
RedisUtils.isExistsKey(msg.getString(VSMART_KAFKA_MSG_POSITION_INFO))) {
return false;
}
//具体业务操作
//...
return true;
} else {
detectionRuleBo.getLogText().append(StrUtil.format("{}-获取锁失败;", detectionRuleBo.getName())).append("
");
return false;
}
} catch (Exception e) {
} finally {
///释放锁
if (null != lock && lock.isHeldByCurrentThread()) {
if (ObjectUtil.isNotNull(msg) &&
ObjectUtil.isNotNull(msg.get(VSMART_KAFKA_MSG_POSITION_INFO))) {
RedisUtils.setCacheStrExpire(msg.getString(VSMART_KAFKA_MSG_POSITION_INFO), msg.getString(VSMART_KAFKA_MSG_POSITION_INFO), 60 * 60);
}
//解锁
lock.unlock();
}
return true;
}
}
猜你喜欢
- 4小时前【深度学习目标检测】十六、基于深度学习的麦穗头系统-含GUI和源码(python,yolov8)
- 4小时前Session详解,学习 Session对象一篇文章就够了
- 4小时前VUE登录注册页面,完整vue,直接复制
- 4小时前阿里云OSS存储图片在上传的时候设置过期时间
- 4小时前网络安全(黑客技术)—2024自学
- 4小时前数据湖架构Hudi(二)Hudi版本0.12源码编译、Hudi集成spark、使用IDEA与spark对hudi表增删改查
- 4小时前分布式消息队列:Kafka vs RabbitMQ vs ActiveMQ
- 3小时前汽车座椅空调(汽车座椅空调出风口可以封掉吗)
- 2小时前手机掉厕所怎么办(手机掉蹲厕里了应该怎么处理)
- 1小时前tnf羽绒服(tnf羽绒服充绒量多少克)
网友评论
- 搜索
- 最新文章
- 热门文章