RabbitMQ常見的訊息模型

2020-08-07 16:17:24

一、RabbitMQ

1、MQ介紹

       MQ(Message Queue):訊息佇列又叫訊息中介軟體。業務系統裏面可通過典型的生產者和消費者模型實現系統A和系統B之間的解耦。因爲訊息的生產和消費都是非同步的,而且雙方只關心訊息的發送和接收,中間沒有業務邏輯的侵入,可以輕鬆實現系統間的解耦。


2、不同MQ的特點

① ActiveMQ
       ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。它是一個完全支援JMS規範的的訊息中介軟體。豐富的API,多種叢集架構模式讓ActiveMQ在業界成爲老牌的訊息中介軟體,在中小型企業頗受歡迎!但是數據量一旦達到某個量級,效能就會很差

② Kafka
       Kafka主要特點是基於Pull的模式來處理訊息消費,追求高吞吐量,一開始的目的就是用於日誌收集和傳輸。0.8版本開始支援複製,不支援事務,對訊息的重複、丟失、錯誤沒有嚴格要求,適合產生大量數據的網際網路服務的數據收集業務(進行大數據的分析)

③ RocketMQ
       RocketMQ是阿裡開源的訊息中介軟體,它是純Java開發,具有高吞吐量、高可用性、適合大規模分佈式系統應用的特點。RocketMQ思路起源於Kafka,但並不是Kafka的一個Copy,它對訊息的可靠傳輸及事務性做了優化,目前在阿裡集團被廣泛應用於交易、充值、流計算、訊息推播、日誌流式處理、binglog分發等場景。

④ RabbitMQ
       RabbitMQ是使用Erlang語言開發的開源訊息佇列系統,基於AMQP協定來實現。AMQP的主要特徵是訊息導向、佇列、路由(包括對等和發佈/訂閱)、可靠性、安全。AMQP協定更多用在企業系統內對數據一致性、穩定性和可靠性要求很高的場景,對效能和吞吐量的要求還在其次。


3、安裝前置依賴Erlang

先執行下面 下麪的指令爲erlang設定好倉庫。

	curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash

	# 安裝erlang
	yum install erlang

Alt

出現了上面的錯誤,不急,先使用下面 下麪的指令安裝完deltarpm之後,再執行yum install erlang

	yum provides '*/applydeltarpm'    #檢視依賴包的位置
	yum -y  install deltarpm             #安裝命令

如下圖則erlang安裝成功。
Alt


可以使用下面 下麪指令檢視erlang是否成功安裝。

	erl

Alt
Alt

使用下面 下麪指令安裝socat。

	yum install socat

4、安裝RabbitMQ

安裝RabbitMQ。

	rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm 

如下圖則表示RabbitMQ安裝成功。

Alt


注意:高版本的RabbitMQ不帶組態檔,需要去github上面拷貝並放到/etc/rabbitmq目錄下面 下麪。

rabbitmq-server-3.8.5-1.el7.noarch.rpm包下載鏈接,提取碼:o9z3

rabbitmq的組態檔樣例下載鏈接,提取碼:q4tu


5、RabbitMQ常用指令

① 啓動rabbitmq中的外掛管理。

	rabbitmq-plugins enable rabbitmq_management

出現下圖則表示啓動成功。
Alt

② RabbitMQ服務的啓動、重新啓動與停止。

	# 啓動RabbitMQ服務
	systemctl start rabbitmq-server		
	
	# 重新啓動RabbitMQ服務
	systemctl restart rabbitmq-server		

	# 停止RabbitMQ服務
	systemctl stop rabbitmq-server		

③ 檢視RabbitMQ服務的狀態。

	systemctl status rabbitmq-server

下圖表示RbbitMQ處於執行狀態。

Alt


6、管理介面測試

啓用下面 下麪設定可以允許所有使用者存取,而不僅僅是本地存取。

Alt


開啓rabbitmq服務。

	service rabbitmq-server start 

開啓web管理介面。

	rabbitmq-plugins enable rabbitmq_management

使用IP:15672

Alt




二、RabbitMQ的訊息模型

       RabbitMQ中生產者和消費者機制 機製是這樣的:生產者負責生產訊息並將該訊息發送到交換機(需指定路由key),消費者根據路由key從交換機上找到對應的訊息佇列,從這個佇列中取訊息。

       注意:生產端和消費端都可以申明交換機或者佇列,只要所申明的屬性不變,RabbitMQ自己回去判斷是否一存在過,從而保證交換機或者佇列只建立一次。

Alt


1、Hello World訊息模型

       這個訊息模型不需要用到交換機,所以生產者在投遞訊息的時候,將路由key直接指定爲消費者要消費的佇列的名字即可。
Alt

       生產者程式碼:

	public class Producer {
	    public static void main(String[] args) throws Exception {
	        Connection connection = RabbitMQUtils.getConnection();
	        Channel channel = connection.createChannel();
	
	        String queueName = "hello world_queue";
	        //宣告通道對應的訊息佇列(表示建立佇列, 不管是消費者那宣告還是生產者那宣告都可以)
	        //第一個參數表示佇列名,
	        //第二個參數表示佇列是否開啓持久化
	        //第三個參數表示佇列是否開啓獨佔模式(獨佔表示一個佇列只能有一個channel監聽)
	        //第四個參數表示是否自動刪除佇列
	        //第五個參數表示額外附加參數
	        channel.queueDeclare(queueName, false, false, false, null);
	
	        String msg = "Hello, I am rabbitMQ!";
	        //第一個參數表示交換機名字
	        //第二個參數表示路由key(當第一個參數爲空字串即交換機爲default時, 這個參數直接指向佇列名稱)
	        //第三個參數表示傳遞訊息的額外設定
	        //第四個參數表示訊息的具體內容
	        channel.basicPublish("", queueName, null, msg.getBytes());
	
	        channel.close();
	        connection.close();
	    }
	}

       消費者程式碼:

	public class Consumer {
	    public static void main(String[] args) throws Exception {
	        Connection connection = RabbitMQUtils.getConnection();
	        Channel channel = connection.createChannel();
	
	        String queueName = "hello world_queue";
	        //宣告通道對應的訊息佇列(表示建立佇列, 不管是消費者那宣告還是生產者那宣告都可以)
	        //第一個參數表示佇列名,
	        //第二個參數表示佇列是否開啓持久化
	        //第三個參數表示佇列是否開啓獨佔模式(獨佔表示一個佇列只能有一個channel監聽)
	        //第四個參數表示是否自動刪除佇列
	        //第五個參數表示額外附加參數
	        channel.queueDeclare(queueName, false, false, false, null);
	
	        //建立消費者
	        //第一個參數表示佇列名
	        //第二個參數表示是否開啓自動簽收訊息
	        //第三個訊息是消費時候的回撥介面
	        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
	            @Override
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
	                System.out.println("消費者消費了一條訊息: " + new String(body));
	            }
	        });
	    }
	}

2、Work Queues訊息模型

       這個模型和Hello World訊息模型本質是一樣的,只不過是佇列上面多了幾個消費者,這些消費者共同消費佇列中的這些訊息

Alt

       生產者程式碼:

	public class Producer {
	    public static void main(String[] args) throws Exception {
	        Connection connection = RabbitMQUtils.getConnection();
	        Channel channel = connection.createChannel();
	
	        String queueName = "work_queues";
	        //宣告通道對應的訊息佇列(表示建立佇列, 不管是消費者那宣告還是生產者那宣告都可以)
	        //第一個參數表示佇列名,
	        //第二個參數表示佇列是否開啓持久化
	        //第三個參數表示佇列是否開啓獨佔模式(獨佔表示一個佇列只能有一個channel監聽)
	        //第四個參數表示是否自動刪除佇列
	        //第五個參數表示額外附加參數
	        channel.queueDeclare(queueName, false, false, false, null);
	
	        String msg = "Hello, I am rabbitMQ!";
	        //第一個參數表示交換機名字
	        //第二個參數表示路由key(當第一個參數爲空字串即交換機爲default時, 這個參數直接指向佇列名稱)
	        //第三個參數表示傳遞訊息的額外設定
	        //第四個參數表示訊息的具體內容
	        for (int i = 0; i < 20; i++) {
	            channel.basicPublish("", queueName, null, msg.getBytes());
	        }
	
	        channel.close();
	        connection.close();
	    }
	}

       消費者1程式碼:

	public class Consumer1 {
	    public static void main(String[] args) throws Exception {
	        Connection connection = RabbitMQUtils.getConnection();
	        Channel channel = connection.createChannel();
	
	        String queueName = "work_queues";
	        //宣告通道對應的訊息佇列(表示建立佇列, 不管是消費者那宣告還是生產者那宣告都可以)
	        //第一個參數表示佇列名,
	        //第二個參數表示佇列是否開啓持久化
	        //第三個參數表示佇列是否開啓獨佔模式(獨佔表示一個佇列只能有一個channel監聽)
	        //第四個參數表示是否自動刪除佇列
	        //第五個參數表示額外附加參數
	        channel.queueDeclare(queueName, false, false, false, null);
	
	        //建立消費者
	        //第一個參數表示佇列名
	        //第二個參數表示是否開啓自動簽收訊息
	        //第三個訊息是消費時候的回撥介面
	        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
	            @Override
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
	                System.out.println("消費者1消費了一條訊息: " + new String(body));
	            }
	        });
	    }
	}

       消費者2程式碼:

	public class Consumer2 {
	    public static void main(String[] args) throws Exception {
	        Connection connection = RabbitMQUtils.getConnection();
	        Channel channel = connection.createChannel();
	
	        String queueName = "work_queues";
	        //宣告通道對應的訊息佇列(表示建立佇列, 不管是消費者那宣告還是生產者那宣告都可以)
	        //第一個參數表示佇列名,
	        //第二個參數表示佇列是否開啓持久化
	        //第三個參數表示佇列是否開啓獨佔模式(獨佔表示一個佇列只能有一個channel監聽)
	        //第四個參數表示是否自動刪除佇列
	        //第五個參數表示額外附加參數
	        channel.queueDeclare(queueName, false, false, false, null);
	
	        //消費端限流, 一次只接收一條未確認的訊息
	        channel.basicQos(1);
	        //建立消費者
	        //第一個參數表示佇列名
	        //第二個參數表示是否開啓自動簽收訊息
	        //第三個訊息是消費時候的回撥介面
	        channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
	            @Override
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
	                try {
	                    TimeUnit.SECONDS.sleep(1);
	                } catch (InterruptedException e) {
	                    e.printStackTrace();
	                }
	                //第一個參數是訊息的唯一識別符號, 第二份參數表示是否開啓批次簽收
	                channel.basicAck(envelope.getDeliveryTag(), false);
	                System.out.println("消費者2消費了一條訊息: " + new String(body));
	            }
	        });
	    }
	}

3、Publich/Subscribe訊息模型

       這個訊息模型類似廣播模式,生產者將訊息投遞給交換機,交換機不需要指定路由key,每個消費者有自己的佇列,交換機會把訊息發送給所有系結該交換機的佇列,只要消費者自己的訊息佇列系結的是同一個交換機,就可以實現一條訊息被多個消費者消費。交換機型別需要指定爲fanout

Alt

       生產者程式碼:

	public class Producer {
	    public static void main(String[] args) throws Exception {
	        Connection connection = RabbitMQUtils.getConnection();
	        Channel channel = connection.createChannel();
	
	
	        channel.exchangeDeclare("publish subscribe_exchange", "fanout");
	
	        String msg = "Hello, I am rabbitMQ!";
	        //第一個參數表示交換機名字
	        //第二個參數表示路由key(當第一個參數爲空字串即交換機爲default時, 這個參數直接指向佇列名稱)
	        //第三個參數表示傳遞訊息的額外設定
	        //第四個參數表示訊息的具體內容
	        channel.basicPublish("publish subscribe_exchange", "", null, msg.getBytes());
	
	        channel.close();
	        connection.close();
	    }
	}

       消費者1程式碼:

	public class Consumer1 {
	    public static void main(String[] args) throws Exception {
	        Connection connection = RabbitMQUtils.getConnection();
	        Channel channel = connection.createChannel();
	
	        //建立臨時佇列, 每個消費者有自己的佇列
	        String queueName = channel.queueDeclare().getQueue();
	
	        //宣告交換機(表示建立交換機, 不管是消費者那宣告還是生產者那宣告都可以)
	        //第一個參數表示交換機的名字
	        //第二個參數表示交換機的型別
	        channel.exchangeDeclare("publish subscribe_exchange", "fanout");
	        //系結佇列和交換機, fanout即發佈訂閱模式不需要路由key
	        channel.queueBind(queueName, "publish subscribe_exchange", "");
	
	        //建立消費者
	        //第一個參數表示佇列名
	        //第二個參數表示是否開啓自動簽收訊息
	        //第三個訊息是消費時候的回撥介面
	        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
	            @Override
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
	                System.out.println("消費者1消費了一條訊息: " + new String(body));
	            }
	        });
	    }
	}

       消費者2程式碼:

	public class Consumer2 {
	    public static void main(String[] args) throws Exception {
	        Connection connection = RabbitMQUtils.getConnection();
	        Channel channel = connection.createChannel();
	
	        //建立臨時佇列, 每個消費者有自己的佇列
	        String queueName = channel.queueDeclare().getQueue();
	
	        //宣告交換機(表示建立交換機, 不管是消費者那宣告還是生產者那宣告都可以)
	        //第一個參數表示交換機的名字
	        //第二個參數表示交換機的型別
	        channel.exchangeDeclare("publish subscribe_exchange", "fanout");
	        //fanout即發佈訂閱模式不需要路由key
	        channel.queueBind(queueName, "publish subscribe_exchange", "");
	
	        //建立消費者
	        //第一個參數表示佇列名
	        //第二個參數表示是否開啓自動簽收訊息
	        //第三個訊息是消費時候的回撥介面
	        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
	            @Override
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
	                System.out.println("消費者2消費了一條訊息: " + new String(body));
	            }
	        });
	    }
	}

4、Routing訊息模型

       生產者將訊息投遞到交換機同時指定路由key,消費者根據路由key從對應的訊息佇列中獲取訊息進行消費,這個模型必須當雙方的路由key相同時才能 纔能匹配到。
Alt

       生產者程式碼:

	public class Producer {
	    public static void main(String[] args) throws Exception {
	        Connection connection = RabbitMQUtils.getConnection();
	        Channel channel = connection.createChannel();
	
	        //宣告交換機
	        channel.exchangeDeclare("routing_exchange", "direct");
	
	        String msg = "Hello, I am rabbitMQ!";
	        //第一個參數表示交換機名字
	        //第二個參數表示路由key(當第一個參數爲空字串即交換機爲default時, 這個參數直接指向佇列名稱)
	        //第三個參數表示傳遞訊息的額外設定
	        //第四個參數表示訊息的具體內容
	        channel.basicPublish("routing_exchange", "hello routing", null, msg.getBytes());
	
	        channel.close();
	        connection.close();
	    }
	}

       消費者程式碼:

	public class Consumer {
	    public static void main(String[] args) throws Exception {
	        Connection connection = RabbitMQUtils.getConnection();
	        Channel channel = connection.createChannel();
	
	        //建立臨時佇列
	        String queueName = channel.queueDeclare().getQueue();
	
	        //系結佇列和交換機
	        channel.queueBind(queueName, "routing_exchange", "hello routing");
	
	        //建立消費者
	        //第一個參數表示佇列名
	        //第二個參數表示是否開啓自動簽收訊息
	        //第三個訊息是消費時候的回撥介面
	        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
	            @Override
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
	                System.out.println("消費者消費了一條訊息: " + new String(body));
	            }
	        });
	    }
	}

5、Topics訊息模型

       這個訊息模型其實就是Routing訊息模型的升級版,消費者的路由key可以使用#表示匹配大於等於零個詞*表示匹配一個詞,其他都和第四種訊息模型一樣。
Alt

       生產者程式碼:

	public class Producer {
	    public static void main(String[] args) throws Exception {
	        Connection connection = RabbitMQUtils.getConnection();
	        Channel channel = connection.createChannel();
	
	        //宣告交換機
	        channel.exchangeDeclare("topics_exchange", "topic");
	
	        String msg = "Hello, I am rabbitMQ!";
	        //第一個參數表示交換機名字
	        //第二個參數表示路由key(當第一個參數爲空字串即交換機爲default時, 這個參數直接指向佇列名稱)
	        //第三個參數表示傳遞訊息的額外設定
	        //第四個參數表示訊息的具體內容
	        channel.basicPublish("topics_exchange", "user", null, msg.getBytes());
	
	        channel.close();
	        connection.close();
	    }
	}

       消費者程式碼:

	public class Consumer {
	    public static void main(String[] args) throws Exception {
	        Connection connection = RabbitMQUtils.getConnection();
	        Channel channel = connection.createChannel();
	
	        //建立臨時佇列
	        String queueName = channel.queueDeclare().getQueue();
	
	        //系結佇列和交換機, #表示匹配零個、一個或多個詞, *表示只匹配一個詞
	        channel.queueBind(queueName, "topics_exchange", "user.#");
	
	        //建立消費者
	        //第一個參數表示佇列名
	        //第二個參數表示是否開啓自動簽收訊息
	        //第三個訊息是消費時候的回撥介面
	        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
	            @Override
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
	                System.out.println("消費者消費了一條訊息: " + new String(body));
	            }
	        });
	    }
	}