RabbitMQ系列-Exchange介紹

2023-05-27 21:00:33

RabbitMQ系列

RabbitMQ系列-概念及安裝 

 

 

1. Exchange

RabbitMQ系列-概念及安裝 提到AMQP 0-9-1協定預設支援四種exchange,分別是Direct ExchangeFanout ExchangeTopic ExchangeHeaders Exchange

除了交換型別之外,交換還宣告了許多屬性

  • Name,交換機名稱,唯一的
  • Durability,永續性,RabbitMQ Server重啟後依舊存在
  • Auto-delete,自動刪除,沒有佇列繫結到交換機時,交換機自動刪除
  • Arguments,可選引數, 用於外掛和一些特定功能

 

1.1 Direct Exchange

Direct Exchange根據路由資訊將訊息送到指定佇列

工作流程如下

  1. 訊息佇列繫結到Direct Exchange,並指定路由字串K,該Direct Exchange名稱為E
  2. 當具有路由鍵R的新訊息到達交換機E時,如果K = R,則交換機將訊息副本拷貝到佇列
  3. 繼續遍歷剩餘繫結到交換機E的佇列,如果K = R,則交換機將訊息副本拷貝到佇列

Direct Exchange模型如下圖所示

佇列queue1和direct exchange的繫結路由有info和warn,類似的queue2和direct exchange的繫結路由有debug,queue3和direct exchange的繫結路由有error

當訊息生產者producer釋出路由值為info或者warn的訊息時,根據繫結關係,該訊息將被送到queue1,並被consumer1接收處理

同理,當訊息生產者producer釋出路由值為debug的訊息時,根據繫結關係,該訊息將被送到queue2,並被consumer2接收處理

當訊息生產者producer釋出路由值為error訊息時,根據繫結關係,該訊息將被送到queue3,並被consumer3接收處理

 

1.2 Fanout Exchange

Fanout Exchange忽略路由且將訊息副本推播到所有繫結到該交換機的佇列,假設有N個佇列繫結到Fanout Exchange,生產者傳送到訊息經過該交換機處理,將訊息副本傳送到這個N個佇列。

因此Fanout Exchange適用於廣播的場景,Fanout Exchange模型如下圖所示

佇列queue1、queue2、queue3均繫結到了fanout型別的交換機,訊息生產者producer釋出的訊息將被fanout exchange分發到queue1、queue2、queue3,最後被各自的消費者消費。

 

1.3 Topic Exchange

topic exchange對訊息"分發範圍"介於direct exchange和fanout exchange之間,direct exchage要求訊息的路由鍵和佇列的繫結路由鍵完全一致才分發,fanout exchange將訊息分發到所有具有繫結關係的佇列上

一般情況下,topic exchange的路由鍵由用英文逗號隔開的多個單詞構成。其中,有兩個單詞比較特殊,*可以代表任意的一個單詞,#可以代表0個或多個單詞

假設,有路由鍵<地區.新聞種類.子種類>的新聞分發系統,系統模型如下圖所示

其中,佇列queue1和topic交換機的繫結關係有兩個,<us.sport.*>表示關注美國地區所有體育主題相關的訊息,<*.food.apple>表示關注所有地區關於蘋果這種水果主題的訊息

佇列queue1與topic交換機繫結路由鍵<cn.car.byd>表示關注中國地區下汽車類主題下關於比亞迪的訊息。

佇列queue3與topic交換機的繫結關係為<#.huawei>表示關注所有地區關於華為的所有主題的訊息。

 

1.4 Headers Exchange

headers交換機忽略路由鍵,利用x-match引數和多個可選的headers鍵值對引數來路由訊息。x-match有兩種型別值allany

x-math=all時,所有的headers鍵值對引數需要全部匹配,當x-math=any時,只需要headers鍵值對引數中的一個匹配即可

假設,有學生資訊訂閱系統使用的時headers型別的交換機,模型如下圖所示

其中,佇列queue1和headers交換機的繫結關係的x-math=any,鍵值對引數為age=18height=170,因此當生產者釋出的訊息包含age=18height=170時,訊息將被路由到queue1

佇列queue2和headers交換機的繫結關係的x-math=all,鍵值對引數為age=22height=180,因此當生產者釋出的訊息包含age=22height=180時,訊息將被路由到queue2

佇列queue3和headers交換機的繫結關係的x-math=all,鍵值對引數為gender=malescore=60,因此當生產者釋出的訊息包含gender=malescore=60時,訊息將被路由到queue3

amqp091-go為例,使用Direct Exchange說明Direct Exchange的基本使用方法。

 

2. 消費者程式碼

amqp091-go為例,使用Direct Exchange說明訊息者端的基本流程。

2.1 連線到RabbitMQ Server

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

Dial接收AMQP URI格式的字串,建立和RabbitMQ Server的TCP連線,並返回連線Connection。TCP握手的超時時間預設為30s

 

2.2 建立輕量級連線Channel

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

通過和RabbitMQ Server一次網路往返互動,建立一個唯一的輕量級連線Connection

Channel不在需要時,需要手動呼叫Channel.Close關閉Channel,以釋放Channel佔用的資源,避免記憶體漏失

Channel所屬的Connection關閉時,Channel也會被關閉。

 

2.3 宣告交換機Exchange

err = ch.ExchangeDeclare(
    "log_direct",           // name
    amqp.ExchangeDirect,    // type
    true,                   // durable
    false,                  // auto-deleted
    false,                  // internal
    false,                  // no-wait
    nil,                    // arguments
)
failOnError(err, "Failed to declare an exchange")

生產者釋出的訊息會先到達Exchange,在根據Exchange型別和繫結關係將訊息路由到特定佇列。

ExchangeDeclare共有6個引數,這裡重點看下其中幾個引數

func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error

第二個引數type/kind 型別,AMQP 0-9-1 broker提供了四種型別,分別是directfanouttopicheaders,這裡使用的是direct

第三個引數durable 是否持久化,第四個引數autoDelete 是否自動刪除

  • 當持久化且不自動刪除時,當RabbitMQ重啟或者沒有佇列繫結時,Exchange依舊存在
  • 當非持久化且自動刪除時,當RabbitMQ重啟或者Exchange沒有佇列繫結時,Exchange會自動刪除
  • 當非持久化且不自動刪除時,當RabbitMQ重啟後,Exchange會消失,當Exchange沒有佇列繫結時,Exchange會存在。即RabbitMQ不重啟,Exchange就會一直存在
  • 當持久化且自動刪除時,當RabbitMQ重啟後,Exchange依舊存在,但當Exchange沒有佇列繫結時,Exchange會被刪除

第六個引數noWait 是否等待伺服器的確認應答,當該引數no-wait為true是,應當給通過Channel.NotifyClose非同步處理異常。

 

2.4 宣告佇列Queue

q, err := ch.QueueDeclare(
    "",    // name
    false, // durable
    false, // delete when unused
    true,  // exclusive
    false, // no-wait
    nil,   // arguments
)
failOnError(err, "Failed to declare a queue")

佇列Queue充當了Exchange和消費者之間緩衝區的角色

func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)

如果佇列不存在則建立,如果存在時需要確保引數和已經存在的Queue一致,否則會返回錯誤

name為空時,RabbitMQ Server會生成唯一的名稱,並返回給q

第二個引數durable 是否持久化,第三個引數autoDelete 是否自動刪除

  • 當持久化且不自動刪除時,當RabbitMQ重啟或者沒有與消費者的繫結關係時,Queue依舊存在,只有持久化的Exchange才能宣告這種Queue
  • 當非持久化且自動刪除時,當RabbitMQ重啟或者沒有消費者時,Queue會自動刪除,只有非持久化的Exchange才能宣告這種Queue
  • 當非持久化且不自動刪除時,當RabbitMQ重啟後,Queue會消失,當沒有消費者時,Queue依舊存在。即RabbitMQ不重啟,Queue就會一直存在,只有非持久化的Exchange才能宣告這種Queue
  • 當持久化且自動刪除時,當RabbitMQ重啟後,Queue依舊存在,但當沒有消費者時,Queue會被刪除

第四個引數exclusive 是否獨佔,當該引數為true時,該佇列只能被宣告這個Queue的Connection存取,並且在Connection關閉時,佇列會被刪除

第五個引數noWait是否等待伺服器的確認應答,當該引數no-wait為true是,應當給通過Channel.NotifyClose非同步處理異常。

QueueDeclare返回錯誤時,說明Queue建立失敗,同時Channel也會被關閉

 

2.5 繫結關係Binding

err = ch.QueueBind(
    q.Name,         // queue name
    s,              // routing key
    "log_direct",   // exchange
    false,          // noWait
    nil             // args
)
failOnError(err, "Failed to bind a queue")

 使用路由建立交換機和佇列的繫結關係,可以使用多個路由建立交換機和佇列的繫結關係,交換機根據路由判斷是否將訊息推播到佇列

func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error 

第一個引數是佇列名稱name,第三個引數是交換機名稱exchange,第二個引數時佇列和交換機繫結關係的表示

第三個引數noWait是否等待伺服器的確認應答,當該引數no-wait為true是,應當給通過Channel.NotifyClose非同步處理異常。

當建立繫結關係QueueBind失敗時,會返回錯誤並且Channel會被關閉。

 

2.6 Consume引數說明

msgs, err := ch.Consume(
    q.Name, // queue
    "",     // consumer tag
    true,   // auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
)
failOnError(err, "Failed to register a consumer")

開始接受來自佇列的訊息

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)

Channel.Consume返回<-chan Delivery,消費者不斷從需要該Channel上接受訊息,需要注意的是,消費者需要及時處理訊息,否則將阻塞Channel所屬Connection上的任何操作

第三個引數 autoAck 是否自動向確認RabbitMQ確認成功投遞 當該引數為true時,寫入TCP通訊端即向abbitMQ確認成功投遞。當該引數為false,則需要消費者手動發出確認資訊,即呼叫Delivery.Ack

第四個引數exclusive 是否獨佔,當該引數為true時,消費者獨佔該佇列,當該引數為false是,RabbitMQ Server將在多個消費者之間公平地分配交付

第五個引數noWait是否等待伺服器的確認應答,當該引數no-wait為true是,應當給通過Channel.NotifyClose非同步處理異常。

 

2.7 消費者彙總程式碼

檢視程式碼
func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

// routingKeys 繫結的路由: debug info warning error
func RecvMsg(routingKeys []string) {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "log_direct",        // name
        amqp.ExchangeDirect, // type
        true,                // durable
        false,               // auto-deleted
        false,               // internal
        false,               // no-wait
        nil,                 // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    q, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    if len(routingKeys) < 1 {
        log.Printf("Usage: %s [info] [warning] [error]", routingKeys)
        os.Exit(0)
    }
    for _, s := range routingKeys {
        log.Printf("Binding queue %s to exchange %s with routing key %s",
            q.Name, "fruit_direct", s)
        err = ch.QueueBind(
            q.Name,       // queue name
            s,            // routing key
            "log_direct", // exchange
            false,
            nil)
        failOnError(err, "Failed to bind a queue")
    }

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    var forever chan struct{}

    go func() {
        for d := range msgs {
            log.Printf(" [x] %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever
}

 

3. 生產者程式碼

同樣以amqp091-go為例,說明訊息生產者端的基本流程

3.1 建立連線

和消費端一樣,需要通過amqp.Dial建立TCP連線,通過Connection.Channel建立一個輕量級連線

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

 

3.2 宣告交換機Exchange

和消費端一樣需要宣告交換機Exchange,需要注意的是,生產者和消費者都宣告了相同名稱的Exchange,需要保持兩者的引數是一致的,否則會報錯

err = ch.ExchangeDeclare(
    "log_direct",           // name
    amqp091.ExchangeDirect, // type
    true,                   // durable
    false,                  // auto-deleted
    false,                  // internal
    false,                  // no-wait
    nil,                    // arguments
)
failOnError(err, "Failed to declare an exchange")

 

3.3 釋出訊息

err = ch.PublishWithContext(ctx,
    "log_direct",      // exchange
    severityFrom(msg), // routing key
    false,             // mandatory
    false,             // immediate
    amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(msg),
    })
failOnError(err, "Failed to publish a message")

採用非同步的方式將訊息傳送到RabbitMQ server到交換機

第三個引數 mandatory 是否強制送達 當該引數為true時,且消費端佇列和交換機沒有對應的繫結路由時,訊息就無法發出,可通過Channel.NotifyReturn處理這種被退回的訊息

第三個引數 immediate 是否理解接收 當該引數為true時,且匹配的消費端佇列沒有準備好接受此訊息時,訊息就無法發出,可通過Channel.NotifyReturn處理這種被退回的訊息

 

3.4 生產者彙總程式碼

檢視程式碼
import (
    "context"
    "log"
    "strings"
    "time"

    amqp "https://www.cnblogs.com/amos01/p/github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

func SendMsg(msg string) {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "log_direct",        // name
        amqp.ExchangeDirect, // type
        true,                // durable
        false,               // auto-deleted
        false,               // internal
        false,               // no-wait
        nil,                 // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    err = ch.PublishWithContext(ctx,
        "log_direct",      // exchange
        severityFrom(msg), // routing key
        false,             // mandatory
        false,             // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(msg),
        })
    failOnError(err, "Failed to publish a message")
    log.Printf(" [x] Sent %s", msg)
}

func severityFrom(msg string) string {
    var s string
    if strings.Contains(msg, "debug") {
        s = "debug"
    } else if strings.Contains(msg, "error") {
        s = "error"
    } else if strings.Contains(msg, "warn") {
        s = "warn"
    } else {
        s = "info"
    }
    return s
}