訊息中介軟體--JMS--ActiveMQ--01

2020-08-12 18:35:51

訊息中介軟體–JMS–ActiveMQ–01

1、什麼是MQ

​ MQ全稱Message Queue,訊息佇列的意思,是訊息導向中介層的一種(message-oriented middleware),它底層使用Queue儲存訊息,遵循先進先出的原則。系統與系統之間進行資訊交流時,可以把訊息發送到MQ中,然後由MQ完成訊息的推播,訊息佇列可以在分佈式環境下提供應用解耦,彈性伸縮,冗餘儲存、流量削峯,非同步通訊,數據同步等功能。

​ 大致的過程是這樣的:
​ 發送者把訊息發送給訊息伺服器,訊息伺服器將訊息存放在若幹佇列/主題topic中,在合適的時候,訊息伺服器回將訊息轉發給接受者。在這個過程中,發送和接收是非同步的,也就是發送無需等待,而且發送者和接受者的生命週期也沒有必然的關係;尤其在發佈pub/訂閱sub模式下,也可以完成一對多的通訊,即讓一個訊息有多個接受者。

2、爲什麼使用MQ

​ 以電商系統舉個例子:
在这里插入图片描述

​ 在沒有MQ情況下,當使用者完成下訂單成功之後,訂單系統需要呼叫支付系統完成支付,還需要呼叫倉儲系統來安排物流,一個下訂單的操作可能需要級聯的呼叫許多個微服務,當某一時刻訂單系統的併發量急劇增大,比如秒殺活動,此時因爲後臺要呼叫的服務較多,造成訂單系統的整體負荷急劇增加,最終可能會導致訂單系統的崩潰。

​ 那麼,我們可不可以把倉儲系統及之後的微服務剝離開來,因爲使用者訂單下發成功之後是不用馬上就安排物流配送的,整個物流鏈路可以與訂單系統非同步執行,只需要訂單系統發送過來一個確認訊息即可。

​ 這種情況下就可以引入MQ,當使用者下訂單成功之後,訂單系統向MQ發送一個訊息,通知倉儲系統安排物流配送,MQ不必等待倉儲系統鏈路的完成,只需要接着處理支付系統鏈路的業務即可,這樣可以很大程度的減輕訂單系統乃至整個應用的壓力。

​ 那麼,我們可不可以繞過MQ,直接把訊息發送給倉儲系統呢?

​ 原理上是行得通的,但是這是就要求倉儲系統和訂單系統必須同時線上,否則訊息是無法發送成功的。並且這樣一來,上下遊的耦合度較高,如果後續增加了一個XX系統,也需要接收訂單系統的訊息,此時就需要修改訂單系統發送訊息的程式碼,不符合開閉原則。

​ 而加入了訊息中介軟體之後,上下遊之間的資訊交流就不用直接耦合了,訊息生產者把訊息發送給MQ,MQ在合適的時機把訊息推播給訊息消費者,消費者和生產者之間是非同步進行的,並且當大流量來襲時,消費者可以根據自身的能力消費MQ中的資訊,不會被大流量沖垮 衝垮,達到流量削峯的目的。

​ 總的來講,在分佈式系統中引入MQ有以下幾點好處:

系統解耦

​ 當新的模組接進來時,可以做到程式碼改動最小;因爲訊息生產者是把訊息發送到一條虛擬的通道(主題或者佇列)上的,當有新的模組接入時,只需要在新的模組中設定要監聽/訂閱的通道即可,原來的上遊模組是不需要進行改動的。

執行非同步

​ MQ採用非同步處理模式,通過梳理服務之間的強弱依賴關係,將非關鍵呼叫鏈路的操作非同步化,提升整體系統的吞吐能力;訊息發送者可以發送一個訊息而無須等待響應。訊息發送者將訊息發送到一條虛擬的通道(主題或者佇列)上;訊息接收者則訂閱或者監聽該愛通道。一條訊息可能最終轉發給一個或者多個訊息接收者,這些訊息接收者都無需對訊息發送者做出同步迴應。整個過程都是非同步的。
​ 案例:
​ 也就是說,一個系統跟另一個系統之間進行通訊的時候,假如系統A希望發送一個訊息給系統B,讓他去處理。但是系統A不關注系統B到底怎麼處理或者有沒有處理好,所以系統A把訊息發送給MQ,然後就不管這條訊息的「死活了」,接着系統B從MQ裏面消費出來處理即可。至於怎麼處理,是否處理完畢,什麼時候處理,都是系統B的事兒,與系統A無關。

流量削峯

​ 在MQ中,可以設定流量緩衝池,可以讓後端系統按照自身吞吐能力進行消費,不被沖垮 衝垮;

​ 但是,引入MQ後,也會產生一些問題

​ 1)系統更復雜,多了一個MQ元件

​ 2)訊息傳遞路徑更長,延時會增加

​ 3)訊息可靠性和重複性互爲矛盾,訊息不丟不重難以同時保證

​ 4)上遊無法知道下遊的執行結果,這一點是很致命的。所以請牢記, 呼叫方實時依賴執行結果的業務場景,請直接呼叫,不能使用MQ。

​ 權衡利弊之後,可以得到適用MQ的場景:

上遊不關注下遊的處理結果,並且同步呼叫耗時較長;

​ 不適用的場景:

上遊實時關注下遊的處理結果。

3、MQ的常用產品

​ MQ指的是訊息佇列,放置在系統之間的訊息中介軟體,是一種理念,常用的落地實現有一下幾種:
在这里插入图片描述

​ 從整體上來講,ActiveMQ屬於Apache公司,是比較老牌的MQ產品,並且底層使用java編寫;RabbitMQ使用的也比較多,但是它底層採用的是Erlang語言;Kafka主要用在大數據領域,RocketMQ是ailibaba參考Kafka推出的一個MQ產品,使用的比較多。

​ 本次僅介紹ActiveMQ的使用,其他產品的使用大致思路相同,因爲它們都是基於MQ理唸的產品,只不過一些落地的實現有所區別。

4、ActiveMQ安裝

​ 第一步:進入官網下載Linux系統安裝包,官網下載地址:

	http://activemq.apache.org/download-archives 

​ 選擇版本後,進入選擇操作系統頁面:
在这里插入图片描述
​ 第二步:下載完成之後,將tar包上傳到Linux操作系統/opt目錄下,並解壓

	tar -zxvf apache-activemq-5.15.9-bin.tar.gz	

​ 第三步:安裝成功,進入 apache-activemq-5.15.9/bin目錄下,執行activemq命令:

    ./activemq start  	----  啓動activemq
    ./activemq stop	  	----  停止activemq
    ./activemq restart	----  重新啓動activemq

    #將啓動、停止、重新啓動日誌重定向至日誌檔案(檔名自定義)
    ./activemq start >> run_activemq.log
    ./activemq stop >> run_activemq.log
    ./activemq restart >> run_activemq.log

​ 第四步:啓動之後,可以通過ps命令檢視是否啓動成功:

    ps -ef | grep activemq

​ 第五步:ActiveMQ啓動之後,預設佔用兩個埠號,一個提供JMS服務(61616),一個提供管理控制檯服務(8161),如果需要存取這兩個埠,請設定linux防火牆,開放這兩個埠。

​ 第六步:開啓防火牆之後,存取http://192.168.234.133:8161(Linux主機ip+port),即可進入ActiveMQ的管理控制檯。得到此頁面也意味着ActiveMQ安裝和啓動成功。
在这里插入图片描述

5、ActiveMQ的入門程式

​ 在ActiveMQ中,訊息通道有兩種,一種是Queue,一種是Topic,二者的比較如下:
在这里插入图片描述
​ Queue是一種一對一的方式,生產者將訊息發送到MQ中的一個Queue中,MQ再將訊息推播給監聽此Queue的消費者。注意,這種方式的訊息只會被推播一次,也就是說,消費者應當只有一個。當消費者是一個叢集時,預設採用輪詢的方式進行訊息推播,所有服務範例均分訊息。

​ Topic是一種一對多的方式,訊息發佈者將訊息發送到MQ中的一個Topic中,MQ再將訊息推播給訂閱此Topic的訂閱者。注意,當Topic的訂閱者不止一個時,這種方式的訊息會被重複推播,總推播次數=訂閱者個數*訊息數。

​ 更詳細的比較如下:

比較專案 Queue佇列模式 Topic佇列模式
工作模式 "負載均衡"模式,如果當前沒有消費者,訊息也不會丟棄;如果有多個消費者,那麼一條訊息也只會發送給其中一個消費者,並且要求消費者ack資訊 "訂閱-發佈"模式,如果當前沒有訂閱者,訊息將會被丟棄,如果有多個訂閱者,那麼這些訂閱者都會收到訊息
有無狀態 Queue數據預設會在mq伺服器上已檔案形式儲存,比如Active MQ一般儲存在$AMQ_HOME\data\kr-store\data下面 下麪,也可以設定成DB儲存 無狀態
傳遞完整性 訊息不會被丟棄 如果沒有訂閱者,訊息會被丟棄
處理效率 由於一條訊息只發送給一個消費者,所以就算消費者再多,效能也不會有明顯降低。當然不同訊息協定的具體效能也是有差異的 由於訊息要按照訂閱者的數量進行復制,所以處理效能會隨着訂閱者的增加而明顯降低,並且還要結合不同訊息協定自身的效能差異

​ 下面 下麪我們分Queue和Topic兩種模式來書寫入門程式,但是二者的整體書寫思路是一致的:
在这里插入图片描述

​ JMS(java message service)開發的基本步驟:

​ 1:建立一個connection factory
​ 2:通過connection factory來建立JMS connection
​ 3:啓動JMS connection
​ 4:通過JMS connection建立JMS session
​ 5:建立JMS destination(目的地 佇列/主題)
​ 6:建立JMS producer或者建立JMS consume並設定destination
​ 7:建立JMS consumer或者註冊一個JMS message listener
​ 8:發送(send)或者接收(receive)JMS message
​ 9:關閉所有JMS資源

5.1、Queue模式

​ 第一步:環境搭建

​ 建立Maven工程,引入ActiveMQ依賴:

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.9</version>
</dependency>

​ 第二步:書寫生產者主程式

package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProduce {
    private static final String ACTIVEMQ_URL = "tcp://192.168.234.133:61616";
    private static final String QUEUE_NAME = "queue01";
    
    public static void main(String[] args) throws JMSException {
        //1.建立連線工廠,按照給定的URL,採用預設的使用者名稱密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通過連線工廠,獲得connection並啓動存取
        Connection connection = activeMQConnectionFactory.createConnection();
        //3.建立對談session
        //兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地(具體是佇列queue還是主題topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.建立訊息的生產者
        MessageProducer messageProducer = session.createProducer(queue);
        //6、啓動連線
        connection.start();
        //7.通過使用訊息生產者,生產三條訊息,發送到MQ的佇列裏面
        for (int i = 0; i < 3; i++) {
            //8.建立訊息
            TextMessage textMessage = session.createTextMessage("msg---hello" + i);//理解爲一個字串
            //9.通過messageProducer發送給MQ佇列
            messageProducer.send(textMessage);
        }
        //10.關閉資源
        messageProducer.close();
        session.close();
        System.out.println("****訊息發佈到MQ佇列完成");
    }
}

​ 第三步:書寫消費者主程式

​ 兩種書寫方式,一種是阻塞式,一種是監聽式。

package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 阻塞式訊息消費者
 */
public class JmsConsumer {
    private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    private static final String QUEUE_NAME = "queue01";
    
    public static void main(String[] args) throws JMSException {
        //1.建立連線工廠,按照給定的URL,採用預設的使用者名稱密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通過連線工廠,獲得connection並啓動存取
        Connection connection = activeMQConnectionFactory.createConnection();
        //3.建立對談session
        //兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地(具體是佇列queue還是主題topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.建立訊息的消費者,指定消費哪一個佇列裏面的訊息
        MessageConsumer messageConsumer = session.createConsumer(queue);
        //6、啓動連線
        connection.start();
        //回圈獲取
        while (true) {
            //7.通過消費者呼叫方法獲取佇列裏面的訊息(發送的訊息是什麼型別,接收的時候就強轉成什麼型別)
            //receive()方法爲阻塞方法,如果沒有接受到訊息,就會一致阻塞等待
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            if (textMessage != null) {
                System.out.println("****消費者接收到的訊息:  " + textMessage.getText());
            }else {
                break;
            }
        }
        //8.關閉資源
        messageConsumer.close();
        session.close();
        connection.close();
    }
}
package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
 * 監聽模式下的消費者
 */
public class JmsConsumer2 {
    private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    private static final String QUEUE_NAME = "queue01";
    
    public static void main(String[] args) throws JMSException, IOException {
        //1.建立連線工廠,按照給定的URL,採用預設的使用者名稱密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通過連線工廠,獲得connection並啓動存取
        Connection connection = activeMQConnectionFactory.createConnection();
        //3.建立對談session
        //兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地(具體是佇列queue還是主題topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.建立訊息的消費者,指定消費哪一個佇列裏面的訊息
        MessageConsumer messageConsumer = session.createConsumer(queue);
        //6、啓動連線
        connection.start();
        //7.通過監聽的方式消費訊息
        /*
        非同步非阻塞式方式監聽器(onMessage)
        訂閱者或消費者通過建立的消費者物件,給消費者註冊訊息監聽器setMessageListener,
        當訊息有訊息的時候,系統會自動呼叫MessageListener類的onMessage方法
        我們只需要在onMessage方法內判斷訊息型別即可獲取訊息
         */
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message != null && message instanceof TextMessage) {
                    //8.把message轉換成訊息發送前的型別並獲取訊息內容
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("****消費者接收到的訊息:  " + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.out.println("執行了39行");
        //保證控制檯不關閉,阻止程式關閉
        System.in.read();
        //關閉資源
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

​ 第四步:啓動生產者,觀察ActiveMQ管理控制檯,得到如下:
在这里插入图片描述
​ Number Of Pending Messages=等待消費的訊息,這個是未出佇列的數量,=總接收數-總出佇列數。
​ Number Of Consumers=消費者數量,消費者端的消費者數量。
​ Messages Enqueued=進隊訊息數,進佇列的總訊息量,包括出佇列的。這個數只增不減。
​ Messages Dequeued=出隊訊息數,可以理解爲是消費者消費掉的數量。

​ 第五步:啓動消費者JmsConsumer,觀察控制檯
在这里插入图片描述
​ 可以看到當前的消費者數量爲1,待消費爲0,總進佇列數爲3,已經消費爲3.

​ 如果我們先啓動JmsConsumer和JmsConsumer2,再啓動生產者,此時生產者生產的訊息將通過負載均衡的方式分發到JmsConsumer和JmsConsumer2中。

5.2、Topic模式

​ 第一步:環境搭建

​ 建立Maven工程,引入ActiveMQ依賴:

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.9</version>
</dependency>

​ 第二步:書寫生產者主程式

package com.demo.activemq.topic;
 
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
 
public class JmsProducer_Topic {
    public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    public static final String TOPIC_NAME = "topic01";
 
    public static void main(String[] args) throws JMSException {
        //1.建立連線工廠,按照給定的URL,採用預設的使用者名稱密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通過連線工廠,獲得connection並啓動存取
        Connection connection = activeMQConnectionFactory.createConnection();
        //3.建立對談session
        //兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地(具體是佇列queue還是主題topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5.建立訊息的生產者
        MessageProducer messageProducer = session.createProducer(topic);
        //6、啓動連線
        connection.start();
        //7.通過使用訊息生產者,生產三條訊息,發送到MQ的佇列裏面
        for (int i = 0; i < 3; i++) {
            //8.通過session建立訊息
            TextMessage textMessage = session.createTextMessage("TOPIC_NAME---" + i);
            //9.使用指定好目的地的訊息生產者發送訊息
            messageProducer.send(textMessage);
        }
        //10.關閉資源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("****TOPIC_NAME訊息發佈到MQ完成");
    }
}

​ 第三步:書寫消費者主程式

​ 兩種書寫方式,一種是阻塞式,一種是監聽式。

package com.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 阻塞式訊息消費者
 */
public class JmsConsumer_Topic {
    private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    private static final String TOPIC_NAME = "topic-ly";
    
    public static void main(String[] args) throws JMSException {
        //1.建立連線工廠,按照給定的URL,採用預設的使用者名稱密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通過連線工廠,獲得connection並啓動存取
        Connection connection = activeMQConnectionFactory.createConnection();
        //3.建立對談session
        //兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地(具體是佇列queue還是主題topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5.建立訊息的消費者,指定消費哪一個佇列裏面的訊息
        MessageConsumer messageConsumer = session.createConsumer(topic);
        //6、啓動連線
        connection.start();
        //回圈獲取
        while (true) {
            //7.通過消費者呼叫方法獲取佇列裏面的訊息(發送的訊息是什麼型別,接收的時候就強轉成什麼型別)
            //receive()方法爲阻塞方法,如果沒有接受到訊息,就會一致阻塞等待
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            if (textMessage != null) {
                System.out.println("****消費者接收到的訊息:  " + textMessage.getText());
            }else {
                break;
            }
        }
        //8.關閉資源
        messageConsumer.close();
        session.close();
        connection.close();
    }
}
package com.demo.activemq.topic;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
import javax.jms.*;
import java.io.IOException;
 
public class JmsConsumer2_Topic {
    public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    public static final String TOPIC_NAME = "topic-ly";
 
    public static void main(String[] args) throws JMSException, IOException {
        System.out.println("我是2號消費者");
        //1.建立連線工廠,按照給定的URL,採用預設的使用者名稱密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通過連線工廠,獲得connection並啓動存取
        Connection connection = activeMQConnectionFactory.createConnection();
        //3.建立對談session
        //兩個參數transacted=事務,acknowledgeMode=確認模式(簽收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地(具體是佇列queue還是主題topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5.建立訊息的消費者
        MessageConsumer messageConsumer = session.createConsumer(topic);
        //6、啓動連線
        connection.start();
        //7.建立訊息的消費者,指定消費哪一個佇列裏面的訊息
        messageConsumer.setMessageListener(message -> {
            if (message instanceof TextMessage){
                try {
                    String text = ((TextMessage) message).getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.in.read();
    }
}

​ 第四步:先啓動消費者JmsConsumer_Topic和JmsConsumer2_Topic,此時觀察ActiveMQ管理控制檯,得到如下:
在这里插入图片描述
​ 第五步:啓動生產者,觀察控制檯在这里插入图片描述
​ 可以看到當前的消費者數量爲2,總進佇列數爲3,已經消費爲6(2*3)。

​ 因爲Topic是發佈/訂閱模式,生產者發佈一次,所有的訂閱此主題的消費者都能夠接收全部訊息,類似於微信公衆號。

​ 但是要注意的是,在Topic模式下,消費者只能消費它存在之後生產者發佈的訊息,所以必須要先啓動消費者,再啓動生產者,如果生產者發佈的訊息沒有消費者,則這些訊息會成爲廢訊息,永遠不會有消費者接收。

​ 而Queue模式下就不存在這種問題,因爲ActiveMQ會儲存Queue訊息,等有消費者時再進行推播。

下接:訊息中介軟體–JMS–ActiveMQ–02