上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

基于Rabbitmq和Redis的延迟消息实现

guduadmin59月前

1 基于Rabbitmq延迟消息实现

支付时间设置为30,未支付的消息会积压在mq中,给mq带来巨大压力。我们可以利用Rabbitmq的延迟队列插件实现消息前一分钟尽快处理

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第1张

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第2张

1.1定义延迟消息实体

由于我们要多次发送延迟消息,因此需要先定义一个记录消息延迟时间的消息体

@Data
public class MultiDelayMessage {
    /**
     * 消息体
     */
    private T data;
    /**
     * 记录延迟时间的集合
     */
    private List delayMillis;
    public MultiDelayMessage(T data, List delayMillis) {
        this.data = data;
        this.delayMillis = delayMillis;
    }
    public static  MultiDelayMessage of(T data, Long ... delayMillis){
        return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));
    }
    /**
     * 获取并移除下一个延迟时间
     * @return 队列中的第一个延迟时间
     */
    public Long removeNextDelay(){
        return delayMillis.remove(0);
    }
    /**
     * 是否还有下一个延迟时间
     */
    public boolean hasNextDelay(){
        return !delayMillis.isEmpty();
    }
}

1.2 定义常量,用于记录交换机、队列、RoutingKey等常量

package com.hmall.trade.constants;
public interface MqConstants {
    String DELAY_EXCHANGE = "trade.delay.topic";
    String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
    String DELAY_ORDER_ROUTING_KEY = "order.query";
}

1.3 抽取mq配置到nacos中

spring:
  rabbitmq:
    host: ${hm.mq.host:192.168.150.101} # 主机名
    port: ${hm.mq.port:5672} # 端口
    virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机
    username: ${hm.mq.un:hmall} # 用户名
    password: ${hm.mq.pw:123} # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

1.4 定义消息处理器

使用延迟消息处理器发送消息

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第3张

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第4张

1.5 消息监听与延迟消息再次发送

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第5张

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第6张

2 延迟消息实现

DelayQueue:基于JVM,保存在内存中,会出现消息丢失

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第7张

Rabbitmq的延迟任务:基于TTL和死信交换机

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第8张

2.1 redis的延迟任务:基于zset的去重和排序功能

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第9张

1.为什么任务需要存储在数据库中?

延迟任务是一个通用的服务,任何有延迟需求的任务都可以调用该服务,内存数据库的存储是有限的,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑

2.为什么使用redis中的两种数据类型,list和zset?

  • 原因一: list存储立即执行的任务,zset存储未来的数据
  • 原因二:任务量过大以后,zset的性能会下降

    时间复杂度:执行时间(次数) 随着数据规模增长的变化趋势

    • 操作redis中的list命令LPUSH: 时间复杂度: O(1)
    • 操作redis中的zset命令zadd: 时间复杂度: (Mlog(n))

      2.2 设计mybatis映射实体类:

      /**
           * 版本号,用乐观锁
           */
          @Version
          private Integer version;
      乐观锁支持:
      /**
           * mybatis-plus乐观锁支持
           * @return
           */
      @Bean
      public MybatisPlusInterceptor optimisticLockerInterceptor(){
          MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
          interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
          return interceptor;
      }
      

      2.3 创建task类,用于接收添加任务的参数

      @Data
      public class Task implements Serializable {
          /**
           * 任务id
           */
          private Long taskId;
          /**
           * 类型
           */
          private Integer taskType;
          /**
           * 优先级
           */
          private Integer priority;
          /**
           * 执行id
           */
          private long executeTime;
          /**
           * task参数
           */
          private byte[] parameters;
          
      }
      

      2.4 添加任务

      2.4.1 添加任务到数据库中

      addTaskToDb(task);修改任务表和日志表

      @Autowired
          private TaskinfoMapper taskinfoMapper;
          @Autowired
          private TaskinfoLogsMapper taskinfoLogsMapper;
          /**
           * 添加任务到数据库中
           *
           * @param task
           * @return
           */
          private boolean addTaskToDb(Task task) {
              boolean flag = false;
              try {
                  //保存任务表
                  Taskinfo taskinfo = new Taskinfo();
                  BeanUtils.copyProperties(task, taskinfo);
                  taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
                  taskinfoMapper.insert(taskinfo);
                  //设置taskID
                  task.setTaskId(taskinfo.getTaskId());
                  //保存任务日志数据
                  TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
                  BeanUtils.copyProperties(taskinfo, taskinfoLogs);
                  taskinfoLogs.setVersion(1);
                  taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
                  taskinfoLogsMapper.insert(taskinfoLogs);
                  flag = true;
              } catch (Exception e) {
                  e.printStackTrace();
              }
              return flag;
          }
      
      2.4.2 添加任务到redis

      addTaskToCache(task);判断任务执行之间是否在现在还是未来五分钟内

      @Autowired
          private CacheService cacheService;
          /**
           * 把任务添加到redis中
           *
           * @param task
           */
          private void addTaskToCache(Task task) {
              String key = task.getTaskType() + "_" + task.getPriority();
              //获取5分钟之后的时间  毫秒值
              Calendar calendar = Calendar.getInstance();
              calendar.add(Calendar.MINUTE, 5);
              long nextScheduleTime = calendar.getTimeInMillis();
              //2.1 如果任务的执行时间小于等于当前时间,存入list
              if (task.getExecuteTime() <= System.currentTimeMillis()) {
                  cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
              } else if (task.getExecuteTime() <= nextScheduleTime) {
                  //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
                  cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
              }
          }
      

      2.5 删除任务

      1、删除数据库任务表,更改日志表任务状态

      2、删除list或者zset中的任务

      在TaskService中添加方法

      /**
           * 取消任务
           * @param taskId        任务id
           * @return              取消结果
           */
      public boolean cancelTask(long taskId);
      
      /**
           * 取消任务
           * @param taskId
           * @return
           */
      @Override
      public boolean cancelTask(long taskId) {
          boolean flag = false;
          //删除任务,更新日志
          Task task = updateDb(taskId,ScheduleConstants.EXECUTED);
          //删除redis的数据
          if(task != null){
              removeTaskFromCache(task);
              flag = true;
          }
          return false;
      }
      /**
           * 删除redis中的任务数据
           * @param task
           */
      private void removeTaskFromCache(Task task) {
          String key = task.getTaskType()+"_"+task.getPriority();
          if(task.getExecuteTime()<=System.currentTimeMillis()){
              cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
          }else {
              cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
          }
      }
      /**
           * 删除任务,更新任务日志状态
           * @param taskId
           * @param status
           * @return
           */
      private Task updateDb(long taskId, int status) {
          Task task = null;
          try {
              //删除任务
              taskinfoMapper.deleteById(taskId);
              TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
              taskinfoLogs.setStatus(status);
              taskinfoLogsMapper.updateById(taskinfoLogs);
              task = new Task();
              BeanUtils.copyProperties(taskinfoLogs,task);
              task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
          }catch (Exception e){
              log.error("task cancel exception taskid={}",taskId);
          }
          return task;
      }
      

      2.6 消费任务

      1、删除list中的数据

      2、使用updateDB删除任务表、跟新日志表

      在TaskService中添加方法

      /**
       * 按照类型和优先级来拉取任务
       * @param type
       * @param priority
       * @return
       */
      public Task poll(int type,int priority);
      

      实现

      /**
           * 按照类型和优先级拉取任务
           * @return
           */
      @Override
      public Task poll(int type,int priority) {
          Task task = null;
          try {
              String key = type+"_"+priority;
              String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
              if(StringUtils.isNotBlank(task_json)){
                  task = JSON.parseObject(task_json, Task.class);
                  //更新数据库信息
                  updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
              }
          }catch (Exception e){
              e.printStackTrace();
              log.error("poll task exception");
          }
          return task;
      }
      

      2.7 未来定时任务更新-reids管道

      减少与redis的交互次数

      1、在引导类中添加开启任务调度注解:@EnableScheduling

      2、在service中添加定时任务 @Scheduled(cron = “0 */1 * * * ?”),每分钟一次

      @Scheduled(cron = "0 */1 * * * ?")
      public void refresh() {
          System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");
          // 获取所有未来数据集合的key值
          Set futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
          for (String futureKey : futureKeys) { // future_250_250
              String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
              //获取该组key下当前需要消费的任务数据
              Set tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
              if (!tasks.isEmpty()) {
                  //将这些任务数据添加到消费者队列中
                  cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
                  System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
              }
          }
      }
      
      public List refreshWithPipeline(String future_key,String topic_key,Collection values){
              List objects = stringRedisTemplate.executePipelined(new RedisCallback() {
                  @Nullable
                  @Override
                  public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
                      StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection;
                      String[] strings = values.toArray(new String[values.size()]);
                      stringRedisConnection.rPush(topic_key,strings);
                      stringRedisConnection.zRem(future_key,strings);
                      return null;
                  }
              });
              return objects;
          }
       
      

      总结

      1、使用rebbitmq使用的场景是在支付和订单微服务中,用于实现消息可以延迟30分钟付款的功能。并借用该中间件的插件实现支付的异步下单功能,并可以快速处理前几分钟,防止消息堆积

      2、使用redis是基于zset的去重和排序功能,相当于将一定数据的保存在数据库,使用定时任务同步数据库符合五分钟的任务到zset中,然后,在在zest中定时更新可以运行的任务到list集合中,相当于实现了延迟功能和缓存功能。

      3、第二种还可以扩展为将rabbitmq中等待时间较长的数据存到redis中,然后定时的去同步redis中的数据到数据库中,防止消息堆积。

      网友评论