隨著集團MHA叢集的日漸增長,MHA管理平臺話越來越迫切。而MHA平臺的建設第一步就是將這些成百上千套的MHA叢集資訊收集起來,便於查詢和管理。
MHA主要資訊如下:
(1)基礎設定資訊;
(2)執行狀態資訊;
(3)啟動及FailOver的log資訊。
集團目前資料庫的管理平臺是在Archery的基礎上打造,所以,需要將此功能嵌入到既有平臺上。通過Archery系統進行查詢展示。
簡單來說,通過 Filebeat + Logstash + MySQL 架構 來收集儲存各個叢集的設定資訊、啟動及FailOver的log資訊 和執行狀態資訊。
執行狀態資訊是通過一個小程式獲取的,這個小程式每分鐘執行一次,會把執行結果輸出到檔案中。當然這個檔案是被failbeat監控的。
檔案為mha_checkstatus.py
#!/usr/bin/python # -*- coding: UTF-8 -*- import os import io import re import ConfigParser Path='/etc/mha' #fout=open('輸出檔名','w') for Name in os.listdir(Path) : Pathname= os.path.join(Path,Name) ## print(Pathname) ## print(Name) config =ConfigParser.ConfigParser() try: config.read(Pathname) server_item = config.sections() server1_host = '' ##MHA cnf 組態檔中的節點1 server2_host = '' ##MHA cnf 組態檔中的節點2 server3_host = '' ##MHA cnf 組態檔中的節點3 mha_cnf_remark = '' if 'server1' in server_item: server1_host = config.get('server1','hostname') else: mha_cnf_remark = mha_cnf_remark + 'Server1未設定;' if 'server2' in server_item: server2_host = config.get('server2','hostname') else: mha_cnf_remark = mha_cnf_remark + 'Server2未設定;' if 'server3' in server_item: server3_host = config.get('server3','hostname') ##print(mha_cnf_remark) except Exception as e: print(e) mha_status_result ='' ###20190330 Name = Name.replace(".cnf", "") ###叢集一主一從 if server1_host <> '' and server2_host <> '' and server3_host == '': cmd_mha_status ='/???/???/bin/masterha_check_status --conf='+Pathname with os.popen(cmd_mha_status) as mha_status: mha_status_result = mha_status.read() if 'running(0:PING_OK)' in mha_status_result: print(Name+':::'+Pathname+':::0:::'+server1_host+':::'+mha_status_result) print(Name+':::'+Pathname+':::0:::'+server2_host+':::'+mha_status_result) if 'stopped(2:NOT_RUNNING)' in mha_status_result: print(Name+':::'+Pathname+':::None:::'+server1_host+':::'+mha_status_result) print(Name+':::'+Pathname+':::None:::'+server2_host+':::'+mha_status_result) ####叢集一主兩從 if server1_host <> '' and server2_host <> '' and server3_host <> '': cmd_mha_status ='/???/???/bin/masterha_check_status --conf='+Pathname with os.popen(cmd_mha_status) as mha_status: mha_status_result = mha_status.read() if 'running(0:PING_OK)' in mha_status_result: print(Name+':::'+Pathname+':::0:::'+server1_host+':::'+mha_status_result) print(Name+':::'+Pathname+':::0:::'+server2_host+':::'+mha_status_result) print(Name+':::'+Pathname+':::0:::'+server3_host+':::'+mha_status_result) if 'stopped(2:NOT_RUNNING)' in mha_status_result: print(Name+':::'+Pathname+':::None:::'+server1_host+':::'+mha_status_result) print(Name+':::'+Pathname+':::None:::'+server2_host+':::'+mha_status_result) print(Name+':::'+Pathname+':::None:::'+server3_host+':::'+mha_status_result)
概況說明,就是到存放MHA設定的資料夾,根據每個叢集的設定檔案,去逐一執行下masterha_check_status,把結果格式化,輸出到指定的檔案中。這個就是每個叢集的狀態資料。通過filebeat實時彙報上去。
觸發的方式可以是crontab,每分鐘執行一次。再本案中是輸出到 /???/checkmhastatus/masterha_check_status.log 中。
形式類似如下:
*/1 * * * * python /???/????/mha_checkstatus.py >> /???/????/masterha_check_status.log
CREATE TABLE `dbmha_status` ( `id` int NOT NULL AUTO_INCREMENT, `host` varchar(100) NOT NULL, `clustername` varchar(200) NOT NULL, `logpath` varchar(500) NOT NULL, `confpath` varchar(500) NOT NULL, `mhstatus` varchar(100) NOT NULL, `serverip` varchar(100) NOT NULL, `info` varchar(2000) NOT NULL, `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集時間', PRIMARY KEY (`id`), KEY `idx_createtime` (`create_time`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `dbmha_log` ( `id` int NOT NULL AUTO_INCREMENT, `host` varchar(100) NOT NULL, `clustername` varchar(200) NOT NULL, `filename` varchar(200) NOT NULL, `logpath` varchar(500) NOT NULL, `message` longtext NOT NULL, `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集時間', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `dbmha_conf_info` ( `id` int NOT NULL AUTO_INCREMENT, `host` varchar(100) NOT NULL, `clustername` varchar(200) NOT NULL DEFAULT '', `confpath` varchar(500) NOT NULL DEFAULT '', `manager_log` varchar(500) NOT NULL DEFAULT '', `manager_workdir` varchar(500) NOT NULL DEFAULT '', `master_binlog_dir` varchar(500) NOT NULL DEFAULT '', `failover_script` varchar(500) NOT NULL DEFAULT '', `online_change_script` varchar(500) NOT NULL DEFAULT '', `password` varchar(128) NOT NULL DEFAULT '', `ping_interval` varchar(100) NOT NULL DEFAULT '', `remote_workdir` varchar(100) NOT NULL DEFAULT '', `repl_password` varchar(128) NOT NULL DEFAULT '', `repl_user` varchar(20) NOT NULL DEFAULT '', `ssh_user` varchar(20) NOT NULL DEFAULT '', `user` varchar(20) NOT NULL DEFAULT '', `serverip1` varchar(100) NOT NULL DEFAULT '', `port1` varchar(10) NOT NULL DEFAULT '', `candidate_master1` varchar(5) NOT NULL DEFAULT '', `check_repl_delay1` varchar(20) NOT NULL DEFAULT '', `serverip2` varchar(100) NOT NULL DEFAULT '', `port2` varchar(10) NOT NULL DEFAULT '', `candidate_master2` varchar(5) NOT NULL DEFAULT '', `check_repl_delay2` varchar(20) NOT NULL DEFAULT '', `serverip3` varchar(100) NOT NULL DEFAULT '', `port3` varchar(10) NOT NULL DEFAULT '', `candidate_master3` varchar(5) NOT NULL DEFAULT '', `check_repl_delay3` varchar(20) NOT NULL DEFAULT '', `info` longtext NOT NULL, `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集時間', PRIMARY KEY (`id`), KEY `idx_createtime` (`create_time`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
.............. - type: log paths: - /???/????/masterha_check_status.log fields: log_type: mha-status db_host: 111.111.XXX.1XX ###這個IP為mha Mnaager所在serverip - type: log paths: - /???/mhaconf/*.cnf fields: log_type: mha-cnf db_host: 111.111.XXX.XXX multiline.type: pattern multiline.pattern: '^\[server [[:space:]] default' multiline.negate: true multiline.match: after - type: log paths: - /???/????/mha/*/*.log fields: log_type: mysql-mha db_host: 111.111.XXX.XXX ................
# Sample Logstash configuration for creating a simple # Beats -> Logstash -> Elasticsearch pipeline. input { beats { port => 5044 } } filter { if [fields][log_type] == "mysql-mha" { grok { match => ["message", "(?m)^%{TIMESTAMP_ISO8601:timestamp} %{BASE10NUM} \[%{WORD:error_level}\] %{GREEDYDATA:error_msg}$"] } grok { match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"} } grok { match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"} } date { match=> ["timestamp", "ISO8601"] remove_field => ["timestamp"] } mutate { copy => { "[log][file][path]" => "logpath" "[fields][db_host]" => "manager_ip" } } mutate { remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"] } } if [fields][log_type] == "mha-cnf" { mutate { split => ["message","server"] add_field => {"message1" => "%{[message][1]}"} add_field => {"messages1" => "%{[message][2]}"} add_field => {"messages2" => "%{[message][3]}"} add_field => {"messages3" => "%{[message][4]}"} add_field => {"dft_password" => "*********"} add_field => {"dft_repl_password" => "*********"} } kv { source => "message1" field_split => "\n" include_keys => ["manager_log", "manager_workdir", "master_binlog_dir", "master_ip_failover_script", "master_ip_online_change_script", "ping_interval", "remote_workdir", "repl_user", "ssh_user", "user" ] prefix => "dft_" remove_char_value => "<>\[\]," } kv { source => "messages1" field_split => "\n" include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ] prefix => "s1_" } kv { source => "messages2" field_split => "\n" default_keys => [ "s2_candidate_master", "", "s2_check_repl_delay", "", "s2_hostname","", "s2_port","" ] include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ] prefix => "s2_" } kv { source => "messages3" field_split => "\n" default_keys => [ "s3_candidate_master", "", "s3_check_repl_delay", "", "s3_hostname","", "s3_port","" ] include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ] prefix => "s3_" } grok { match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"} match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"} } mutate { copy => { "[fields][db_host]" => "manager_ip" } copy => { "[log][file][path]" => "conf_path" } gsub => [ "message", "需要加密的***密***碼", "*********", "message", "需要加密的其他字元", "*********" ] } date { match=> ["timestamp", "ISO8601"] remove_field => ["timestamp"] } mutate { remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"] } } if [fields][log_type] == "mha-status" { mutate { split => ["message",":::"] add_field => {"cluster_name" => "%{[message][0]}"} add_field => {"conf_path" => "%{[message][1]}"} add_field => {"masterha_check_status" => "%{[message][2]}"} add_field => {"server" => "%{[message][3]}"} add_field => {"info" => "%{[message][4]}"} } grok { match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"} } mutate { copy => { "[fields][db_host]" => "manager_ip" } } date { match=> ["timestamp", "ISO8601"] remove_field => ["timestamp"] } mutate { remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"] } } } output { if [fields][log_type] == "mysql-mha" { jdbc { driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar" driver_class => "com.mysql.jdbc.Driver" connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????" statement => ["INSERT INTO dbmha_log (host,clustername,filename,logpath, message) VALUES(?, ?, ?, ?, ?)","%{manager_ip}","%{product}","%{filename}","%{logpath}","%{message}"] } } if [fields][log_type] == "mha-status" { jdbc { driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar" driver_class => "com.mysql.jdbc.Driver" connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????" statement => ["INSERT INTO dbmha_status (host,clustername,logpath,confpath,mhstatus,serverip,info) VALUES(?, ?, ?, ?, ?, ?, ?)","%{manager_ip}","%{cluster_name}","%{filename}","%{conf_path}","%{masterha_check_status}","%{server}","%{info}"] } } if [fields][log_type] == "mha-cnf" { jdbc { driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar" driver_class => "com.mysql.jdbc.Driver" connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????" statement => ["INSERT INTO dbmha_conf_info (host,clustername,confpath,manager_log,manager_workdir,master_binlog_dir,failover_script,online_change_script,password,ping_interval,remote_workdir,repl_password,repl_user,ssh_user,user,serverip1,port1,candidate_master1,check_repl_delay1,serverip2,port2,candidate_master2,check_repl_delay2,serverip3,port3,candidate_master3,check_repl_delay3,info) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)","%{manager_ip}","%{product}","%{conf_path}","%{dft_manager_log}","%{dft_manager_workdir}","%{dft_master_binlog_dir}","%{dft_master_ip_failover_script}","%{dft_master_ip_online_change_script}","%{dft_password}","%{dft_ping_interval}","%{dft_remote_workdir}","%{dft_repl_password}","%{dft_repl_user}","%{dft_ssh_user}","%{dft_user}","%{s1_hostname}","%{s1_port}","%{s1_candidate_master}","%{s1_check_repl_delay}","%{s2_hostname}","%{s2_port}","%{s2_candidate_master}","%{s2_check_repl_delay}","%{s3_hostname}","%{s3_port}","%{s3_candidate_master}","%{s3_check_repl_delay}","%{message}"] } } }
這個設定還是相對複雜難懂的。這個檔案設定了對三種檔案的讀取,我們就看讀取mha組態檔的部分【[fields][log_type] == "mha-cnf"】,我們挑其中的幾個點說下,更多的內容可參照logstash官網--https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
首先,我們是 「server」 關鍵字,把檔案中的設定資訊,分割成不同的部分。
接著,因為組態檔的格式是 key=value的樣式,所以需要藉助 kv{},其中的引數說下:field_split---定義欄位間的分隔符;include_keys--定義唯讀去規定的特定key;prefix---格式化欄位名字,加個字首名字,主要是用來區分server 1 部分和 server2、、、之間的分別。
通過【match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}】,獲取product欄位,我們是通過mha的組態檔的名字來定義叢集的名字,即規範了mha組態檔的名字的命名來自於叢集的名字,反推得知了組態檔的名字,就知道了叢集的名字。【match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}】這個地方的filename包含了檔案的字尾。
我們是把此專案嵌入到既有的Archery平臺中,增加了3個查詢介面,介面的實現,在此就不具體展開了。
需要注意的是,介面需要支援模糊查詢,例如支援MHA Manager Server IP查詢(方便查詢各個Manager節點上有多少叢集);支援叢集名字的模糊查詢;支援節點serverIP的模糊查詢。
Q.1 為什麼用MySQL儲存資訊,ELK是更成熟的架構啊?
是的,用elasticsearch來儲存這種文字資訊更常見。我們用MySQL替代elasticsearch基於以下考慮:(1)我們既有的管理平臺使用的是MySQL,把他們儲存到MySQL 便於整合;(2)這些資料,不僅僅是Log,還有些是基礎資料,放到MySQL便於相互管理、聚合展示(3)這是資料量並不大,例如mha.log,只有在啟動或者failover時才有變化,conf資訊也是很少的,所以,從資料量也一點考慮,也不需要儲存到MySQL。
Q.2 Logstash 可以把資料寫入到MySql中嗎?
是可以的。主要是logstash-output-jdbc、logstash-codec-plain外掛的安裝。
如果是離線的環境下安裝,可以參考 《logstash 離線安裝logstash-output-jdbc》
https://blog.csdn.net/sxw1065430201/article/details/123663108
Q.3 MHA log 資料夾中 原有一個 .health ,裡面是MHA每分鐘的健康性報告,那為什麼還要自己寫Python程式獲取呢?
因為.healthy 的內容不是換行符結尾,而filebeat是以換行符來判斷的(https://www.elastic.co/guide/en/beats/filebeat/7.4/newline-character-required-eof.html 有詳細說明)。
簡單來說,filebeat讀取不了。
Q.4 MHA 健康性檢查的原理
具體的原理可以參考此文章的分析說明--《mha檢測mysql狀況方式》
https://blog.csdn.net/weixin_35411438/article/details/113455263
Q.5 歷史資料的刪除
mha log 資訊表 dbmha_log
MHA 基礎設定表 dbmha_conf_info
以上兩張表基本上很少變化,量不大,其資料無需定期刪除。
執行狀態表 dbmha_status,此表每個叢集每分鐘(具體crontab定義)都會有新資料插入,資料量增長較大,應設定定時任務,定期刪除歷史資料,例如刪除7天前的資料。