本文首發於公眾號:Hunter後端
原文連結:Python連線es筆記三之es更新操作
這一篇筆記介紹如何使用 Python 對資料進行更新操作。
對於 es 的更新的操作,不用到 Search() 方法,而是直接使用 es 的連線加上相應的函數來操作,本篇筆記目錄如下:
如果使用的是之前的全域性建立連線的方式:
from elasticsearch_dsl import connections
connections.configure(
default={"hosts": "localhost:9200"},
)
我們可以根據別名獲取相應的連線:
conn = connections.connections.get_connection("default")
或者我們直接使用 elasticsearch.Elasticsearch 模組來重新建立一個連線:
from elasticsearch import Elasticsearch
conn = Elasticsearch(hosts="localhost:9200")
前面介紹過,我們安裝 elasticsearch_dsl 依賴的時候,會自動為我們安裝上相應的 elasticsearch 模組,我們這裡直接使用即可。
然後通過 conn 連線可以直接對資料進行更新,可用的方法有 update(),update_by_query() 以及一個批次的 bulk() 方法。
update() 函數一般只用於指定 id 的更新操作,如果我們知道一條資料的 id,我們可以直接使用 update()。
比如對於 exam 這個 index 下 id=18 的資料,我們想要更新它的 name 欄位和 address 欄位分別為 王五和湖南省,我們可以如下操作:
conn.update(
index="exam",
id=18,
body={
"doc": {
"name": "王五2",
"address": "湖南省",
}
}
)
在上面的操作中,index 為指定的索引,id 引數為我們需要更新的 id,body 內 doc 下的欄位即為我們要更新的資料。
update_by_query() 函數不侷限於 id 的查詢更新,我們可以更新任意符合條件的資料,以下是一個簡單的範例:
conn.update_by_query(
index="exam",
body={
"query": {
"term": {"name": "張三丰"}
},
"script": {
"source": "ctx._source.address = params.address",
"params": {
"address": "新地址",
}
}
}
)
在這裡,index 引數還是指向對應的索引,body 內包含了需要更新查詢的條件,這裡都在 query 引數內,需要更新的資料在 script 下,通過指令碼的形式來操作更新。
這裡注意下,我這裡用到的是 7.6.0 版本,所以 script 下使用的 source,更低一點版本用的欄位可能是 inline,這裡使用對應版本的引數即可。
在 script.source 中,內容為 ctx._source.address = params.address
,意思是將符合條件資料的 address 欄位內容更新為 params 的 address 的資料。
如果想要更改其他欄位內容,注意前面 ctx._source 為固定寫法,只需要更改後面的欄位名即可。
在 script.params 中,我們則可以定義各種對應的欄位及其內容。
如果我們想同時更新多個欄位,比如說符合條件的資料將 address 改為 新地址
,將 age 欄位改為 28,我們則需要將多個條件在 script.source 中使用分號 ;
連線起來,範例如下:
conn.update_by_query(
index="exam",
body={
"query": {
"term": {"name": "新張三丰2"}
},
"script": {
"source": "ctx._source.address = params.address; ctx._source.age = params.age",
"params": {
"address": "新地址3",
"age": "28"
}
}
}
)
雖然這裡更新多個欄位需要使用分號連線,但是在實際的程式碼中我們不用這麼寫死,比如說我們需要更改三個欄位,為 ["address", "name", "age"]
,我們如下操作:
field_list = ["address", "name", "age"]
source_list = [f"ctx._source.{key}=params.{key}" for key in field_list]
params = {
"address": "新地址3",
"age": "28",
"name": "new name"
}
conn.update_by_query(
index="exam",
body={
"query": {
"term": {"name": "新張三丰3"}
},
"script": {
"source": ";".join(source_list),
"params": params
}
}
)
如果我們想批次更新一批資料,這批資料各個欄位的值都不一致,自定義的程度很大,使用 update_by_query() 函數已經不現實了,怎麼辦?
好解決,我們可以使用 helpers.bulk() 批次更新方法。
首先引入這個模組:
from elasticsearch import helpers
假設我們系統裡現在有 id 為 21,23,24 的幾條資料,還是在 exam 這個索引下,我們來構造幾條需要更新的資料來操作:
action_1 = {
"_op_type": "update",
"_index": "exam",
"_id": 21,
"doc": {"age": 19, "name": "令狐沖", "address": "華山派"},
}
action_2 = {
"_op_type": "update",
"_index": "exam",
"_id": 23,
"doc": {"age": 20, "name": "楊過", "address": "終南山"},
}
action_3 = {
"_op_type": "update",
"_index": "exam",
"_id": 24,
"doc": {"age": 21, "name": "張無忌", "address": "武當"},
}
action_list = [action_1, action_2, action_3]
helpers.bulk(conn, actions=action_list)
對於每一條需要更新的資料,有這幾個引數:
_op_type:如果是更新操作,其值則是 update
_index:表示需要更新的資料所在的索引,這裡是 exam
_id:表示這條需要更新的資料的 id
doc:是一個 dict 資料,其下包含了需要更新的欄位及其對應的值
至此,一條需要更新的資料的結構就構造完畢了。
然後對於 helpers.bulk() 函數,接收的第一個引數為 es 連線,actions 引數是一個列表,其內容就是我們前面構造的資料的集合。
然後執行這個操作就可以發現 es 中對應的值已經更改了。
UpdateByQuery() 函數來源於 elasticsearch_dsl 模組,它的使用和 Search() 方法差不多,都是通過 using 和 index 引數來獲取 es 連線和索引:
from elasticsearch_dsl import connections
from elasticsearch_dsl import UpdateByQuery
from elasticsearch_dsl import Q as ES_Q
connections.configure(
default={"hosts": "localhost:9200"},
)
ubq = UpdateByQuery(using="default", index="exam")
使用這個方法更新資料的具體語法和 update_by_query 差不多,都是通過 script 的方式來操作,以下是一個簡單範例:
ubq = UpdateByQuery(using="default", index="exam")
q1 = ES_Q("term", name="郭靖")
ubq = ubq.query(q1)
ubq = ubq.script(
source="ctx._source.address=params.address",
params={
"address": "襄陽城"
}
)
ubq.execute()
與 Search() 函數一樣,都需要通過 execute() 函數來向 es 提交資料。
如果想獲取更多後端相關文章,可掃碼關注閱讀: