ActiveMQ是Apache軟體基金會所研發的開放原始碼訊息中介軟體;由於ActiveMQ是一個純Java程式,因此只需要操作系統支援Java虛擬機器,ActiveMQ便可執行。
支援的程式語言包括:C、C++、C#、Delphi、Erlang、Adobe Flash、Haskell、Java、JavaScript、Perl、PHP、Pike、Python和Ruby。
通過ActiveMQ能實現訊息的網路通訊。生產者生產推動訊息到ActiveMQ服務,消費者連線ActiveMQ實現訊息的接收。
主要有Queues和Topics兩種訊息型別。Queues是一對一的模式,一個生產者對應一個消費者,訊息被消費了就沒有了。 Topics是多對對的模式,多個生產者,多個消費者。
http://activemq.apache.org/
有經典版和高效能版。我使用的是經典版。
下載好後,解壓,雙擊bin下的win64下的activemq.bat啓動服務。
可用瀏覽器登錄ActiveMQ的管理端,可以檢視訊息和連線等情況。如有賬號和密碼都是admin吧。
http://127.0.0.1:8161/admin
修改組態檔,實現訊息的恢復,當消費者用戶端重新啓動後能恢復歷史訊息。
ActiveMQ服務起來後,現在就是實現訊息的生產者和消費者了。
下載activemq-cpp對應庫程式碼並編譯:
http://www.apache.org/dyn/closer.lua/activemq/activemq-cpp/
下載環境工具庫,否則activemq-cpp編譯不過的:
http://apr.apache.org/download.cgi
網上別的人說要好幾個環境庫,我就只用了apr。
本人使用的是vs2019編譯,它原來工程預設的是2010,需要升級下工程屬性中的,常規,平臺工具集,到v142。
如報錯找不到標頭檔案apr_pools.h的話,需要把apr庫中的標頭檔案資料夾include包含到工程中。
修改工程屬性後應該就能編譯過了:
可能會報錯一大推編譯錯誤錯誤,參考這篇文章,https://blog.csdn.net/tiger_xs/article/details/100038183,需要在include資料夾下找到apr.hw檔案,找到宏定義「#define _WIN32_WINNT 0x0501」,將0x0501改爲0x0600。
找不到標頭檔案的加包含目錄,找不到libapr-1.lib庫的,把這個庫複製到這裏:
然後,在ActiveMQ服務啓動了的情況下,這個例子就能啓動啦。
這個工程的main.cpp給了生產者和消費者的例子程式碼,它是起了兩個執行緒程。可以通過這個例子根據自己的需要修改成自己的工程。
下面 下麪是我通過改動實現消費者的單例模式,這樣就可以在專案中各個地方呼叫,實現訊息的傳輸。
爲了防止阻塞,實際情況發訊息的時候是轉了執行緒的。這裏註釋掉了。
ActiveMQ.h
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/lang/System.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
#include<boost/asio.hpp>
#include<boost/thread.hpp>
using namespace decaf::lang;
using namespace cms;
using namespace decaf::util::concurrent;
class MQProducer //: public Runnable
{
public:
static void init();
static MQProducer* instance();
int sendMessage(const std::string& msg);
private:
MQProducer(const std::string& brokerURI, bool useTopic = false, bool sessionTransacted = false);
virtual ~MQProducer();
void close();
virtual void run();
MQProducer(const MQProducer&);
MQProducer& operator=(const MQProducer&);
void cleanup();
void doSendMessage(const string& msg);
void getHostIP();
private:
Connection* m_pConnection;
Session* m_pSession;
Destination* m_pDestination;
MessageProducer* m_pProducer;
bool m_bUseTopic;
bool m_bSessionTransacted;
std::string m_brokerURI;
boost::asio::io_service m_ioservice;
boost::asio::io_service::work m_work;
std::string m_server;
static MQProducer* s_pMQProducer;
};
ActiveMQ.cpp
#include <Utils/ActiveMQ.h>
using namespace activemq::core;
using namespace decaf::util;
MQProducer* MQProducer::s_pMQProducer = NULL;
MQProducer* MQProducer::instance()
{
return s_pMQProducer;
}
//有點特殊的單子,確保多執行緒前,呼叫了init。
void MQProducer::init()
{
if (s_pMQProducer == NULL)
{
string serverip = "127.0.0.1:16161";
std::string brokerURI =
"failover:(tcp://"+ serverip+
"?wireFormat=openwire"
// "&transport.useInactivityMonitor=false"
// "&connection.alwaysSyncSend=true"
"&connection.useAsyncSend=true"
// "?transport.commandTracingEnabled=true"
// "&transport.tcpTracingEnabled=true"
// "&wireFormat.tightEncodingEnabled=true"
")";
s_pMQProducer = new MQProducer(brokerURI, true, false);
}
}
MQProducer::MQProducer(const std::string& brokerURI, bool useTopic, bool sessionTransacted )
: m_pConnection(NULL)
, m_pSession(NULL)
, m_pDestination(NULL)
, m_pProducer(NULL)
, m_bUseTopic(useTopic)
, m_bSessionTransacted(sessionTransacted)
, m_brokerURI(brokerURI)
, m_work(m_ioservice)
, m_server("")
{
activemq::library::ActiveMQCPP::initializeLibrary();
this->run();
}
MQProducer::~MQProducer()
{
cleanup();
activemq::library::ActiveMQCPP::shutdownLibrary();
}
void MQProducer::close()
{
this->cleanup();
}
void MQProducer::run()
{
try
{
// Create a ConnectionFactory
auto_ptr<ConnectionFactory> connectionFactory(ConnectionFactory::createCMSConnectionFactory(m_brokerURI));
//ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory(m_brokerURI);
// Create a Connection
//connectionFactory->setConnectResponseTimeout(30000);
m_pConnection = connectionFactory->createConnection();
m_pConnection->start();
// Create a Session
if (this->m_bSessionTransacted)
{
m_pSession = m_pConnection->createSession(Session::SESSION_TRANSACTED);
}
else
{
m_pSession = m_pConnection->createSession(Session::AUTO_ACKNOWLEDGE);
}
// Create the destination (Topic or Queue)
if (m_bUseTopic)
{
m_pDestination = m_pSession->createTopic("alert");
}
else
{
m_pDestination = m_pSession->createQueue("alert");
}
// Create a MessageProducer from the Session to the Topic or Queue
m_pProducer = m_pSession->createProducer(m_pDestination);
m_pProducer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
}
catch (CMSException& e)
{
e.printStackTrace();
}
}
int MQProducer::sendMessage(const string& msg)
{
//m_ioservice.post(boost::bind(&utils::MQProducer::doSendMessage, this, msg));
doSendMessage(msg);
return 0;
}
void MQProducer::doSendMessage(const string& msg)
{
try
{
//生產者怎麼封裝休息,消費者那裏就要怎麼解析訊息,這裏用的是MapMessage字典方式的訊息
std::auto_ptr<MapMessage> message(m_pSession->createMapMessage());
message->setString("Instance", "來自BJ");
message->setString("Severity", "ERROR");
string text(msg);
text = boost::locale::conv::between(text, "UTF-8", "GBK");
message->setString("Message", text);
m_pProducer->send(message.get());
}
catch (CMSException& e)
{
e.printStackTrace();
}
}
void MQProducer::cleanup()
{
if (m_pConnection != NULL)
{
try
{
m_pConnection->close();
}
catch (cms::CMSException& ex)
{
ex.printStackTrace();
}
}
// Destroy resources.
try {
delete m_pDestination;
m_pDestination = NULL;
delete m_pProducer;
m_pProducer = NULL;
delete m_pSession;
m_pSession = NULL;
delete m_pConnection;
m_pConnection = NULL;
}
catch (CMSException& e)
{
e.printStackTrace();
}
}
大概這樣,未必能編譯過。不過也應該差不多了。
main.cpp
#include <ActiveMQ.h>
int main(int argc, char** argv)
{
MQProducer::init();
!MQProducer::instance() ? 0 : utils::MQProducer::instance()->sendMessage("test");
}