上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

RabbitMQ初级篇:生产者与消费者关系、消息确认机制(ACK)、交换器与队列进行消息路由和存储

guduadmin33小时前

1. 生产者与消费者关系

在RabbitMQ中,生产者(Producer)负责发送消息,通常是应用程序向RabbitMQ服务器发送具有特定路由键的消息;消费者(Consumer)则负责处理接收到的这些消息。在RabbitMQ中,生产者和消费者之间使用交换器(Exchange)和队列(Queue)进行消息路由和存储。生产者将消息发送到交换器,交换器根据消息的路由键将其放入相应的队列中,最后消费者从队列中获取并处理这些消息。

2. 交换器与队列进行消息路由和存储

2.1 交换器与队列

交换器(Exchange)负责处理生产者发送的消息,并根据路由键(Routing Key)将消息分发到相应的队列(Queue)中。RabbitMQ中以下四种类型的交换器:

1. 直接交换器(Direct)

2. 分发交换器(Fanout)

3. 主题交换器(Topic)

4. 头部交换器(Headers)

队列是用于存储消息的数据结构,并为消费者准备好消息以进行消息消费。生产者发送的消息将被保存在一个或多个队列中,等待消费者进行消息处理。

2.2Java代码示例

以下是一个简单的Java代码示例,展示了如何在RabbitMQ中创建交换器、队列和绑定,并进行消息的发送与接收。

添加以下依赖到pom.xml文件


  com.rabbitmq
  amqp-client
  5.13.0

RabbitMqUtils代码:

public class RabbitMqUtils {
    //得到一个连接的 channel
    public static Channel getChannel() throws Exception{
        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 创建链接
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

2.2 Java代码示例与注释

创建一个生产者ProducerDemo

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建链接
        Channel channel = RabbitMqUtils.getChannel();
        
        // 创建交换器
        String exchangeName = "test_exchange";
        // 1队列名称 2队列类型 3 是否持久化交换机 true是 false否
        channel.exchangeDeclare(exchangeName, "direct", true);
        
        // 创建队列
        String queueName = "test_queue";
        /**
           * 生成一个队列
           * 1.队列名称
           * 2.队列里面的消息是否持久化 默认消息存储在内存中
           * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
           * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
           * 5.其他参数
           */
        channel.queueDeclare(queueName, true, false, false, null);
        
        // 绑定队列与交换器
        String routingKey = "test.routing.key";
        // 1 绑定的队列  2交换机名称  3路由key
        channel.queueBind(queueName, exchangeName, routingKey);
        
        // 发送消息
        String message = "Hello, RabbitMQ!";
        /**
             * 发送一个消息
             * 1.发送到那个交换机
             * 2.路由的 key 是哪个
             * 3.其他的参数信息
             * 4.发送消息的消息体
             */
        channel.basicPublish(exchangeName, routingKey, null, message.getBytes("UTF-8"));
        
        // 关闭资源
        channel.close();
        // 关闭连接
        connection.close();
    }
}

以上是在ProducerDemo类中,我们首先创建了一个交换器和一个队列。然后将队列与交换机进行绑定,并设置一个路由键。接着,我们将消息发送到交换器,并使用相同的路由键。

创建一个消费者ConsumerDemo

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class ConsumerDemo {
    public static void main(String[] args) throws IOException {
        // 创建链接
        Channel channel = RabbitMqUtils.getChannel();
        
        // 创建队列
        String queueName = "test_queue";
        /**
           * 生成一个队列
           * 1.队列名称
           * 2.队列里面的消息是否持久化 默认消息存储在内存中
           * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
           * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
           * 5.其他参数
           */
        channel.queueDeclare(queueName, true, false, false, null);
        
        // 创建消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received: " + message);
            }
        };
         /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3.消费者未成功消费的回调
         */
        channel.basicConsume(queueName, true, consumer);
    }
}

2.3 交换器与队列总结

交换器与队列用于实现消息的路由和存储。在具体应用中,我们需要创建不同类型的交换器、队列,并使用路由键进行绑定。 Java代码示例展示了在RabbitMQ中如何进行简单的交换器、队列创建、绑定以及消息的发送和接收

3. 消息确认机制(ACK)

RabbitMQ中的消息确认机制,即ACK(Acknowledgement),是为了确保消息成功地从生产者传递到消费者。消费者处理完一个消息后,需要向RabbitMQ服务器发送一个ACK信号,告知服务器该消息已收到且处理完毕,允许服务器删除这个消息。根据确认时机的不同,ACK分为:

3.1 自动确认(Auto Ack)

自动确认是指,当消费者接收到消息后,会立即向服务器发送ACK信号,不管消息是否处理成功。自动确认的优点是速度快,但是可能导致消息丢失。如果在消费者处理消息过程中发生异常或宕机,由于已经发送了ACK信号,服务器将认为消息已被处理,从而导致消息丢失。

3.2 手动确认(Manual Ack)

手动确认是指,当消费者接收到消息后,可以选择在消息处理成功后再向服务器发送ACK信号。如果消费者处理消息过程中发生异常,可以选择不发送ACK信号,服务器将重新分发该消息给其他消费者。这样可以降低消息丢失的风险,但是相对于自动确认,速度较慢。

Java代码示例:

生产者代码:

public class producer {
    public static final String TASK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 发布确认
        channel.confirmSelect();
        // 创建队列
        // 持久化
        boolean durable = true;
        /**
           * 生成一个队列
           * 1.队列名称
           * 2.队列里面的消息是否持久化 默认消息存储在内存中
           * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
           * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
           * 5.其他参数
           */
        channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            // 设置生产发送消息为持久化消息(要求保存到磁盘上)
             /**
             * 发送一个消息
             * 1.发送到那个交换机
             * 2.路由的 key 是哪个
             * 3.其他的参数信息
             * 4.发送消息的消息体
             */
            channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息:"+message);
        }
    }
}

消费者代码:

package cn.example.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.io.IOException;
public class ConsumerDemo {
    public static void main(String[] args) throws IOException {
          Channel channel = RabbitMqUtils.getChannel();
        String queueName = "ack_queue";
        // 创建队列
        channel.queueDeclare(queueName, false, false, false, null);
        channel.basicQos(1);
        // 自动确认
        boolean autoAck = false;
        // 手动确认
        // boolean autoAck = true;
        // 创建一个消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received: " + message);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (!autoAck) {
                    // 手动确认
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
         /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3.消费者未成功消费的回调
         */
        channel.basicConsume(queueName, autoAck, consumer);
    }
}

3.3 消息确认机制总结

生产者和消费者通过交换器和队列进行消息的发送与接收。为了确保消息正常传递,RabbitMQ提供了消息确认机制。自动确认的速度较快,但存在消息丢失的风险;手动确认可以降低消息丢失风险,但速度较慢。开发者可根据实际应用场景选择合适的确认机制。

网友评论

搜索
最新文章
热门文章
热门标签