RabbitMQ系列
RabbitMQ系列-概念及安裝 提到AMQP 0-9-1協定預設支援四種exchange,分別是Direct Exchange,Fanout Exchange,Topic Exchange,Headers Exchange
除了交換型別之外,交換還宣告了許多屬性
Direct Exchange根據路由資訊將訊息送到指定佇列
工作流程如下
Direct Exchange模型如下圖所示
佇列queue1和direct exchange的繫結路由有info和warn,類似的queue2和direct exchange的繫結路由有debug,queue3和direct exchange的繫結路由有error
當訊息生產者producer釋出路由值為info或者warn的訊息時,根據繫結關係,該訊息將被送到queue1,並被consumer1接收處理
同理,當訊息生產者producer釋出路由值為debug的訊息時,根據繫結關係,該訊息將被送到queue2,並被consumer2接收處理
當訊息生產者producer釋出路由值為error訊息時,根據繫結關係,該訊息將被送到queue3,並被consumer3接收處理
Fanout Exchange忽略路由且將訊息副本推播到所有繫結到該交換機的佇列,假設有N個佇列繫結到Fanout Exchange,生產者傳送到訊息經過該交換機處理,將訊息副本傳送到這個N個佇列。
因此Fanout Exchange適用於廣播的場景,Fanout Exchange模型如下圖所示
佇列queue1、queue2、queue3均繫結到了fanout型別的交換機,訊息生產者producer釋出的訊息將被fanout exchange分發到queue1、queue2、queue3,最後被各自的消費者消費。
topic exchange對訊息"分發範圍"介於direct exchange和fanout exchange之間,direct exchage要求訊息的路由鍵和佇列的繫結路由鍵完全一致才分發,fanout exchange將訊息分發到所有具有繫結關係的佇列上
一般情況下,topic exchange的路由鍵由用英文逗號隔開的多個單詞構成。其中,有兩個單詞比較特殊,*
可以代表任意的一個單詞,#
可以代表0個或多個單詞
假設,有路由鍵<地區.新聞種類.子種類>的新聞分發系統,系統模型如下圖所示
其中,佇列queue1和topic交換機的繫結關係有兩個,<us.sport.*>表示關注美國地區所有體育主題相關的訊息,<*.food.apple>表示關注所有地區關於蘋果這種水果主題的訊息
佇列queue1與topic交換機繫結路由鍵<cn.car.byd>表示關注中國地區下汽車類主題下關於比亞迪的訊息。
佇列queue3與topic交換機的繫結關係為<#.huawei>表示關注所有地區關於華為的所有主題的訊息。
headers交換機忽略路由鍵,利用x-match
引數和多個可選的headers鍵值對引數來路由訊息。x-match
有兩種型別值all
和any
當x-math=all
時,所有的headers鍵值對引數需要全部匹配,當x-math=any
時,只需要headers鍵值對引數中的一個匹配即可
假設,有學生資訊訂閱系統使用的時headers型別的交換機,模型如下圖所示
其中,佇列queue1和headers交換機的繫結關係的x-math=any
,鍵值對引數為age=18
和height=170
,因此當生產者釋出的訊息包含age=18
或height=170
時,訊息將被路由到queue1
佇列queue2和headers交換機的繫結關係的x-math=all
,鍵值對引數為age=22
和height=180
,因此當生產者釋出的訊息包含age=22
和height=180
時,訊息將被路由到queue2
佇列queue3和headers交換機的繫結關係的x-math=all
,鍵值對引數為gender=male
和score=60
,因此當生產者釋出的訊息包含gender=male
和score=60
時,訊息將被路由到queue3
以amqp091-go為例,使用Direct Exchange說明Direct Exchange的基本使用方法。
以amqp091-go為例,使用Direct Exchange說明訊息者端的基本流程。
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
Dial接收AMQP URI格式的字串,建立和RabbitMQ Server的TCP連線,並返回連線Connection
。TCP握手的超時時間預設為30s
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
通過和RabbitMQ Server一次網路往返互動,建立一個唯一的輕量級連線Connection
當Channel
不在需要時,需要手動呼叫Channel.Close
關閉Channel
,以釋放Channel
佔用的資源,避免記憶體漏失
當Channel
所屬的Connection
關閉時,Channel
也會被關閉。
err = ch.ExchangeDeclare(
"log_direct", // name
amqp.ExchangeDirect, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
生產者釋出的訊息會先到達Exchange,在根據Exchange型別和繫結關係將訊息路由到特定佇列。
ExchangeDeclare共有6個引數,這裡重點看下其中幾個引數
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
第二個引數type/kind
型別,AMQP 0-9-1 broker提供了四種型別,分別是direct
,fanout
,topic
和headers
,這裡使用的是direct
第三個引數durable
是否持久化,第四個引數autoDelete
是否自動刪除
第六個引數noWait
是否等待伺服器的確認應答,當該引數no-wait為true是,應當給通過Channel.NotifyClose
非同步處理異常。
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
佇列Queue充當了Exchange和消費者之間緩衝區的角色
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
如果佇列不存在則建立,如果存在時需要確保引數和已經存在的Queue一致,否則會返回錯誤
當name
為空時,RabbitMQ Server會生成唯一的名稱,並返回給q
第二個引數durable
是否持久化,第三個引數autoDelete
是否自動刪除
第四個引數exclusive
是否獨佔,當該引數為true時,該佇列只能被宣告這個Queue的Connection存取,並且在Connection關閉時,佇列會被刪除
第五個引數noWait
是否等待伺服器的確認應答,當該引數no-wait為true是,應當給通過Channel.NotifyClose
非同步處理異常。
當QueueDeclare
返回錯誤時,說明Queue建立失敗,同時Channel也會被關閉
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"log_direct", // exchange
false, // noWait
nil // args
)
failOnError(err, "Failed to bind a queue")
使用路由建立交換機和佇列的繫結關係,可以使用多個路由建立交換機和佇列的繫結關係,交換機根據路由判斷是否將訊息推播到佇列
func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
第一個引數是佇列名稱name
,第三個引數是交換機名稱exchange
,第二個引數時佇列和交換機繫結關係的表示
第三個引數noWait
是否等待伺服器的確認應答,當該引數no-wait為true是,應當給通過Channel.NotifyClose
非同步處理異常。
當建立繫結關係QueueBind
失敗時,會返回錯誤並且Channel會被關閉。
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer tag
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
開始接受來自佇列的訊息
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
Channel.Consume
返回<-chan Delivery
,消費者不斷從需要該Channel
上接受訊息,需要注意的是,消費者需要及時處理訊息,否則將阻塞Channel
所屬Connection
上的任何操作
第三個引數 autoAck
是否自動向確認RabbitMQ確認成功投遞 當該引數為true時,寫入TCP通訊端即向abbitMQ確認成功投遞。當該引數為false,則需要消費者手動發出確認資訊,即呼叫Delivery.Ack
第四個引數exclusive
是否獨佔,當該引數為true時,消費者獨佔該佇列,當該引數為false是,RabbitMQ Server將在多個消費者之間公平地分配交付
第五個引數noWait
是否等待伺服器的確認應答,當該引數no-wait為true是,應當給通過Channel.NotifyClose
非同步處理異常。
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
// routingKeys 繫結的路由: debug info warning error
func RecvMsg(routingKeys []string) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"log_direct", // name
amqp.ExchangeDirect, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
if len(routingKeys) < 1 {
log.Printf("Usage: %s [info] [warning] [error]", routingKeys)
os.Exit(0)
}
for _, s := range routingKeys {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "fruit_direct", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"log_direct", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
同樣以amqp091-go為例,說明訊息生產者端的基本流程
和消費端一樣,需要通過amqp.Dial
建立TCP連線,通過Connection.Channel
建立一個輕量級連線
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
和消費端一樣需要宣告交換機Exchange,需要注意的是,生產者和消費者都宣告了相同名稱的Exchange,需要保持兩者的引數是一致的,否則會報錯
err = ch.ExchangeDeclare(
"log_direct", // name
amqp091.ExchangeDirect, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
err = ch.PublishWithContext(ctx,
"log_direct", // exchange
severityFrom(msg), // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
})
failOnError(err, "Failed to publish a message")
採用非同步的方式將訊息傳送到RabbitMQ server到交換機
第三個引數 mandatory
是否強制送達 當該引數為true時,且消費端佇列和交換機沒有對應的繫結路由時,訊息就無法發出,可通過Channel.NotifyReturn
處理這種被退回的訊息
第三個引數 immediate
是否理解接收 當該引數為true時,且匹配的消費端佇列沒有準備好接受此訊息時,訊息就無法發出,可通過Channel.NotifyReturn
處理這種被退回的訊息
import (
"context"
"log"
"strings"
"time"
amqp "https://www.cnblogs.com/amos01/p/github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func SendMsg(msg string) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"log_direct", // name
amqp.ExchangeDirect, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = ch.PublishWithContext(ctx,
"log_direct", // exchange
severityFrom(msg), // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", msg)
}
func severityFrom(msg string) string {
var s string
if strings.Contains(msg, "debug") {
s = "debug"
} else if strings.Contains(msg, "error") {
s = "error"
} else if strings.Contains(msg, "warn") {
s = "warn"
} else {
s = "info"
}
return s
}