上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

@RabbitListener 消息队列 消息序列化

guduadmin11天前

MessageConvert

涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析。RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等

  • 当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化
  • SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差
  • 当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    @Configuration
    public class RabbitMQConfig {
        public static final String WINCALLCDR_QUEUE = "WINCHANCDR_QUEUE";
        //生产者
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            //发送消息进行序列化
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            return rabbitTemplate;
        }
        //消费者
        @Bean("rabbitListenerContainerFactory")
        public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory mqConnectionFactory){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(mqConnectionFactory);
            //--加上这句  自定义MessageConverter
            factory.setMessageConverter(new RabbitMessageConverter());
            //反序列化
            //factory.setMessageConverter(new Jackson2JsonMessageConverter());
            //factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack
            return factory;
        }
    }

    自定义MessageConverter

    在一些场景下我们希望在消息发送到MQ之前或者接受消息前对消息做一些自定义处理,这个时候就需要自定义MessageConverter了

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    public class RabbitMessageConverter implements MessageConverter {
        /**
         * 发送消息时转换
         */
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            System.out.println("=======toMessage=========");
            return new Message(object.toString().getBytes(), messageProperties);
        }
        /**
         * 接受消息时转换
         */
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            return new String(message.getBody());
        }
    }

    @RabbitListener 用法

    使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理。@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。使用@RabbitListener可以设置一个自己明确默认值的RabbitListenerContainerFactory对象。可以在配置文件中设置RabbitListenerAnnotationBeanPostProcessor并通过来设置@RabbitListener的执行,当然也可以通过@EnableRabbit注解来启用@RabbitListener。

    注意

    消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

    消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常

    配置消费者

    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import net.icsoc.axt.job.config.RabbitMQConfig;
    import net.icsoc.axt.job.dto.WinCallCdrDTO;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.dao.DataAccessException;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;
    @Component
    @Slf4j
    public class CallListener {
        @Autowired
        RabbitTemplate rabbitTemplate;
        @PostConstruct
        public void convertAndSendOrder() {
             //创建生产数据
             String jsonStr ="{user_id:234}"
             rabbitTemplate.convertAndSend("exchange.topic", "routingKey.aa", jsonStr);
        }
        @RabbitListener(queues = RabbitMQConfig.WINCALLCDR_QUEUE, containerFactory = "rabbitListenerContainerFactory")
        public void winCallCdr(String messsageBody) {
            //log.info("winCallCdr消费者收到消息  : " + messsageBody);
            WinCallCdrDTO winCallCdrDTO = JSON.parseObject(messsageBody, WinCallCdrDTO.class);
            try {
                exectueSaveWinCallCdrData2Db(winCallCdrDTO);
                log.info("winCallCdr成功消费消息 {}", winCallCdrDTO.getCallId());
            } catch (DataAccessException e) {
                log.error("消费winCallCdr异常 {} {}", messsageBody, e);
            }
        }
    }

    和 @RabbitHandler 搭配使用

    @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用

    @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

    @Component
    @RabbitListener(queues = "consumer_queue")
    public class Receiver {
        @RabbitHandler
        public void processMessage1(String message) {
            System.out.println(message);
        }
        @RabbitHandler
        public void processMessage2(byte[] message) {
            System.out.println(new String(message));
        }
        
    }
    

    @Payload 与 @Headers

    使用 @Payload 和 @Headers 注解可以消息中的 body 与 headers 信息

    @RabbitListener(queues = "debug")
    public void processMessage1(@Payload String body, @Headers Map headers) {
        System.out.println("body:"+body);
        System.out.println("Headers:"+headers);
    }
    

    也可以获取单个 Header 属性

    @RabbitListener(queues = "debug")
    public void processMessage1(@Payload String body, @Header String token) {
        System.out.println("body:"+body);
        System.out.println("token:"+token);
    }
    

    通过 @RabbitListener 注解声明 Binding

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = "topic.exchange",durable = "true",type = "topic"),
            value = @Queue(value = "consumer_queue",durable = "true"),
            key = "key.#"
    ))
    public void processMessage1(Message message) {
        System.out.println(message);
    }
    

    自动确认

    生产者产生10笔消息,自动确认模式下,消息处理成功,消费者才会去获取下一笔消息;消息处理抛出异常,那么将会消息重回队列。自动确认分四种情况(第一就是正常消费,其他三种为异常情况)

    • 消息成功被消费,没有抛出异常,则自动确认,回复ack。不涉及requeue,毕竟已经成功了。requeue是对被拒绝的消息生效。
    • 当抛出ImmediateAcknowledgeAmqpException异常的时候,则视为成功消费,确认该消息。
    • 当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue = false(该异常会在重试超过限制后抛出)
    • 抛出其他的异常,消息会被拒绝,且requeue = true

      手动确认

      常用API

      • channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);   ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息
      • channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);  Reject表示拒绝消息。requeue:false表示被拒绝的消息是丢弃;true表示重回队列
      • channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false);   nack表示拒绝消息。multiple表示拒绝指定了delivery_tag的所有未确认的消息,requeue表示不是重回队列

        当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行。这种情况会导致消息队列处理出现阻塞,消息堆积,导致正常消息也无法运行。

        消息重发送到队尾

        可能会出现堆积

            //消费者处理消息缓慢
            @RabbitListener(queues = {"kinson1"})
            public void receiver3(Message msg, Channel channel) throws IOException {
                try {
                    //打印数据
                    String message = new String(msg.getBody(), StandardCharsets.UTF_8);
                    log.info("【开始】:{}",message);
                    if("0".equals(message)){
                        throw new RuntimeException("0的消息消费异常");
                    }
                    log.info("【结束】:{}", message);
                    //ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag的所有消息
                    channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
                } catch (Exception e) {
                    //捕获异常后,重新发送到指定队列,自动ack不抛出异常即为ack
                    channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
                            msg.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                            msg.getBody());
                }
            }
        

         如何处理异常消息

        如果一个消息体本身有误,会导致该消息体,一直无法进行处理,而服务器中刷出大量无用日志。解决这个问题可以采取两种方案:

        1.一种是对于日常细致处理,分清哪些是可以恢复的异常,哪些是不可以恢复的异常。对于可以恢复的异常我们采取第三条中的解决方案,对于不可以处理的异常,我们采用记录日志,直接丢弃该消息方案。

        2.另一种是我们对每条消息进行标记,记录每条消息的处理次数,当一条消息,多次处理仍不能成功时,处理次数到达我们设置的值时,我们就丢弃该消息,但需要记录详细的日志。

        将业务队列绑定死信队列,当消息被丢弃后,进入到死信队列(代码修复后监听死信队列补偿消息)。可以避免我们手动的恢复消息。

        @Component
        @Slf4j
        public class CustomerRev {
            @RabbitListener(queues = {"kinson1"})
            public void receiver3(Message msg, Channel channel) throws IOException {
                try {
                    //打印数据
                    String message = new String(msg.getBody(), StandardCharsets.UTF_8);
                    log.info("【开始】:{}",message);
                    if("0".equals(message)){
                        throw new RuntimeException("0的消息消费异常");
                    }
                    log.info("【结束】:{}", message);
                } catch (Exception e) {
                    //捕获异常后,重新发送到指定队列,自动确认不抛出异常即为ack
                    Integer retryCount;
                    Map headers = msg.getMessageProperties().getHeaders();
                    if(!headers.containsKey("retry-count")){
                        retryCount=0;
                    }else {
                        retryCount = (Integer)headers.get("retry-count");
                    }
                    //判断是否满足最大重试次数(重试3次)
                    if(retryCount++<3) {
                        headers.put("retry-count",retryCount);
                        //重新发送到MQ中
                        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentType("text/plain").headers(headers).build();
                        channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
                                msg.getMessageProperties().getReceivedRoutingKey(), basicProperties,
                                msg.getBody());
                    }
                }
            }
        }

        重试机制如何合理配置

        重试机制能保证某些场景下消息能被消费掉。适合重试场景:大部分属于读取,如调用第三方接口、网络波动问题、暂时调用不了、网络连接等。重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟mq没有任何关系。

        采坑:不管消息被消费了之后是手动确认还是自动确认,代码中不能使用try/catch捕获异常,否则重试机制失效。

        spring:
          rabbitmq:
            listener:
              simple:
                retry:
                  # 开启消费者重试机制(默认就是true,false则取消重试机制)
                  enabled: true
                  # 最大重试次数
                  max-attempts: 5
                  # 重试间距(单位:秒)
                  initial-interval: 2s
        

        以上配置消息会重试5次,如果一直失败,RabbitMQ放弃消费了

网友评论

搜索
最新文章
热门文章
热门标签