Collecting MySQL MHA Information [Filebeat + Logstash + MySQL]

2023-04-16 06:00:19

1. Project Background

As the group's MHA clusters keep growing, an MHA management platform has become increasingly urgent. The first step in building that platform is to gather the information of these hundreds of MHA clusters in one place so it can be queried and managed.

The main MHA information is:

(1) base configuration information;

(2) running-status information;

(3) startup and failover log information.

The group's current database management platform is built on Archery, so this functionality needs to be embedded into the existing platform and queried and displayed through Archery.

2. Architecture

In short, a Filebeat + Logstash + MySQL pipeline collects and stores each cluster's configuration information, startup and failover logs, and running-status information.

The running-status information is produced by a small program that runs once a minute and writes its result to a file; that file is, of course, monitored by Filebeat.

3. Implementation

3.1 The script that collects MHA status

The file is mha_checkstatus.py:

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os
import ConfigParser  # Python 2; on Python 3 use "import configparser"

Path = '/etc/mha'  # directory holding one cnf file per MHA cluster

for Name in os.listdir(Path):
    Pathname = os.path.join(Path, Name)
    config = ConfigParser.ConfigParser()
    server1_host = ''  # node 1 in the MHA cnf file
    server2_host = ''  # node 2 in the MHA cnf file
    server3_host = ''  # node 3 in the MHA cnf file
    mha_cnf_remark = ''
    try:
        config.read(Pathname)
        server_item = config.sections()
        if 'server1' in server_item:
            server1_host = config.get('server1', 'hostname')
        else:
            mha_cnf_remark += 'Server1 not configured;'
        if 'server2' in server_item:
            server2_host = config.get('server2', 'hostname')
        else:
            mha_cnf_remark += 'Server2 not configured;'
        if 'server3' in server_item:
            server3_host = config.get('server3', 'hostname')
    except Exception as e:
        print(e)

    mha_status_result = ''
    Name = Name.replace(".cnf", "")  # the cnf file is named after the cluster

    # cluster with one master and one slave
    if server1_host != '' and server2_host != '' and server3_host == '':
        cmd_mha_status = '/???/???/bin/masterha_check_status --conf=' + Pathname
        with os.popen(cmd_mha_status) as mha_status:
            mha_status_result = mha_status.read()
        if 'running(0:PING_OK)' in mha_status_result:
            print(Name + ':::' + Pathname + ':::0:::' + server1_host + ':::' + mha_status_result)
            print(Name + ':::' + Pathname + ':::0:::' + server2_host + ':::' + mha_status_result)
        if 'stopped(2:NOT_RUNNING)' in mha_status_result:
            print(Name + ':::' + Pathname + ':::None:::' + server1_host + ':::' + mha_status_result)
            print(Name + ':::' + Pathname + ':::None:::' + server2_host + ':::' + mha_status_result)

    # cluster with one master and two slaves
    if server1_host != '' and server2_host != '' and server3_host != '':
        cmd_mha_status = '/???/???/bin/masterha_check_status --conf=' + Pathname
        with os.popen(cmd_mha_status) as mha_status:
            mha_status_result = mha_status.read()
        if 'running(0:PING_OK)' in mha_status_result:
            print(Name + ':::' + Pathname + ':::0:::' + server1_host + ':::' + mha_status_result)
            print(Name + ':::' + Pathname + ':::0:::' + server2_host + ':::' + mha_status_result)
            print(Name + ':::' + Pathname + ':::0:::' + server3_host + ':::' + mha_status_result)
        if 'stopped(2:NOT_RUNNING)' in mha_status_result:
            print(Name + ':::' + Pathname + ':::None:::' + server1_host + ':::' + mha_status_result)
            print(Name + ':::' + Pathname + ':::None:::' + server2_host + ':::' + mha_status_result)
            print(Name + ':::' + Pathname + ':::None:::' + server3_host + ':::' + mha_status_result)

In outline: go to the directory holding the MHA configuration files, run masterha_check_status against each cluster's configuration one by one, format the result, and write it to the designated file. That file holds each cluster's status data, which Filebeat ships upward in near real time.

The trigger can be crontab, running once per minute. In this case the output goes to /???/checkmhastatus/masterha_check_status.log.

Something like this:

*/1 * * * * python /???/????/mha_checkstatus.py >> /???/????/masterha_check_status.log
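
For reference, one line of that status output looks roughly like this (the cluster name and IPs are hypothetical; the five ':::'-separated fields are cluster name, conf path, status code, node IP, and the raw masterha_check_status output, which is exactly what the Logstash mha-status filter splits on later):

cluster01:::/etc/mha/cluster01.cnf:::0:::10.0.0.11:::cluster01 (pid:21536) is running(0:PING_OK), master:10.0.0.11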

3.2 Table design and DDL

3.2.1 Running-status table dbmha_status

CREATE TABLE `dbmha_status` (
  `id` int NOT NULL AUTO_INCREMENT,
  `host` varchar(100) NOT NULL,
  `clustername` varchar(200) NOT NULL,
  `logpath` varchar(500) NOT NULL,
  `confpath` varchar(500) NOT NULL,
  `mhstatus` varchar(100) NOT NULL,
  `serverip` varchar(100) NOT NULL,
  `info` varchar(2000) NOT NULL,
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'collection time',
  PRIMARY KEY (`id`),
  KEY `idx_createtime` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3.2.2 MHA log table dbmha_log

CREATE TABLE `dbmha_log` (
  `id` int NOT NULL AUTO_INCREMENT,
  `host` varchar(100) NOT NULL,
  `clustername` varchar(200) NOT NULL,
  `filename` varchar(200) NOT NULL,
  `logpath` varchar(500) NOT NULL,
  `message` longtext NOT NULL,
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'collection time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3.2.3 MHA base configuration table dbmha_conf_info

CREATE TABLE `dbmha_conf_info` (
  `id` int NOT NULL AUTO_INCREMENT,
  `host` varchar(100) NOT NULL,
  `clustername` varchar(200) NOT NULL DEFAULT '',
  `confpath` varchar(500) NOT NULL DEFAULT '',
  `manager_log` varchar(500) NOT NULL DEFAULT '',
  `manager_workdir` varchar(500) NOT NULL DEFAULT '',
  `master_binlog_dir` varchar(500) NOT NULL DEFAULT '',
  `failover_script` varchar(500) NOT NULL DEFAULT '',
  `online_change_script` varchar(500) NOT NULL DEFAULT '',
  `password` varchar(128) NOT NULL DEFAULT '',
  `ping_interval` varchar(100) NOT NULL DEFAULT '',
  `remote_workdir` varchar(100) NOT NULL DEFAULT '',
  `repl_password` varchar(128) NOT NULL DEFAULT '',
  `repl_user` varchar(20) NOT NULL DEFAULT '',
  `ssh_user` varchar(20) NOT NULL DEFAULT '',
  `user` varchar(20) NOT NULL DEFAULT '',
  `serverip1` varchar(100) NOT NULL DEFAULT '',
  `port1` varchar(10) NOT NULL DEFAULT '',
  `candidate_master1` varchar(5) NOT NULL DEFAULT '',
  `check_repl_delay1` varchar(20) NOT NULL DEFAULT '',
  `serverip2` varchar(100) NOT NULL DEFAULT '',
  `port2` varchar(10) NOT NULL DEFAULT '',
  `candidate_master2` varchar(5) NOT NULL DEFAULT '',
  `check_repl_delay2` varchar(20) NOT NULL DEFAULT '',
  `serverip3` varchar(100) NOT NULL DEFAULT '',
  `port3` varchar(10) NOT NULL DEFAULT '',
  `candidate_master3` varchar(5) NOT NULL DEFAULT '',
  `check_repl_delay3` varchar(20) NOT NULL DEFAULT '',
  `info` longtext NOT NULL,
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'collection time',
  PRIMARY KEY (`id`),
  KEY `idx_createtime` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3.3 Filebeat settings for reading the files

..............
- type: log
  paths:
    - /???/????/masterha_check_status.log
  fields:
    log_type: mha-status
    db_host: 111.111.XXX.1XX    ### the IP of the server where the MHA Manager runs

- type: log
  paths:
    - /???/mhaconf/*.cnf
  fields:
    log_type: mha-cnf
    db_host: 111.111.XXX.XXX
  multiline.type: pattern
  multiline.pattern: '^\[server[[:space:]]default'
  multiline.negate: true
  multiline.match: after


- type: log
  paths:
    - /???/????/mha/*/*.log
  fields:
    log_type: mysql-mha
    db_host: 111.111.XXX.XXX
................
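
The multiline settings are what make Filebeat ship each cnf file as a single event: a line matching '^\[server[[:space:]]default' starts a new event, and every line that does not match (negate: true) is appended after it. So a hypothetical cluster file such as:

[server default]
manager_log=/data/mha/cluster01/manager.log
user=mha
ping_interval=3
repl_user=repl

[server1]
hostname=10.0.0.11
port=3306

[server2]
hostname=10.0.0.12
port=3306

arrives at Logstash as one message, which is what the mha-cnf filter in 3.4 expects.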

3.4 Logstash configuration

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
  }
}

filter {

    if [fields][log_type] == "mysql-mha" {
        grok {
            match => ["message", "(?m)^%{TIMESTAMP_ISO8601:timestamp} %{BASE10NUM} \[%{WORD:error_level}\] %{GREEDYDATA:error_msg}$"]
        }
        grok {
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}
        }
        grok {
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
        }
        date {
            match=> ["timestamp", "ISO8601"]
            remove_field => ["timestamp"]
        }
        mutate {
            copy => { "[log][file][path]" => "logpath"
                      "[fields][db_host]" => "manager_ip" }
        }
        mutate {
            remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
        }
    }


    if [fields][log_type] == "mha-cnf" {
        mutate {
        split => ["message","server"]
        add_field => {"message1" => "%{[message][1]}"}
        add_field => {"messages1" => "%{[message][2]}"}
        add_field => {"messages2" => "%{[message][3]}"}
        add_field => {"messages3" => "%{[message][4]}"}
        add_field => {"dft_password" => "*********"}
        add_field => {"dft_repl_password" => "*********"}
        }
        kv {
            source => "message1"
            field_split => "\n"
            include_keys => ["manager_log", "manager_workdir", "master_binlog_dir", "master_ip_failover_script", "master_ip_online_change_script", "ping_interval", "remote_workdir", "repl_user", "ssh_user", "user" ]
            prefix => "dft_"
            remove_char_value => "<>\[\],"
        }
        kv {
            source => "messages1"
            field_split => "\n"
            include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
            prefix => "s1_"
        }
        kv {
            source => "messages2"
            field_split => "\n"
            default_keys => [ "s2_candidate_master", "",
                              "s2_check_repl_delay", "",
                              "s2_hostname", "",
                              "s2_port", "" ]
            include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
            prefix => "s2_"
        }
        kv {
            source => "messages3"
            field_split => "\n"
            default_keys => [ "s3_candidate_master", "",
                              "s3_check_repl_delay", "",
                              "s3_hostname", "",
                              "s3_port", "" ]
            include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
            prefix => "s3_"
        }
        grok {
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
        }
        mutate {
            copy => { "[fields][db_host]" => "manager_ip" }
            copy => { "[log][file][path]" => "conf_path" }
            gsub => [
                "message", "password-string-that-needs-masking", "*********",
                "message", "other-strings-that-need-masking", "*********"
            ]
        }
        date {
            match=> ["timestamp", "ISO8601"]
            remove_field => ["timestamp"]
        }
        mutate {
            remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
        }
    }

    if [fields][log_type] == "mha-status" {
       mutate {
        split => ["message",":::"]
        add_field => {"cluster_name" => "%{[message][0]}"}
        add_field => {"conf_path" => "%{[message][1]}"}
        add_field => {"masterha_check_status" => "%{[message][2]}"}
        add_field => {"server" => "%{[message][3]}"}
        add_field => {"info" => "%{[message][4]}"}
         }

        grok {
            match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
        }
        mutate {
             copy => { "[fields][db_host]" => "manager_ip" }
        }
        date {
            match=> ["timestamp", "ISO8601"]
            remove_field => ["timestamp"]
        }
        mutate {
            remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
        }
    }

}


output {
    if [fields][log_type] == "mysql-mha" {
      jdbc {
           driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
           driver_class => "com.mysql.jdbc.Driver"
           connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
           statement => ["INSERT INTO dbmha_log (host,clustername,filename,logpath, message) VALUES(?, ?, ?, ?, ?)","%{manager_ip}","%{product}","%{filename}","%{logpath}","%{message}"]
       }
    }

    if [fields][log_type] == "mha-status" {
      jdbc {
           driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
           driver_class => "com.mysql.jdbc.Driver"
           connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
           statement => ["INSERT INTO dbmha_status (host,clustername,logpath,confpath,mhstatus,serverip,info) VALUES(?, ?, ?, ?, ?, ?, ?)","%{manager_ip}","%{cluster_name}","%{filename}","%{conf_path}","%{masterha_check_status}","%{server}","%{info}"]
       }
   }
    if [fields][log_type] == "mha-cnf" {
      jdbc {
           driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
           driver_class => "com.mysql.jdbc.Driver"
           connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
           statement => ["INSERT INTO dbmha_conf_info (host,clustername,confpath,manager_log,manager_workdir,master_binlog_dir,failover_script,online_change_script,password,ping_interval,remote_workdir,repl_password,repl_user,ssh_user,user,serverip1,port1,candidate_master1,check_repl_delay1,serverip2,port2,candidate_master2,check_repl_delay2,serverip3,port3,candidate_master3,check_repl_delay3,info) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)","%{manager_ip}","%{product}","%{conf_path}","%{dft_manager_log}","%{dft_manager_workdir}","%{dft_master_binlog_dir}","%{dft_master_ip_failover_script}","%{dft_master_ip_online_change_script}","%{dft_password}","%{dft_ping_interval}","%{dft_remote_workdir}","%{dft_repl_password}","%{dft_repl_user}","%{dft_ssh_user}","%{dft_user}","%{s1_hostname}","%{s1_port}","%{s1_candidate_master}","%{s1_check_repl_delay}","%{s2_hostname}","%{s2_port}","%{s2_candidate_master}","%{s2_check_repl_delay}","%{s3_hostname}","%{s3_port}","%{s3_candidate_master}","%{s3_check_repl_delay}","%{message}"]
       }
   }

}

This configuration is relatively complex. It covers reading all three kinds of files; let's take the part that reads the MHA configuration files ([fields][log_type] == "mha-cnf") and walk through a few points. For more detail, see the Logstash filter-plugin documentation: https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

First, using the "server" keyword, we split the configuration content of the file into its separate parts.

Next, because the configuration file is in key=value form, we rely on kv{}. About its parameters: field_split defines the separator between entries; include_keys restricts parsing to the listed keys; prefix prepends a string to each resulting field name, mainly to tell the server1 fields apart from those of server2 and server3. An illustration follows.
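
Taking the hypothetical cnf event shown in 3.3: the split on "server" leaves the [server default] body in message1 and the server1/server2 bodies in messages1/messages2, and the kv filters then emit fields roughly like:

dft_manager_log=/data/mha/cluster01/manager.log
dft_user=mha
dft_ping_interval=3
dft_repl_user=repl
s1_hostname=10.0.0.11
s1_port=3306
s2_hostname=10.0.0.12
s2_port=3306

These dft_/s1_/s2_/s3_ fields are exactly what the dbmha_conf_info INSERT in the output section consumes.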

Through [match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}] we obtain the product field. We derive the cluster name from the MHA configuration file name: the naming convention says a cnf file is named after its cluster, so recovering the file name gives us the cluster name. In [match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}], the captured filename includes the file suffix.
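
Concretely, for a hypothetical manager log at /data/mha/cluster01/manager.log, the two patterns capture:

.*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*    =>  product  = cluster01
.*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)   =>  filename = manager.log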

4. Platform Frontend

We embedded this project into the existing Archery platform, adding three query pages; the implementation of the pages is not covered here.

Note that the pages need to support fuzzy matching: query by MHA Manager server IP (handy for seeing how many clusters sit on each Manager node), fuzzy query by cluster name, and fuzzy query by node server IP, as sketched below.
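
A minimal sketch of the kind of query behind such a page, against the tables defined in 3.2 (the Manager IP and the LIKE pattern are illustrative):

-- recent status rows on one Manager node, fuzzy-matched by cluster name
SELECT clustername, serverip, mhstatus, create_time
FROM dbmha_status
WHERE host = '111.111.XXX.XXX'
  AND clustername LIKE '%order%'
ORDER BY create_time DESC
LIMIT 50;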

5. Additional Notes

Q.1 Why use MySQL to store the information when ELK is the more mature architecture for this?

It is; Elasticsearch is the more common store for this kind of text. We chose MySQL over Elasticsearch for these reasons: (1) our existing management platform already runs on MySQL, so storing the data there makes integration easy; (2) this is not only log data, some of it is base configuration data, and keeping it in MySQL makes it easy to manage and aggregate with other tables for display; (3) the data volume is small: mha.log only changes at startup or failover, and the conf data is tiny, so from a capacity standpoint Elasticsearch isn't needed either.

Q.2 Can Logstash write data into MySQL?

Yes. It mainly comes down to installing the logstash-output-jdbc and logstash-codec-plain plugins.
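
With Internet access, both can be installed with Logstash's bundled plugin manager, e.g.:

/???/???/logstash-7.6.0/bin/logstash-plugin install logstash-output-jdbc
/???/???/logstash-7.6.0/bin/logstash-plugin install logstash-codec-plain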

For installation in an offline environment, see 《logstash 離線安裝logstash-output-jdbc》:

https://blog.csdn.net/sxw1065430201/article/details/123663108

Q.3 The MHA log directory already has a .health file containing MHA's per-minute health report, so why write our own Python program to collect the status?

Because the content of .health does not end with a newline, and Filebeat relies on the newline character to decide that a line is complete (see https://www.elastic.co/guide/en/beats/filebeat/7.4/newline-character-required-eof.html for details).

Simply put, Filebeat cannot read it.

Q.4 How MHA's health check works

For the underlying principle, see the analysis in 《mha檢測mysql狀況方式》:

https://blog.csdn.net/weixin_35411438/article/details/113455263

Q.5 Purging historical data

The MHA log table dbmha_log and the MHA base configuration table dbmha_conf_info barely change and stay small, so their data needs no periodic deletion.

The running-status table dbmha_status, by contrast, receives new rows for every cluster every minute (per the crontab schedule) and grows quickly; set up a scheduled job that periodically deletes history, e.g. rows older than 7 days.
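
A minimal purge sketch, assuming a 7-day retention and a crontab trigger like the collector's; deleting in batches keeps each transaction small (the LIMIT is illustrative), so repeat the statement until it affects 0 rows:

-- purge dbmha_status rows older than 7 days, one batch at a time
DELETE FROM dbmha_status
WHERE create_time < NOW() - INTERVAL 7 DAY
LIMIT 10000;

The idx_createtime index defined in 3.2.1 keeps this range delete cheap.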