RabbitMQ是由erlang語言開發,基於AMQP(Advanced Message Queue 高階訊息佇列協定)協定實現的訊息佇列,它是一種應用程式之間的通訊方法,訊息佇列在分佈式系統開發中應用非常廣泛。RabbitMQ官方地址:http://www.rabbitmq.com/。
一般開發中應用場景如下:
將不需要同步處理的並且耗時長的操作由訊息佇列通知訊息接收方進行非同步處理。提高了應用程式的響應時間。
MQ相當於一箇中介,生產方通過MQ與消費方互動,它將應用程式進行解耦合。
下圖是RabbitMQ的基本結構:
組成部分說明如下:
訊息發佈接收流程:
-----發送訊息-----
----接收訊息-----
RabbitMQ由Erlang語言開發,Erlang語言用於併發及分佈式系統的開發,在電信領域應用廣泛,OTP(Open
Telecom Platform)作爲Erlang語言的一部分,包含了很多基於Erlang開發的中介軟體及工具庫,安裝RabbitMQ需
要安裝Erlang/OTP,並保持版本匹配:
RabbitMQ的下載地址:http://www.rabbitmq.com/download.html
本部落格演示使用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本
1、下載erlang
http://erlang.org/download/otp_win64_20.3.exe
安裝:以管理員方式執行此檔案,一步步安裝就好,erlang安裝完成記得在系統設定erlang環境變數。
2、安裝RabbitMQ
https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3
下載,安裝。
3、啓動RabbitMQ
安裝後在計算機服務中就會出現rabbitmq,而且已經啓動,也可以選擇進入安裝目錄下sbin目錄手動啓動:
rabbitmq-service.bat install 安裝服務 rabbitmq-service.bat stop 停止服務 rabbitmq-service.bat start 啓動服務
安裝管理外掛(方便在瀏覽器端管理RabbitMQ):
在sbin目錄開啓cmd以管理員方式,執行 rabbitmq-plugins.bat enable rabbitmq_management
啓動登入:
進入瀏覽器,輸入:http://localhost:15672,初始賬號和密碼:guest/guest
4、注意事項
1、安裝erlang和rabbitMQ以管理員身份執行。
2、當解除安裝重新安裝時會出現RabbitMQ服務註冊失敗,此時需要進入註冊表清理erlang
搜尋RabbitMQ、ErlSrv,將對應的項全部刪除。
RabbitMQ有以下幾種工作模式 :
工作佇列模式:
應用場景:對於 任務過重或任務較多情況使用工作佇列可以提高任務處理的速度。
測試:
1、使用入門程式,啓動多個消費者。
2、生產者發送多個訊息。
結果:
1、一條訊息只會被一個消費者接收;
2、rabbit採用輪詢的方式將訊息是平均發送給消費者的;
3、消費者在處理完某條訊息後,纔會收到下一條訊息。
發佈訂閱模式:
1、每個消費者監聽自己的佇列。
2、生產者將訊息發給broker,由交換機將訊息轉發到系結此交換機的每個佇列,每個系結交換機的佇列都將接收
到訊息發佈訂閱模式比工作佇列模式更強大,發佈訂閱模式可以指定自己專用的交換機
路由模式:
1、每個消費者監聽自己的佇列,並且設定routingkey。
2、生產者將訊息發給交換機,由交換機根據routingkey來轉發訊息到指定的佇列Routing模式和Publish/subscibe有啥區別?
Routing模式要求佇列在系結交換機時要指定routingkey,訊息會轉發到符合routingkey的佇列。
與路由模式一致,採用萬用字元方式,佇列系結交換機指定萬用字元。它可以實現Routing、publish/subscirbe模式的功能。
header模式與routing不同的地方在於,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配佇列。
RPC即用戶端遠端呼叫伺服器端的方法 ,使用MQ可以實現RPC的非同步呼叫,基於Direct交換機實現,流程如下:
1、用戶端即是生產者就是消費者,向RPC請求佇列發送RPC呼叫訊息,同時監聽RPC響應佇列。
2、伺服器端監聽RPC請求佇列的訊息,收到訊息後執行伺服器端的方法,得到方法返回的結果
3、伺服器端將RPC方法 的結果發送到RPC響應佇列
4、用戶端(RPC呼叫方)監聽RPC響應佇列,接收到RPC呼叫結果。
<!--使用spring-boot-starter-amqp會自動新增spring-rabbit依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>
1、設定application.yml:
server:
port: 30000
spring:
application:
name: test‐rabbitmq‐producer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /
2、定義RabbitConfig類,設定Exchange、Queue、及系結交換機:
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
/**
* 交換機設定
* ExchangeBuilder提供了fanout、direct、topic、header交換機型別的設定
* @return the exchange
*/
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM() {
//durable(true)持久化,訊息佇列重新啓動後交換機仍然存在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
//宣告佇列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS() {
Queue queue = new Queue(QUEUE_INFORM_SMS);
return queue;
}
//宣告佇列
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL() {
Queue queue = new Queue(QUEUE_INFORM_EMAIL);
return queue;
}
/** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");
* 系結佇列到交換機 .
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
}
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
}
}
3、編寫測試RabbitMQ生產端:
使用RarbbitTemplate發送訊息:
import com.xuecheng.test.rabbitmq.config.RabbitmqConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class Producer05_topics_springboot {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testSendByTopics(){
for (int i=0;i<5;i++){
String message = "sms email inform to user"+i;
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,
"inform.sms.email",message);
}
}
}
4、編寫測試RabbitMQ消費端:
新增Maven依賴同上,設定application.yml
import com.rabbitmq.client.Channel;
import com.xuecheng.test.rabbitmq.config.RabbitmqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ReceiveHandler {
//監聽email佇列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
public void receive_email(String msg,Message message,Channel channel){
System.out.println(msg);
}
//監聽sms佇列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
public void receive_sms(String msg,Message message,Channel channel){
System.out.println(msg);
}
}