Windows下實現C++ 連線ActiveMQ

2020-08-07 18:43:41

1.什麼是ActiveMQ?

ActiveMQ是Apache軟體基金會所研發的開放原始碼訊息中介軟體;由於ActiveMQ是一個純Java程式,因此只需要操作系統支援Java虛擬機器,ActiveMQ便可執行。
支援的程式語言包括:C、C++、C#、Delphi、Erlang、Adobe Flash、Haskell、Java、JavaScript、Perl、PHP、Pike、Python和Ruby。

2.能用來做什麼?

通過ActiveMQ能實現訊息的網路通訊。生產者生產推動訊息到ActiveMQ服務,消費者連線ActiveMQ實現訊息的接收。
在这里插入图片描述

3. 支援的訊息型別

主要有Queues和Topics兩種訊息型別。Queues是一對一的模式,一個生產者對應一個消費者,訊息被消費了就沒有了。 Topics是多對對的模式,多個生產者,多個消費者。

4.本地安裝ActiveMQ服務

4.1 下載地址

http://activemq.apache.org/
有經典版和高效能版。我使用的是經典版

4.2 啓動

下載好後,解壓,雙擊bin下的win64下的activemq.bat啓動服務。
在这里插入图片描述
可用瀏覽器登錄ActiveMQ的管理端,可以檢視訊息和連線等情況。如有賬號和密碼都是admin吧。
http://127.0.0.1:8161/admin

4.3 組態檔activemq.xml

修改組態檔,實現訊息的恢復,當消費者用戶端重新啓動後能恢復歷史訊息。
在这里插入图片描述

5.C++實現連線ActiveMQ

ActiveMQ服務起來後,現在就是實現訊息的生產者和消費者了。

5.1 下載相關介面

下載activemq-cpp對應庫程式碼並編譯:
http://www.apache.org/dyn/closer.lua/activemq/activemq-cpp/
下載環境工具庫,否則activemq-cpp編譯不過的:
http://apr.apache.org/download.cgi
在这里插入图片描述
網上別的人說要好幾個環境庫,我就只用了apr。

5.2 編譯

本人使用的是vs2019編譯,它原來工程預設的是2010,需要升級下工程屬性中的,常規,平臺工具集,到v142。

5.2.1 編譯activemq-cpp工程

在这里插入图片描述
如報錯找不到標頭檔案apr_pools.h的話,需要把apr庫中的標頭檔案資料夾include包含到工程中。
在这里插入图片描述
修改工程屬性後應該就能編譯過了:
在这里插入图片描述

5.2.2 編譯libapr工程生成libapr-1.lib和libapr-1.dll庫

在这里插入图片描述
可能會報錯一大推編譯錯誤錯誤,參考這篇文章,https://blog.csdn.net/tiger_xs/article/details/100038183,需要在include資料夾下找到apr.hw檔案,找到宏定義「#define _WIN32_WINNT 0x0501」,將0x0501改爲0x0600。

5.2.3 編譯activemq-cpp.example例子工程,執行例子

找不到標頭檔案的加包含目錄,找不到libapr-1.lib庫的,把這個庫複製到這裏:
在这里插入图片描述
然後,在ActiveMQ服務啓動了的情況下,這個例子就能啓動啦。

這個工程的main.cpp給了生產者和消費者的例子程式碼,它是起了兩個執行緒程。可以通過這個例子根據自己的需要修改成自己的工程。

6.實際專案中的改造

下面 下麪是我通過改動實現消費者的單例模式,這樣就可以在專案中各個地方呼叫,實現訊息的傳輸。
爲了防止阻塞,實際情況發訊息的時候是轉了執行緒的。這裏註釋掉了。

ActiveMQ.h

#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/lang/System.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
#include<boost/asio.hpp>
#include<boost/thread.hpp>

using namespace decaf::lang;
using namespace cms;
using namespace decaf::util::concurrent;
class MQProducer //: public Runnable
{
public:
    static void init();
    static MQProducer* instance();
    int sendMessage(const std::string& msg);
private:
    MQProducer(const std::string& brokerURI, bool useTopic = false, bool sessionTransacted = false);
    virtual ~MQProducer();
    void close();
    virtual void run();
    MQProducer(const MQProducer&);
    MQProducer& operator=(const MQProducer&);
    void cleanup();
    void doSendMessage(const string& msg);
    void getHostIP();
    
private:
    Connection* m_pConnection;
    Session* m_pSession;
    Destination* m_pDestination;
    MessageProducer* m_pProducer;
    bool m_bUseTopic;
    bool m_bSessionTransacted;
    std::string m_brokerURI;
    boost::asio::io_service m_ioservice;
    boost::asio::io_service::work m_work;
    std::string m_server;
    static MQProducer* s_pMQProducer;
};

ActiveMQ.cpp

#include <Utils/ActiveMQ.h>

using namespace activemq::core;
using namespace decaf::util;

MQProducer* MQProducer::s_pMQProducer = NULL; 

MQProducer* MQProducer::instance()
{
    return s_pMQProducer;
}

//有點特殊的單子,確保多執行緒前,呼叫了init。
void MQProducer::init()
{
    if (s_pMQProducer == NULL)
    {
        string serverip = "127.0.0.1:16161";
         std::string brokerURI =
            "failover:(tcp://"+ serverip+
            "?wireFormat=openwire"
            //        "&transport.useInactivityMonitor=false"
            //        "&connection.alwaysSyncSend=true"
            "&connection.useAsyncSend=true"
            //        "?transport.commandTracingEnabled=true"
            //        "&transport.tcpTracingEnabled=true"
            //        "&wireFormat.tightEncodingEnabled=true"
            ")";
        s_pMQProducer = new MQProducer(brokerURI, true, false);
    }
}

MQProducer::MQProducer(const std::string& brokerURI, bool useTopic, bool sessionTransacted )
    : m_pConnection(NULL)
    , m_pSession(NULL)
    , m_pDestination(NULL)
    , m_pProducer(NULL)
    , m_bUseTopic(useTopic)
    , m_bSessionTransacted(sessionTransacted)
    , m_brokerURI(brokerURI)
    , m_work(m_ioservice)
    , m_server("")
    {
        activemq::library::ActiveMQCPP::initializeLibrary();
        this->run();
    }

MQProducer::~MQProducer() 
{
    cleanup();
    activemq::library::ActiveMQCPP::shutdownLibrary();
}

void MQProducer::close() 
{
    this->cleanup();
}

void MQProducer::run() 
{
    try 
    {
        // Create a ConnectionFactory
        auto_ptr<ConnectionFactory> connectionFactory(ConnectionFactory::createCMSConnectionFactory(m_brokerURI));
        //ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory(m_brokerURI);

        // Create a Connection
        //connectionFactory->setConnectResponseTimeout(30000);
        m_pConnection = connectionFactory->createConnection();
        m_pConnection->start();

        // Create a Session
        if (this->m_bSessionTransacted) 
        {
            m_pSession = m_pConnection->createSession(Session::SESSION_TRANSACTED);
        }
        else 
        {
            m_pSession = m_pConnection->createSession(Session::AUTO_ACKNOWLEDGE);
        }

        // Create the destination (Topic or Queue)
        if (m_bUseTopic)
        {
            m_pDestination = m_pSession->createTopic("alert");
        }
        else 
        {
            m_pDestination = m_pSession->createQueue("alert");
        }

        // Create a MessageProducer from the Session to the Topic or Queue
        m_pProducer = m_pSession->createProducer(m_pDestination);
        m_pProducer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
    }
    catch (CMSException& e) 
    {
        e.printStackTrace();
    }
}

int MQProducer::sendMessage(const string& msg)
{
    //m_ioservice.post(boost::bind(&utils::MQProducer::doSendMessage, this, msg));
    doSendMessage(msg);
    return 0;
}

void MQProducer::doSendMessage(const string& msg)
{
    try
    {
       //生產者怎麼封裝休息,消費者那裏就要怎麼解析訊息,這裏用的是MapMessage字典方式的訊息
        std::auto_ptr<MapMessage> message(m_pSession->createMapMessage());
        message->setString("Instance", "來自BJ");
        message->setString("Severity", "ERROR");
        string text(msg);
        text = boost::locale::conv::between(text, "UTF-8", "GBK");
        message->setString("Message", text);
        m_pProducer->send(message.get());
    }
    catch (CMSException& e)
    {
        e.printStackTrace();
    }
}

void MQProducer::cleanup()
{
    if (m_pConnection != NULL) 
    {
        try 
        {
            m_pConnection->close();
        }
        catch (cms::CMSException& ex) 
        {
            ex.printStackTrace();
        }
    }

    // Destroy resources.
    try {
        delete m_pDestination;
        m_pDestination = NULL;
        delete m_pProducer;
        m_pProducer = NULL;
        delete m_pSession;
        m_pSession = NULL;
        delete m_pConnection;
        m_pConnection = NULL;
    }
    catch (CMSException& e) 
    {
        e.printStackTrace();
    }
}

大概這樣,未必能編譯過。不過也應該差不多了。
main.cpp

#include <ActiveMQ.h>
int main(int argc, char** argv)
{
    MQProducer::init();
    !MQProducer::instance() ? 0 : utils::MQProducer::instance()->sendMessage("test");
}