rabbitMQ相对来说功能比较完善,吞吐量会低一点。
持续更新……
安装
docker
测试选择docker安装
官方安装操作
1、docker pull rabbitmq:latest
2、docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq
3、docker exec -it 容器id /bin/bash
3.1、rabbitmq-plugins enable rabbitmq_management
消费者确认机制ack
如果一个消费者在没有发送ack的情况下死亡(它的信道被关闭、连接被关闭或TCP连接丢失),RabbitMQ会认为消息没有被完全处理,并将其重新排队。如果同时有其他消费者在线,它会很快将其重新交付给另一个消费者。
关键代码案例-消费
// 一次只接受一个未被ack的消息 channel.basicQos(1);// 手动ack channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// 设置不自动ack boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
消费者未ack时将会被阻塞,在消费者交付确认时强制执行超时(默认为30分钟)。这有助于检测那些从不承认交付的有问题(卡住)的消费者。
生产者确认机制confirms和事务
生产者将信道设置成confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID)。confirm模式最大的好处在于他是异步的,生产者可以在等信道返回确认的同时继续发送下一条消息。
同步确认:publisher-confirm-type: simple
异步确认:publisher-confirm-type: correlated
RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务。
消息持久化
关键代码案例-发布
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);// 标记持久消息
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
需要注意的是:RabbitMQ不允许你用不同的参数重新定义一个已经存在的队列,任何尝试这样做的程序都会返回错误。
任务分配
在不配置情况下它不会查看消费者的未确认消息数量。它只是盲目地将第n条消息分发给第n个消费者。它会均匀地分发消息,而不考虑消费者的消费能力。
为了克服这个问题,我们可以使用prefetchCount = 1的basicQos方法。这告诉RabbitMQ不要一次给worker发送多条消息。换句话说,在worker处理并确认前一条消息之前,不要将新消息分发给worker。相反,它会将其分发给下一个不忙的worker。防止消费能力差/任务重的消费者堆积。
关键代码案例-消费
// 一次只接受一个未被ack的消息 channel.basicQos(1);
Exchanges交换器
RabbitMQ消息模型的核心思想是生产者从不直接将任何消息发送到队列。实际上,很多情况下,生产者根本不知道消息是否会被传递到任何队列。
生产者只能向exchange发送消息(通常情况下,也可以直接向队列发消息,不推荐),exchange必须确切地知道如何处理接收到的消息。这些规则由exchange类型定义。
交换类型分为:direct、topic、headers和fanout
- fanout:它将接收到的所有消息广播给它知道的所有队列。
- direct:消息将发送到绑定键与消息的路由键完全匹配的队列。
- topic: 路由键是由点分隔的单词,不超过255 bytes。特殊符号*表示替代一个词;特殊符号#表示替代零个或多个词。
- headers:设置key和value的规则和queue绑定,需要设置验证消息的header。性能差,一般不用。
-
临时队列
队列用完即删。
服务器为我们选择一个随机的队列名称。一旦我们断开了消费者的连接,队列被自动删除。
String queueName = channel.queueDeclare().getQueue();
Bindings绑定
交换器和队列之间的关系称为绑定(binding)。
模板
channel.queueBind(queueName, exchangesName, "");
从${exchangesName}里来的消息将会发往${queueName}
参数routingKey
上边模板可以看出来bind还有第三个参数,这个参数是routingKey。如果交换类型是fanout则会忽略此值。可以使用direct模式
需要注意的是使用同一个绑定键绑定多个队列是完全可以的。
模拟实现RPC
常用消息属性
deliveryMode:将消息标记为持久的(值为2)或临时的(任何其他值)
contentType:用于描述编码的mime-type。
replyTo:通常用于命名回调队列
correlationId:用于将RPC响应与请求关联起来
总览
生产者向request-queue发送消息并且为每个消息创建单独的reply-queue供消费者发送响应,生产者通过CompletableFuture阻塞获取响应。
消费者消费request-queue消息并且拿到消息中的reply-queue-name,向其发送调用响应。
官方代码案例:RPCClient.java and RPCServer.java.
死信队列
死信队列需要配置和队列的关联。
来自队列的消息可能是死信(dead-lettered)意味着当以下事件发生时,消会息重新发布到交换器。
- 消息通过被消费者使用basic.reject或者basic.nack以及requeue参数设置为false去否定
- 消息生存时间过期。
- 因为队列限制,消息被丢弃。
注意:如果是队列过期,则不会使消息变为死信。
如果没配置死信队列出现以上情况,消息则会被丢弃
消息TTL和队列TTL
可以和死信队列配合用作设计超时订单,延时消息。
消息TTL决定消息在队列中可以保留多长时间。如果消息在队列中的保留时间超过了该消息的TTL,则该消息将过期而被丢弃。
懒惰队列
定义:队列中的消息实际上尽可能早地移动到磁盘。这些消息只有在使用者请求时才加载到内存中。
使用场景:消费能力差,队列消息太多,内存不足导致队列进程阻塞。
优先级队列
队列优先级是0~255可选的一个整数,不建议设置太大,原因是:每个队列的每个优先级都有一些内存和磁盘开销。这还会产生额外的CPU成本。强烈建议在1~5之间。
批量发送消息
rabbitMQ批量发送的消息和其他mq不同,在broker存储的是一条聚合消息。另外,BatchingRabbitTemplate 提供的批量发送消息的能力比较弱。对于同一个 BatchingRabbitTemplate 对象来说,同一时刻只能有一个批次(保证 Exchange + RoutingKey 相同),否则会报错。
spring中消费异常处理
在执行顺序上,RabbitListenerErrorHandler 先于 ErrorHandler 执行。不过这个需要建立在一个前提上,RabbitListenerErrorHandler 需要继续抛出异常。
另外,RabbitListenerErrorHandler 需要每个 @RabbitListener 注解上,需要每个手动设置下 errorHandler 属性。而 ErrorHandler 是相对全局的,所有 SimpleRabbitListenerContainerFactory 创建的 SimpleMessageListenerContainer 都会生效。
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章