開源AMQP實現,Erlang語言編寫,支援多種用戶端
分佈式、高可用、持久化、可靠、安全
支援多種協定:AMQP、STOMP、MQTT、HTTP
適用於多系統之間的業務解耦的訊息中介軟體
1、exchange:交換器,負責接收訊息,轉發訊息至系結的佇列,有四種類型:
direct:完全匹配的路由
topic:模式匹配的路由
fanout:廣播模式
headers:鍵值對匹配路由
Exchange屬性:
持久化:如果啓用,那麼rabbit服務重新啓動之後仍然存在
自動刪除:如果啓用,那麼交換器將會在其系結的佇列都被刪除掉之後自動刪除掉自身
2、Queue:佇列,rabbitmq的內部物件,用於儲存訊息,其屬性類似於Exchange,同樣可以設定是否持久化、自動刪除等。
消費者從Queue中獲取訊息並消費。多個消費者可以訂閱同一個Queue,這時Queue中的訊息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的訊息並處理。
3、Binding:系結,根據路由規則系結交換器與佇列
4、Routing:路由鍵,路由的關鍵字
Message acknowledgment:訊息確認,在訊息確認機制 機製下,收到回執纔會刪除訊息,未收到回執而斷開了連線,訊息會轉發給其他消費者,如果忘記回執,會導致訊息堆積,消費者重新啓動後會重複消費這些訊息並重復執行業務邏輯。
Message durability:訊息持久化,設定訊息持久化可以避免絕大部分訊息丟失,比如rabbitmq服務重新啓動,但是採用非持久化可以提升佇列的處理效率。如果要確保訊息的持久化,那麼訊息對應的Exchange和Queue同樣要設定爲持久化。
Prefetch count,每次發送給消費者訊息的數量,預設爲1
另外,如果需要可靠性業務,需要設定持久化和ack機制 機製,如果系統高吞吐,可以設定爲非持久化、noack、自動刪除機制 機製。
模擬這樣一個業務場景,使用者下單成功後,需要給使用者增加積分,同時還需要給使用者發送下單成功的訊息,這是在電商業務中很常見的一個業務場景。
如果系統是微服務架構,可能使用者下單功能在訂單服務,給使用者增加積分的功能在積分服務,給使用者發送通知訊息的功能在通知服務,各個服務之間解耦,互不影響。那麼要實現上述的業務場景,訊息中介軟體rabbitmq是一個很好的選擇。
原因如下:
高效能,它的實現語言是天生具備高併發高可用的erlang 語言
支援訊息的持久化,即使伺服器掛了,也不會丟失訊息
訊息應答(ack)機制 機製,消費者消費完訊息後發送一個訊息應答,rabbitmq纔會刪除訊息,確保訊息的可靠性
支援高可用叢集
靈活的路由
實現思路:
使用者下單成功後,rabbitmq發送一條訊息至EXCHANGE.ORDER_CREATE
交換器,該交換器系結了兩個佇列,QUEUE.ORDER_INCREASESCORE
、QUEUE.ORDER_NOTIFY
,消費者訂閱這兩個佇列分別用來處理增加積分、發送使用者通知。如果後續日誌系統還需要記錄下單的相關日誌,那麼我們只需要再定義一個佇列並將其系結到EXCHANGE.ORDER_CREATE
即可。
下單發rabbitmq訊息
package com.robot.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* @author: 會跳舞的機器人
* @date: 2017/10/13 10:46
* @description: 模擬使用者下單之後發送rabbitmq訊息
*/
public class OrderCreator {
// 交換器名稱
private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
// 訊息內容
private static String msg = "create order success";
/**
* 模擬建立訂單後發送mq訊息
*/
public void createOrder() {
System.out.println("下單成功,開始發送rabbitmq訊息");
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.12.44");
connectionFactory.setPort(56720);
connectionFactory.setUsername("baibei");
connectionFactory.setPassword("baibei");
Connection connection;
Channel channel;
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 持久化
boolean durable = true;
// topic型別
String type = "topic";
// 宣告交換器,如果交換器不存在則建立之
channel.exchangeDeclare(EXCHANGE, type, durable);
String messgeId = UUID.randomUUID().toString();
// deliveryMode>=2表示設定訊息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).messageId(messgeId).build();
// 發佈訊息
String routingKey = "order_create";
channel.basicPublish(EXCHANGE, routingKey, props, msg.getBytes("utf-8"));
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
積分系統訂閱訊息
package com.robot.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author: 會跳舞的機器人
* @date: 2017/10/13 16:02
* @description: rabbitmq消費者,模擬下單成功後給使用者增加積分
*/
public class IncreaseScoreConsumer implements Consumer {
private Connection connection;
private Channel channel;
// 交換器名稱
private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
// 增加積分佇列名稱
private static final String QUEUENAME = "QUEUE.ORDER_INCREASESCORE";
public void consume() {
// 初始化rabbitmq連線資訊
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.12.44");
connectionFactory.setPort(56720);
connectionFactory.setUsername("baibei");
connectionFactory.setPassword("baibei");
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 宣告交換器
channel.exchangeDeclare(EXCHANGE, "topic", true);
// 宣告佇列
channel.queueDeclare(QUEUENAME, true, false, false, null);
// 交換器與佇列系結並設定routingKey
channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
// 消費訊息,callback是該類,關閉自動確認訊息,在完成業務邏輯後手動確認確認
channel.basicConsume(QUEUENAME, false, this);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("《積分系統》收到訂單訊息:" + msg + ",給使用者增加積分......");
// 手動確認訊息
channel.basicAck(envelope.getDeliveryTag(), false);
/**
* channel.basicReject(envelope.getDeliveryTag(), false);該方法會丟棄掉佇列中的這條訊息
* channel.basicReject(envelope.getDeliveryTag(), true);該方法會把訊息重新放回佇列
* 一般系統會設定一個重試次數,如果超過重試次數,則會丟棄訊息,反之則會把訊息再放入佇列
*/
}
public void handleConsumeOk(String consumerTag) {
}
public void handleCancelOk(String consumerTag) {
}
public void handleCancel(String consumerTag) throws IOException {
}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
}
public void handleRecoverOk(String consumerTag) {
}
}
通知系統訂閱訊息
package com.robot.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author: 會跳舞的機器人
* @date: 2017/10/13 16:20
* @description: rabbitmq消費者,模擬下單成功後給使用者發送通知
*/
public class NotifyConsumer implements Consumer {
private Connection connection;
private Channel channel;
// 交換器名稱
private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
// 通知使用者下單成功通知佇列名稱
private static final String QUEUENAME = "QUEUE.ORDER_NOTIFY";
public void consume() {
// 初始化rabbitmq連線資訊
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.12.44");
connectionFactory.setPort(56720);
connectionFactory.setUsername("baibei");
connectionFactory.setPassword("baibei");
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 宣告交換器
channel.exchangeDeclare(EXCHANGE, "topic", true);
// 宣告佇列
channel.queueDeclare(QUEUENAME, true, false, false, null);
// 交換器與佇列系結並設定routingKey
channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
// 消費訊息,callback是該類,關閉自動確認訊息,在完成業務邏輯後手動確認確認
channel.basicConsume(QUEUENAME, false, this);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("《通知系統》收到訂單訊息:" + msg + ",開始給使用者發送通知......");
// 手動確認訊息
channel.basicAck(envelope.getDeliveryTag(), false);
/**
* channel.basicReject(envelope.getDeliveryTag(), false);該方法會丟棄掉佇列中的這條訊息
* channel.basicReject(envelope.getDeliveryTag(), true);該方法會把訊息重新放回佇列
* 一般系統會設定一個重試次數,如果超過重試次數,則會丟棄訊息,反之則會把訊息再放入佇列
*/
}
public void handleConsumeOk(String consumerTag) {
}
public void handleCancelOk(String consumerTag) {
}
public void handleCancel(String consumerTag) throws IOException {
}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
}
public void handleRecoverOk(String consumerTag) {
}
}
測試
package com.robot.rabbitmq;
/**
* @author: 會跳舞的機器人
* @date: 2017/10/13 16:27
* @description:
*/
public class Test {
public static void main(String[] args) {
IncreaseScoreConsumer increaseScoreConsumer = new IncreaseScoreConsumer();
increaseScoreConsumer.consume();
NotifyConsumer notifyConsumer = new NotifyConsumer();
notifyConsumer.consume();
OrderCreator orderCreator = new OrderCreator();
for (int i = 0; i < 3; i++) {
orderCreator.createOrder();
}
}
}
輸出:
下單成功,開始發送rabbitmq訊息
《積分系統》收到訂單訊息:create order success,給使用者增加積分......
《通知系統》收到訂單訊息:create order success,開始給使用者發送通知......
下單成功,開始發送rabbitmq訊息
《積分系統》收到訂單訊息:create order success,給使用者增加積分......
《通知系統》收到訂單訊息:create order success,開始給使用者發送通知......
下單成功,開始發送rabbitmq訊息
《積分系統》收到訂單訊息:create order success,給使用者增加積分......
《通知系統》收到訂單訊息:create order success,開始給使用者發送通知...