#kafka部分
@KafkaListener(topics = "#{'${vsmart_alert_detection_tms_send_message_topic}'.split(',')}", groupId = "${vsmart.alert.detection.consumer.group}")
public void vsmartAlertDetectionTmsSendMessage(ConsumerRecord, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
doSendMessage(record,ack);
}
private void doSendMessage(ConsumerRecord, ?> record, Acknowledgment ack) {
Optional message = Optional.ofNullable(record.value());
String key = record.topic() + "-" + record.partition() + "-offset:" + record.offset();
if (RedisUtils.isExistsKey(key)) {
ack.acknowledge();
return;
}
try {
if (message.isPresent() && (record.timestamp() > (System.currentTimeMillis() - kafkaConsumerDelayTime))) {
JSONObject msg = JSONObject.parseObject(record.value().toString());
msg.put(VSMART_KAFKA_MSG_POSITION_INFO, key);
//具体操作
}
}catch (Exception e){
}finally {
ack.acknowledge();
}
}
#redis部分
public Boolean handler(JSONObject msg) {
//解析
Boolean isOk = jsonToDetectionInfos(msg);
if (!isOk) {
return false;
}
//加锁 associatedKey()
String lockKey = associatedKey();
if (StrUtil.isEmpty(lockKey)) {
return false;
}
RLock lock = SpringUtils.getBean(RedissonClient.class).getLock(lockKey);
//锁的时间 根据业务需要进行调整
try {
boolean flag_2 = lock.tryLock(10, 300, TimeUnit.SECONDS);
if (flag_2) {
//加锁后执行前判断是否已经处理过kafka中相同位置的信息了
if (ObjectUtil.isNotNull(msg) &&
ObjectUtil.isNotNull(msg.get(VSMART_KAFKA_MSG_POSITION_INFO)) &&
RedisUtils.isExistsKey(msg.getString(VSMART_KAFKA_MSG_POSITION_INFO))) {
return false;
}
//具体业务操作
//...
return true;
} else {
detectionRuleBo.getLogText().append(StrUtil.format("{}-获取锁失败;", detectionRuleBo.getName())).append("
");
return false;
}
} catch (Exception e) {
} finally {
///释放锁
if (null != lock && lock.isHeldByCurrentThread()) {
if (ObjectUtil.isNotNull(msg) &&
ObjectUtil.isNotNull(msg.get(VSMART_KAFKA_MSG_POSITION_INFO))) {
RedisUtils.setCacheStrExpire(msg.getString(VSMART_KAFKA_MSG_POSITION_INFO), msg.getString(VSMART_KAFKA_MSG_POSITION_INFO), 60 * 60);
}
//解锁
lock.unlock();
}
return true;
}
}
猜你喜欢
- 3天前(三亚海棠湾君悦度假酒店)三亚海棠湾君悦酒店暑期夏令营悦趣海岛游招募中
- 3天前(安徽民宿发展报告)首届安徽省乡村民宿创意设计大赛启动
- 3天前(哈弗h9优惠9万是真的吗)热浪来袭,哈弗H9超值补贴火热加码
- 3天前(安徽民航君澜大饭店装饰设计招标)集东方文化气息,品徽派隽美风韵----安徽民航君澜大饭店静待绽放
- 3天前(哥伦比亚号邮轮)爱达邮轮与哥仑比亚船舶管理集团达成合作
- 3天前(福朋喜来登酒店宴会厅)福朋喜来登品牌亮相北部湾城市群 阳江中心福朋喜来登酒店开业
- 3天前(新西兰“空降”上海:新西兰旅游局邀请你来“玩真的”!)新西兰“空降”上海:新西兰旅游局邀请你来“玩真的”!
- 3天前(曼谷丽思卡尔顿公寓价格)曼谷丽思卡尔顿酒店盛大启幕,开创泰国奢华雅致新纪元
- 3天前(澳涞山庄见证北欧零碳到中国实践,世界十佳环境保护城市榜单发布)澳涞山庄见证北欧零碳到中国实践,世界十佳环境保护城市榜单发布
- 3天前(锦江 iu)锦江荟APP原生鸿蒙版正式上线打造全场景旅行服务新体验
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章