作者:謝澤華
眾所周知單個機房在出現不可抗拒的問題(如斷電、斷網等因素)時,會導致無法正常提供服務,會對業務造成潛在的損失。所以在協同辦公領域,一種可以基於同城或異地多活機制的高可用設計,在保障資料一致性的同時,能夠最大程度降低由於機房的僅單點可用所導致的潛在高可用問題,最大程度上保障業務的使用者體驗,降低單點問題對業務造成的潛在損失顯得尤為重要。
同城雙活,對於生產的高可用保障,重大的意義和價值是不可言喻的。表面上同城雙活只是簡單的部署了一套生產環境而已,但是在架構上,這個改變的影響是巨大的,無狀態應用的高可用管理、請求流量的管理、版本釋出的管理、網路架構的管理等,其提升的架構複雜度巨大。
結合真實的協同辦公產品:京辦(為北京市政府提供協同辦公服務的綜合性平臺)生產環境面對的複雜的政務網路以及京辦同城雙活架構演進的案例,給大家介紹下京辦持續改進、分階段演進過程中的一些思考和實踐經驗的總結。本文僅針對ES叢集在跨機房同步過程中的方案和經驗進行介紹和總結。
1.部署Logstash在金山雲機房上,Logstash啟動多個範例(按不同的型別分類,提高同步效率),並且和金山雲機房的ES叢集在相同的VPC
2.Logstash需要設定大網存取許可權,保證Logstash和ES原叢集和目標叢集互通。
3.資料遷移可以全量遷移和增量遷移,首次遷移都是全量遷移後續的增加資料選擇增量遷移。
4.增量遷移需要改造增加識別的增量資料的標識,具體方法後續進行介紹。
Logstash分為三個部分input 、filter、ouput:
1.input處理接收資料,資料可以來源ES,紀錄檔檔案,kafka等通道.
2.filter對資料進行過濾,清洗。
3.ouput輸出資料到目標裝置,可以輸出到ES,kafka,檔案等。
對於T時刻的資料,先使用Logstash將T以前的所有資料遷移到有孚機房京東雲ES,假設用時∆T
對於T到T+∆T的增量資料,再次使用logstash將資料匯入到有孚機房京東雲的ES叢集
重複上述步驟2,直到∆T足夠小,此時將業務切換到華為雲,最後完成新增資料的遷移
適用範圍:ES的資料中帶有時間戳或者其他能夠區分新舊資料的標籤
1.建立ECS和安裝JDK忽略,自行安裝即可
2.下載對應版本的Logstash,儘量選擇與Elasticsearch版本一致,或接近的版本安裝即可
1) 原始碼下載直接解壓安裝包,開箱即用
2)修改對記憶體使用,logstash預設的堆記憶體是1G,根據ECS叢集選擇合適的記憶體,可以加快叢集資料的遷移效率。
Logstash會幫助使用者自動建立索引,但是自動建立的索引和使用者本身的索引會有些許差異,導致最終資料的搜尋格式不一致,一般索引需要手動建立,保證索引的資料完全一致。
以下提供建立索引的python指令碼,使用者可以使用該指令碼建立需要的索引。
create_mapping.py檔案是同步索引的python指令碼,config.yaml是叢集地址組態檔。
注:使用該指令碼需要安裝相關依賴
yum install -y PyYAML
yum install -y python-requests
拷貝以下程式碼儲存為 create_mapping.py:
import yaml
import requests
import json
import getopt
import sys
def help():
print
"""
usage:
-h/--help print this help.
-c/--config config file path, default is config.yaml
example:
python create_mapping.py -c config.yaml
"""
def process_mapping(index_mapping, dest_index):
print(index_mapping)
# remove unnecessary keys
del index_mapping["settings"]["index"]["provided_name"]
del index_mapping["settings"]["index"]["uuid"]
del index_mapping["settings"]["index"]["creation_date"]
del index_mapping["settings"]["index"]["version"]
# check alias
aliases = index_mapping["aliases"]
for alias in list(aliases.keys()):
if alias == dest_index:
print(
"source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.")
del index_mapping["aliases"][alias]
if index_mapping["settings"]["index"].has_key("lifecycle"):
lifecycle = index_mapping["settings"]["index"]["lifecycle"]
opendistro = {"opendistro": {"index_state_management":
{"policy_id": lifecycle["name"],
"rollover_alias": lifecycle["rollover_alias"]}}}
index_mapping["settings"].update(opendistro)
# index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
del index_mapping["settings"]["index"]["lifecycle"]
print(index_mapping)
return index_mapping
def put_mapping_to_target(url, mapping, source_index, dest_auth=None):
headers = {'Content-Type': 'application/json'}
create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth)
if create_resp.status_code != 200:
print(
"create index " + url + " failed with response: " + str(create_resp) + ", source index is " + source_index)
print(create_resp.text)
with open(source_index + ".json", "w") as f:
json.dump(mapping, f)
def main():
config_yaml = "config.yaml"
opts, args = getopt.getopt(sys.argv[1:], '-h-c:', ['help', 'config='])
for opt_name, opt_value in opts:
if opt_name in ('-h', '--help'):
help()
exit()
if opt_name in ('-c', '--config'):
config_yaml = opt_value
config_file = open(config_yaml)
config = yaml.load(config_file)
source = config["source"]
source_user = config["source_user"]
source_passwd = config["source_passwd"]
source_auth = None
if source_user != "":
source_auth = (source_user, source_passwd)
dest = config["destination"]
dest_user = config["destination_user"]
dest_passwd = config["destination_passwd"]
dest_auth = None
if dest_user != "":
dest_auth = (dest_user, dest_passwd)
print(source_auth)
print(dest_auth)
# only deal with mapping list
if config["only_mapping"]:
for source_index, dest_index in config["mapping"].iteritems():
print("start to process source index" + source_index + ", target index: " + dest_index)
source_url = source + "/" + source_index
response = requests.get(source_url, auth=source_auth)
if response.status_code != 200:
print("*** get ElasticSearch message failed. resp statusCode:" + str(
response.status_code) + " response is " + response.text)
continue
mapping = response.json()
index_mapping = process_mapping(mapping[source_index], dest_index)
dest_url = dest + "/" + dest_index
put_mapping_to_target(dest_url, index_mapping, source_index, dest_auth)
print("process source index " + source_index + " to target index " + dest_index + " successed.")
else:
# get all indices
response = requests.get(source + "/_alias", auth=source_auth)
if response.status_code != 200:
print("*** get all index failed. resp statusCode:" + str(
response.status_code) + " response is " + response.text)
exit()
all_index = response.json()
for index in list(all_index.keys()):
if "." in index:
continue
print("start to process source index" + index)
source_url = source + "/" + index
index_response = requests.get(source_url, auth=source_auth)
if index_response.status_code != 200:
print("*** get ElasticSearch message failed. resp statusCode:" + str(
index_response.status_code) + " response is " + index_response.text)
continue
mapping = index_response.json()
dest_index = index
if index in config["mapping"].keys():
dest_index = config["mapping"][index]
index_mapping = process_mapping(mapping[index], dest_index)
dest_url = dest + "/" + dest_index
put_mapping_to_target(dest_url, index_mapping, index, dest_auth)
print("process source index " + index + " to target index " + dest_index + " successed.")
if __name__ == '__main__':
main()
組態檔儲存為config.yaml:
# 源端ES叢集地址,加上http://
source: http://ip:port
source_user: "username"
source_passwd: "password"
# 目的端ES叢集地址,加上http://
destination: http://ip:port
destination_user: "username"
destination_passwd: "password"
# 是否只處理這個檔案中mapping地址的索引
# 如果設定成true,則只會將下面的mapping中的索引獲取到並在目的端建立
# 如果設定成false,則會取源端叢集的所有索引,除去(.kibana)
# 並且將索引名稱與下面的mapping匹配,如果匹配到使用mapping的value作為目的端的索引名稱
# 如果匹配不到,則使用源端原始的索引名稱
only_mapping: true
# 要遷移的索引,key為源端的索引名字,value為目的端的索引名字
mapping:
source_index: dest_index
以上程式碼和組態檔準備完成,直接執行 python create_mapping.py 即可完成索引同步。
索引同步完成可以取目標叢集的kibana上檢視或者執行curl檢視索引遷移情況:
GET _cat/indices?v
Logstash設定位於config目錄下。
使用者可以參考設定修改Logstash組態檔,為了保證遷移資料的準確性,一般建議建立多組Logstash,分批次遷移資料,每個Logstash遷移部分資料。
設定叢集間遷移設定參考:
input{
elasticsearch{
# 源端地址
hosts => ["ip1:port1","ip2:port2"]
# 安全叢集設定登入使用者名稱密碼
user => "username"
password => "password"
# 需要遷移的索引列表,以逗號分隔,支援萬用字元
index => "a_*,b_*"
# 以下三項保持預設即可,包含執行緒數和遷移資料大小和logstash jvm設定相關
docinfo=>true
slices => 10
size => 2000
scroll => "60m"
}
}
filter {
# 去掉一些logstash自己加的欄位
mutate {
remove_field => ["@timestamp", "@version"]
}
}
output{
elasticsearch{
# 目的端es地址
hosts => ["http://ip:port"]
# 安全叢集設定登入使用者名稱密碼
user => "username"
password => "password"
# 目的端索引名稱,以下設定為和源端保持一致
index => "%{[@metadata][_index]}"
# 目的端索引type,以下設定為和源端保持一致
document_type => "%{[@metadata][_type]}"
# 目標端資料的_id,如果不需要保留原_id,可以刪除以下這行,刪除後效能會更好
document_id => "%{[@metadata][_id]}"
ilm_enabled => false
manage_template => false
}
# 偵錯資訊,正式遷移去掉
stdout { codec => rubydebug { metadata => true }}
}
https://www.elastic.co/guide/en/elasticsearch/reference/2.4/mapping-timestamp-field.html
PUT _ingest/pipeline/gmt_created_at
{
"description": "Adds gmt_created_at timestamp to documents",
"processors": [
{
"set": {
"field": "_source.gmt_created_at",
"value": "{{_ingest.timestamp}}"
}
}
]
}
GET _ingest/pipeline/*
PUT index_xxxx/_settings
{
"settings": {
"index.default_pipeline": "gmt_created_at"
}
}
GET index_xxxx/_settings
schedule-migrate.conf
index:可以使用萬用字元的方式
query: 增量同步的DSL,統一gmt_create_at為增量同步的特殊標記
schedule: 每分鐘同步一把,"* * * * *"
input {
elasticsearch {
hosts => ["ip:port"]
# 安全叢集設定登入使用者名稱密碼
user => "username"
password => "password"
index => "index_*"
query => '{"query":{"range":{"gmt_create_at":{"gte":"now-1m","lte":"now/m"}}}}'
size => 5000
scroll => "5m"
docinfo => true
schedule => "* * * * *"
}
}
filter {
mutate {
remove_field => ["source", "@version"]
}
}
output {
elasticsearch {
# 目的端es地址
hosts => ["http://ip:port"]
# 安全叢集設定登入使用者名稱密碼
user => "username"
password => "password"
index => "%{[@metadata][_index]}"
document_type => "%{[@metadata][_type]}"
document_id => "%{[@metadata][_id]}"
ilm_enabled => false
manage_template => false
}
# 偵錯資訊,正式遷移去掉
stdout { codec => rubydebug { metadata => true }}
}
mapping中存在join父子型別的欄位,直接遷移報400異常
[2022-09-20T20:02:16,404][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400,
:action=>["index", {:_id=>"xxx", :_index=>"xxx", :_type=>"joywork_t_work", :routing=>nil}, #<LogStash::Event:0x3b3df773>],
:response=>{"index"=>{"_index"=>"xxx", "_type"=>"xxx", "_id"=>"xxx", "status"=>400,
"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse",
"caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"[routing] is missing for join field [task_user]"}}}}}
https://discuss.elastic.co/t/an-routing-missing-exception-is-obtained-when-reindex-sets-the-routing-value/155140 https://github.com/elastic/elasticsearch/issues/26183
結合業務特徵,通過在filter中加入小量的ruby程式碼,將_routing的值取出來,放回logstah event中,由此問題得以解決。