前言:
大概一年多前寫過一個部署ELK系列的部落格文章,前不久剛好在部署一個ELK的解決方案,我順便就把一些基礎的部分拎出來,再整合成一期文章。大概內容包括:搭建ELK叢集,以及寫一個簡單的MQ服務。
如果需要看一年多之前寫的文章,可以詳見下列文章連結(例如部署成Windows服務、設定瀏覽器外掛、logstash接收消費者資料等,該篇文章不再重複描述,可以點選下方連結自行參考):
安裝包目錄
將有關環境解壓備用。如圖:
其中,這個包是我寫的一個比較通用的基於DIRECT模式的簡易版MQ生產者和消費者基礎功能服務,也可以直接拿來做MQ的業務對接開發。
然後,在program.cs裡面,新增對WeskyMqService的註冊,裡面註冊了基礎的生產者和消費者服務。
咱新增一個消費者訊息消費的方法ConsumeMessage以及有關interface介面,該方法後面用來當作回撥函數使用,用來傳給MQ消費者,當監聽到訊息以後,會進入到該方法裡面。
再然後,回到program裡面,對剛才新增的方法進行依賴注入的註冊;以及在裡面做一個簡單的MQ佇列的設定資訊。
其中,RabbitMqOptions建構函式帶有兩個引數,分別是
RabbitMqConfiguration:它傳入MQ的連線
RabbitMqMessageQueues[]:路由鍵以及對應的佇列名稱陣列,有多少個佇列,陣列元素就是多少個一一對應。
以上建立了基礎的連線,通過direct模式進行訊息訂閱和釋出;並且建立兩個佇列,分別是Test1和Test2
再然後,新建一個API控制器,並提供有關依賴注入進行實現的驗證。
為了方便測試,新增兩個api介面,用來檢測MQ的連線以及訊息釋出和訂閱。
其中,index此處的作用是咱們設定的訊息佇列的佇列陣列裡面的下標。
連線引數isActive代表是否消費者消費訊息,如果不消費訊息,那麼就可以用來給我們上面的logstash來消費了;如果消費訊息,那麼訊息就會進入到我們剛才建立的ConsumeMessage方法裡面去,因為它通過回撥函數引數丟進去了:
開啟MQ面板,可以看到現在是都沒有佇列存在的。
此時啟動程式,先呼叫連線的API(API裡面,包括了生產者連線和消費者連線,平常如果只需要連線一個,也是可以遮蔽其他的,例如與第三方MQ做資訊通訊的話),再呼叫釋出訊息的API,檢視效果。咱先做一個不消費(允許消費引數設為false)的例子:
然後再檢視MQ面板,可以看到多了兩個佇列,這兩個佇列就是程式碼裡面連線以後自動建立的:
呼叫生產者,釋出一條訊息,例如發給陣列的第一個佇列一條訊息:
因為不允許消費,所以訊息會一直在佇列裡面沒有被消費掉:
我們先關閉api程式,然後重新做個連線,做成允許消費的,看下效果:
再切換到MQ面板,可以看到訊息立馬被消費了:
我們剛才寫的一個輸出控制檯的消費訊息的業務方法,此時也被執行了:
以上說明MQ的生產者與消費者服務是OK的了。然後就可以與上面的EKL叢集進行配合使用,例如你的程式需要通過MQ的方式給Logstash傳送訊息,那麼就可以使用傳入不啟用MQ使用者端消費的功能來實現;如果需要與其他生產者對接,或者需要做MQ訊息消費的業務,就可以通過類似方式,寫一個回撥函數當作引數丟進MQ消費者服務裡面去即可。
最後,ELK上面我沒做其他的優化,大佬們感興趣可以自己優化,例如資訊的壓縮、定時刪除等等,這些都可以在Kibana的管理介面裡面進行設定。
如果需要以上的環境全套資源、以及後面的MQ的例子,可以掃碼關注以下公眾號,或者搜尋【Dotnet Dancer】關注公眾號,傳送【ELKQ】即可獲取。