大家好,我是Leo!今天來和大家分享RocketMQ的一些用法。
Producer: 用於生產訊息的執行實體。
Topic: 主題,用於訊息傳輸和儲存的分組容器。
MessageQueue: 訊息傳輸和儲存的實際單元容器。
Message: 訊息傳輸的最小單元。
ConsumerGroup: 消費者組。
Consumer: 消費者。
Subscription: 訂閱關係,釋出訂閱模式中訊息過濾、重試、消費進度的規則設定。
MQ的明顯優勢有3個。
應用解耦: 以多服務為例,使用者下單,需要通知訂單服務和庫存服務,我們可以通過MQ訊息來解除下單和庫存系統的耦合。
非同步提速: 以秒殺為例,我們可以先返回秒殺結果,後續再通過MQ非同步訊息去插入記錄和扣減庫存等,減少呼叫的鏈路長度。
削峰填谷: 將某一時間內的請求量分攤到更多時間處理,比如系統A一秒只能處理10000個請求,但是我有100000個請求需要處理,我可以將請求發到MQ中,再分成10秒去消費這些請求。
當然MQ也有劣勢,系統可用性降低,系統複雜度提高,一致性問題。
主要包括Producer、Broker、Consumer、NameServer Cluster。
可以通過設定不同的消費者組
不同組通過不同的消費者組既可以實現同時收到一樣數量的訊息,那同一個消費者組需要怎樣才能收到同樣數量的訊息呢?
// 消費者消費模式
consumer.setMessageModel(MessageModel.BROADCASTING);
預設是叢集模式CLUSTERING,設定成廣播模式
既可以實現一對多的傳送。
同步訊息需要阻塞等待訊息傳送結果的返回
public class ProducerDemo {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message();
message.setTopic("MQLearn");
message.setTags("1.0.0");
message.setBody("Hello MQ!".getBytes(StandardCharsets.UTF_8));
SendResult result = producer.send(message);
if (result.getSendStatus().equals(SendStatus.SEND_OK)) {
System.out.println(result);
System.out.println("傳送成功:" + message);
}
producer.shutdown();
}
}
非同步訊息需要實現傳送成功和失敗的回撥函數。
public class Producer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.246.140:9876");
producer.start();
// 非同步訊息
for (int i = 0; i < 10; i++) {
Message message = new Message();
message.setTopic("topic7");
message.setTags("1.0.0");
message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 傳送成功的回撥方法
System.out.println(sendResult);
}
@Override
public void onException(Throwable e) {
// 傳送失敗的回撥方法
System.out.println(e);
}
});
}
TimeUnit.SECONDS.sleep(10);
System.out.println("非同步傳送完成!");
}
}
單向訊息就類似UDP,只顧單向傳送,不管是否傳送成功,常用於紀錄檔收集等場景。
public class SingleDirectionProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 單向訊息
for (int i = 0; i < 10; i++) {
Message message = new Message();
message.setTopic("topic8");
message.setTags("1.0.0");
message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
producer.sendOneway(message);
}
System.out.println("帶向傳送完成!");
}
}
RocketMQ提供的定時訊息並不能指定在什麼時間點去投遞訊息。而是根據設定的等待時間,起到延時到達的緩衝作用在RocketMQ中,延時訊息的delayTimeLevel支援以下級別:
1 1s 2 5s 3 10s 4 30s 5 1m 6 2m 7 3m 8 4m 9 5m 10 6m 11 7m 12 8m 13 9m 14 10m 15 20m 16 30m 17 1h 18 2h
// 設定訊息延時級別
message.setDelayTimeLevel(3);
批次訊息支援一次傳送多條訊息。
注意:
批次訊息需要有相同的topic
不能是延時訊息
訊息內容不能超過4M,可以通過producer.setMaxMessageSize()和broker進行設定設定(可以通過拆分多次傳送)
public class BatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
List<Message> list = new ArrayList<>();
// 批次訊息
for (int i = 0; i < 10; i++) {
Message message = new Message();
message.setTopic("topic10");
message.setTags("1.0.0");
message.setBody(("Hello World !" + i).getBytes(StandardCharsets.UTF_8));
list.add(message);
}
SendResult result = producer.send(list);
System.out.println(result);
TimeUnit.SECONDS.sleep(2);
System.out.println("傳送完成!");
}
}
順序訊息支援按照訊息的傳送訊息先後獲取訊息。
比如:我的一筆訂單有多個流程需要處理,比如建立->付款->推播->完成。
通過同一筆訂單放到一個佇列中,這樣就可以解決消費的無序問題。
通過實現MessageQueueSelector來選擇一個佇列。
public class Producer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message();
// 模擬業務ID
int step = 10;
message.setTopic("topic12");
message.setTags("1.0.0");
message.setBody(("Hello World !").getBytes(StandardCharsets.UTF_8));
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 佇列數
int size = mqs.size();
// 取模
int orderId = step;
return mqs.get(orderId % size);
}
}, null);
System.out.println("傳送完成!");
}
}
public class Consumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup("group1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic12", "*");
// 消費者,起一個順序監聽,一個執行緒,只監聽一個佇列
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg);
byte[] body = msg.getBody();
System.out.println(new String(body));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("消費者啟動了!");
}
}
RocketMQ中的事務訊息支援在分散式場景下訊息生產和本地事務的最終一致性。
大致流程為,
生產者先將訊息傳送至RocketMQ。
RocketMQBroker將訊息持久化成功後,向生產者返回ACK訊息確認已經返回成功,訊息狀態為暫時不能投遞狀態。
執行本地事務邏輯。
生產者根據事務執行結果向Broker提交commit或者rollback結果。
如果在斷網或者重啟情況下,未收到4的結果,或者返回Unknown未知狀態,在固定時間對訊息進行回查。
生產者收到訊息回查後,需要本地事務執行的最終結果。
生產者對本地事務狀態進行二次提交或確認。
public class Producer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
// 設定事務監聽
producer.setTransactionListener(new TransactionListener() {
// 正常事務監聽
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 把訊息儲存到mysql資料庫
boolean ok = false;
if (ok) {
System.out.println("正常執行事務過程");
return LocalTransactionState.COMMIT_MESSAGE;
} else {
System.out.println("事務補償過程");
return LocalTransactionState.UNKNOW;
//return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 事務補償事務
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("事務補償過程");
// sql select
if (true) {
} else {
}
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
String msg = "Hello Transaction Message!";
Message message = new Message("topic13", "tag", msg.getBytes(StandardCharsets.UTF_8));
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
TimeUnit.SECONDS.sleep(2);
System.out.println(transactionSendResult);
System.out.println("傳送完成!");
}
}
在RocketMQ中的訊息過濾功能能通過生產者和消費者對訊息的屬性和Tag進行定義,在消費端可以根據過濾條件進行篩選匹配,將符合條件的訊息投遞給消費者進行消費。
支援兩種方式:Tag標籤過濾和SQL屬性過濾。
Message message = new Message();
message.setTopic("topic11");
message.setTags("tag");
message.setBody(("Hello World !" + "tag").getBytes(StandardCharsets.UTF_8));
message.putUserProperty("name", "zhangsan");
message.putUserProperty("age", "16");
SendResult result = producer.send(message);
subscribe方法subExpression引數也支援Tag過濾
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup("group1");
consumer.setNamesrvAddr("112.74.125.184:9876");
consumer.subscribe("topic11", MessageSelector.bySql("age > 16"));
在SpringBoot專案中主要通過RocketMQTemplate進行訊息的傳送。
// 普通訊息
rocketMQTemplate.convertAndSend("topic10", user);
rocketMQTemplate.send("topic10", MessageBuilder.withPayload(user).
SendResult result = rocketMQTemplate.syncSend("topic10", user);
// 非同步訊息
rocketMQTemplate.asyncSend("topic10", user, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("成功!");
}
@Override
public void onException(Throwable e) {
System.out.println(e);
}
}, 1000L);
// 單向訊息
rocketMQTemplate.sendOneWay("topic10", user);
// 延時訊息
rocketMQTemplate.syncSend("topic10", MessageBuilder.withPayload(user).build(), 2000L, 3);
// 批次訊息
rocketMQTemplate.syncSend("topic10", list, 1000);
消費者:在註解中可以實現根據Tag和SQL進行屬性的過濾。
@Service
//@RocketMQMessageListener(
// consumerGroup = "group1",
// topic = "topic10",
// selectorExpression = "tag1 || tag2"
//)
@RocketMQMessageListener(
consumerGroup = "group1",
topic = "topic10",
selectorType = SelectorType.SQL92,
selectorExpression = "age > 16",
messageModel = MessageModel.BROADCASTING
)
public class UserConsumer implements RocketMQListener<User> {
@Override
public void onMessage(User message) {
}
}
今天主要分享了一下RocketMQ的一些基礎使用,包括各種型別的訊息的使用,偏向於程式碼實現部分,對於原理篇沒有過多涉及。