package main import ( "fmt" "log" "sync" "time" "github.com/streadway/amqp" ) func declareQueue(ch *amqp.Channel, queueName string) error { _, err := ch.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) return err } func producer(ch *amqp.Channel, wg *sync.WaitGroup, queueName string) { defer wg.Done() for i := 1; i <= 5; i++ { message := fmt.Sprintf("Message %d", i) err := ch.Publish( "", // exchange queueName, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) if err != nil { log.Fatalf("Failed to publish a message: %v", err) } fmt.Printf("Sent: %s\n", message) time.Sleep(time.Second) } } func consumer(ch *amqp.Channel, wg *sync.WaitGroup, queueName string) { defer wg.Done() msgs, err := ch.Consume( queueName, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { log.Fatalf("Failed to register a consumer: %v", err) } for msg := range msgs { fmt.Printf("Received: %s\n", msg.Body) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { log.Fatalf("Failed to open a channel: %v", err) } defer ch.Close() queueName := "your_queue_name" // Declare the queue before using it err = declareQueue(ch, queueName) if err != nil { log.Fatalf("Failed to declare queue: %v", err) } var wg sync.WaitGroup wg.Add(2) go producer(ch, &wg, queueName) go consumer(ch, &wg, queueName) wg.Wait() }
运行结果
Sent: Message 1 Received: Message 1 Sent: Message 2 Received: Message 2 Sent: Message 3 Received: Message 3 Sent: Message 4 Received: Message 4 Sent: Message 5 Received: Message 5
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章