1. 生产者与消费者关系
在RabbitMQ中,生产者(Producer)负责发送消息,通常是应用程序向RabbitMQ服务器发送具有特定路由键的消息;消费者(Consumer)则负责处理接收到的这些消息。在RabbitMQ中,生产者和消费者之间使用交换器(Exchange)和队列(Queue)进行消息路由和存储。生产者将消息发送到交换器,交换器根据消息的路由键将其放入相应的队列中,最后消费者从队列中获取并处理这些消息。
2. 交换器与队列进行消息路由和存储
2.1 交换器与队列
交换器(Exchange)负责处理生产者发送的消息,并根据路由键(Routing Key)将消息分发到相应的队列(Queue)中。RabbitMQ中以下四种类型的交换器:
1. 直接交换器(Direct)
2. 分发交换器(Fanout)
3. 主题交换器(Topic)
4. 头部交换器(Headers)
队列是用于存储消息的数据结构,并为消费者准备好消息以进行消息消费。生产者发送的消息将被保存在一个或多个队列中,等待消费者进行消息处理。
2.2Java代码示例
以下是一个简单的Java代码示例,展示了如何在RabbitMQ中创建交换器、队列和绑定,并进行消息的发送与接收。
添加以下依赖到pom.xml文件
com.rabbitmq amqp-client5.13.0
RabbitMqUtils代码:
public class RabbitMqUtils { //得到一个连接的 channel public static Channel getChannel() throws Exception{ // 创建工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("admin"); factory.setPassword("admin"); // 创建链接 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
2.2 Java代码示例与注释
创建一个生产者ProducerDemo
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ProducerDemo { public static void main(String[] args) throws IOException, TimeoutException { // 创建链接 Channel channel = RabbitMqUtils.getChannel(); // 创建交换器 String exchangeName = "test_exchange"; // 1队列名称 2队列类型 3 是否持久化交换机 true是 false否 channel.exchangeDeclare(exchangeName, "direct", true); // 创建队列 String queueName = "test_queue"; /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(queueName, true, false, false, null); // 绑定队列与交换器 String routingKey = "test.routing.key"; // 1 绑定的队列 2交换机名称 3路由key channel.queueBind(queueName, exchangeName, routingKey); // 发送消息 String message = "Hello, RabbitMQ!"; /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish(exchangeName, routingKey, null, message.getBytes("UTF-8")); // 关闭资源 channel.close(); // 关闭连接 connection.close(); } }
以上是在ProducerDemo类中,我们首先创建了一个交换器和一个队列。然后将队列与交换机进行绑定,并设置一个路由键。接着,我们将消息发送到交换器,并使用相同的路由键。
创建一个消费者ConsumerDemo
import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; public class ConsumerDemo { public static void main(String[] args) throws IOException { // 创建链接 Channel channel = RabbitMqUtils.getChannel(); // 创建队列 String queueName = "test_queue"; /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(queueName, true, false, false, null); // 创建消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received: " + message); } }; /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 */ channel.basicConsume(queueName, true, consumer); } }
2.3 交换器与队列总结
交换器与队列用于实现消息的路由和存储。在具体应用中,我们需要创建不同类型的交换器、队列,并使用路由键进行绑定。 Java代码示例展示了在RabbitMQ中如何进行简单的交换器、队列创建、绑定以及消息的发送和接收
3. 消息确认机制(ACK)
RabbitMQ中的消息确认机制,即ACK(Acknowledgement),是为了确保消息成功地从生产者传递到消费者。消费者处理完一个消息后,需要向RabbitMQ服务器发送一个ACK信号,告知服务器该消息已收到且处理完毕,允许服务器删除这个消息。根据确认时机的不同,ACK分为:
3.1 自动确认(Auto Ack)
自动确认是指,当消费者接收到消息后,会立即向服务器发送ACK信号,不管消息是否处理成功。自动确认的优点是速度快,但是可能导致消息丢失。如果在消费者处理消息过程中发生异常或宕机,由于已经发送了ACK信号,服务器将认为消息已被处理,从而导致消息丢失。
3.2 手动确认(Manual Ack)
手动确认是指,当消费者接收到消息后,可以选择在消息处理成功后再向服务器发送ACK信号。如果消费者处理消息过程中发生异常,可以选择不发送ACK信号,服务器将重新分发该消息给其他消费者。这样可以降低消息丢失的风险,但是相对于自动确认,速度较慢。
Java代码示例:
生产者代码:
public class producer { public static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 发布确认 channel.confirmSelect(); // 创建队列 // 持久化 boolean durable = true; /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); // 设置生产发送消息为持久化消息(要求保存到磁盘上) /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发送消息:"+message); } } }
消费者代码:
package cn.example.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import java.io.IOException; public class ConsumerDemo { public static void main(String[] args) throws IOException { Channel channel = RabbitMqUtils.getChannel(); String queueName = "ack_queue"; // 创建队列 channel.queueDeclare(queueName, false, false, false, null); channel.basicQos(1); // 自动确认 boolean autoAck = false; // 手动确认 // boolean autoAck = true; // 创建一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received: " + message); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } if (!autoAck) { // 手动确认 channel.basicAck(envelope.getDeliveryTag(), false); } } }; /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 */ channel.basicConsume(queueName, autoAck, consumer); } }
3.3 消息确认机制总结
生产者和消费者通过交换器和队列进行消息的发送与接收。为了确保消息正常传递,RabbitMQ提供了消息确认机制。自动确认的速度较快,但存在消息丢失的风险;手动确认可以降低消息丢失风险,但速度较慢。开发者可根据实际应用场景选择合适的确认机制。
猜你喜欢
- 3小时前Modbus协议学习第三篇之协议通信规则
- 3小时前大创项目推荐 深度学习火车票识别系统
- 3小时前OpenWRT搭建个人web站点并结合内网穿透实现公网远程访问
- 3小时前网络安全(黑客)—2024自学
- 3小时前【论文阅读】One For All: Toward Training One Graph Model for All Classification Tasks
- 3小时前15.单例模式
- 3小时前udf提权
- 3小时前HTTPS:如何确保您的网站数据传输安全?
- 3小时前数据湖架构Hudi(二)Hudi版本0.12源码编译、Hudi集成spark、使用IDEA与spark对hudi表增删改查
- 2小时前手机掉厕所怎么办(手机掉蹲厕里了应该怎么处理)
网友评论
- 搜索
- 最新文章
- 热门文章