上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

基于Rabbitmq和Redis的延迟消息实现

guduadmin119小时前

1 基于Rabbitmq延迟消息实现

支付时间设置为30,未支付的消息会积压在mq中,给mq带来巨大压力。我们可以利用Rabbitmq的延迟队列插件实现消息前一分钟尽快处理

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第1张

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第2张

1.1定义延迟消息实体

由于我们要多次发送延迟消息,因此需要先定义一个记录消息延迟时间的消息体

@Data
public class MultiDelayMessage {
    /**
     * 消息体
     */
    private T data;
    /**
     * 记录延迟时间的集合
     */
    private List delayMillis;
    public MultiDelayMessage(T data, List delayMillis) {
        this.data = data;
        this.delayMillis = delayMillis;
    }
    public static  MultiDelayMessage of(T data, Long ... delayMillis){
        return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));
    }
    /**
     * 获取并移除下一个延迟时间
     * @return 队列中的第一个延迟时间
     */
    public Long removeNextDelay(){
        return delayMillis.remove(0);
    }
    /**
     * 是否还有下一个延迟时间
     */
    public boolean hasNextDelay(){
        return !delayMillis.isEmpty();
    }
}

1.2 定义常量,用于记录交换机、队列、RoutingKey等常量

package com.hmall.trade.constants;
public interface MqConstants {
    String DELAY_EXCHANGE = "trade.delay.topic";
    String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
    String DELAY_ORDER_ROUTING_KEY = "order.query";
}

1.3 抽取mq配置到nacos中

spring:
  rabbitmq:
    host: ${hm.mq.host:192.168.150.101} # 主机名
    port: ${hm.mq.port:5672} # 端口
    virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机
    username: ${hm.mq.un:hmall} # 用户名
    password: ${hm.mq.pw:123} # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

1.4 定义消息处理器

使用延迟消息处理器发送消息

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第3张

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第4张

1.5 消息监听与延迟消息再次发送

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第5张

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第6张

2 延迟消息实现

DelayQueue:基于JVM,保存在内存中,会出现消息丢失

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第7张

Rabbitmq的延迟任务:基于TTL和死信交换机

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第8张

2.1 redis的延迟任务:基于zset的去重和排序功能

基于Rabbitmq和Redis的延迟消息实现,在这里插入图片描述,第9张

1.为什么任务需要存储在数据库中?

延迟任务是一个通用的服务,任何有延迟需求的任务都可以调用该服务,内存数据库的存储是有限的,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑

2.为什么使用redis中的两种数据类型,list和zset?