Python連線MongoDB操作


本教學的目的是介紹如何使用Python連線MongoDB資料庫,並通過PyMongo操作MongoDB資料庫。

1.安裝PyMongo

注意:請勿安裝「bson」軟體包。 PyMongo配有自己的bson包; 執行「pip install bson」或「easy_install bson」則會安裝與PyMongo不相容的第三方軟體包。

使用pip安裝

我們建議在所有平台上使用pip來安裝pymongo:

C:\Users\Administrator>python -m pip install pymongo
Collecting pymongo
  Downloading pymongo-3.4.0.tar.gz (583kB)
... ....
Installing collected packages: pymongo
  Running setup.py install for pymongo ... done
Successfully installed pymongo-3.4.0

要獲得pymongo的特定版本:

$ python -m pip install pymongo==3.1.1

要升級pymongo的版本:

$ python -m pip install --upgrade pymongo

Python版本依賴

PyMongo支援CPython 2.6,2.7,3.3+,PyPy和PyPy3。

GSSAPI和TLS的可選依賴關係:

GSSAPI認證需要Windows上的Unix或WinKerberos上的pykerberos。PyMongo可以自動安裝正確的依賴關係:

$ python -m pip install pymongo[gssapi]

2.使用MongoClient建立連線

使用PyMongo時,第一步是執行 mongod 範例建立一個MongoClient。如下:

from pymongo import MongoClient
client = MongoClient()

上述程式碼將連線預設主機和埠。 也可以明確指定主機和埠,如下所示:

from pymongo import MongoClient
#client = MongoClient()
client = MongoClient('localhost', 27017)

或使用MongoDB URI格式:

client = MongoClient('mongodb://localhost:27017/')

3.獲取資料庫

MongoDB的一個範例可以支援多個獨立的資料庫。 在使用PyMongo時,可以使用MongoClient範例上的屬性的方式來存取資料庫:

db = client.pythondb

如果資料庫名稱使用屬性方式存取無法正常工作(如:python-db),則可以使用字典樣式存取:

db = client['python-db']

4.獲取集合

集合是儲存在MongoDB中的一組文件,可以類似於關聯式資料庫中的表。 在PyMongo中獲取集合的工作方式與獲取資料庫相同:

collection = db.python_collection

或(使用字典方式存取):

collection = db['python-collection']

MongoDB中關於集合(和資料庫)的一個重要注意事項是它們是懶建立的 - 上述任何命令都沒有在MongoDB伺服器上實際執行任何操作。當第一個文件插入集合時才建立集合和資料庫。

集合是儲存在MongoDB中的一組文件,可以被認為大致相當於關聯式資料庫中的表。 在PyMongo中獲取集合的工作方式與獲取資料庫相同:

5.文件

MongoDB中的資料使用JSON方式來表示文件(並儲存)。 在PyMongo中使用字典來表示文件。例如,以下字典可能用於表示部落格文章:

import datetime
from pymongo import MongoClient
client = MongoClient()

post = {"author": "Mike",
         "text": "My first blog post!",
         "tags": ["mongodb", "python", "pymongo"],
         "date": datetime.datetime.utcnow()}
`

6.插入文件

要將文件插入到集合中,可以使用insert_one()方法:

#!/usr/bin/python3
#coding=utf-8

import datetime
from pymongo import MongoClient
client = MongoClient()

db = client.pythondb

post = {"author": "Maxsu",
         "text": "My first blog post!",
         "tags": ["mongodb", "python", "pymongo"],
         "date": datetime.datetime.utcnow()}

posts = db.posts
post_id = posts.insert_one(post).inserted_id
print ("post id is ", post_id)

執行上面程式碼,得到以下結果 -

post id is  595965fe4959eb09c4451091

插入文件時,如果文件尚未包含「_id」鍵,則會自動新增「_id」。 「_id」的值在集合中必須是唯一的。 insert_one()返回一個InsertOneResult的範例。 有關「_id」的更多資訊,請參閱有關_id文件

插入第一個文件後,實際上已經在伺服器上建立了貼文(posts)集合。可以列出資料庫中的所有集合:

#!/usr/bin/python3
#coding=utf-8

import datetime
from pymongo import MongoClient
client = MongoClient()

db = client.pythondb

"""
post = {"author": "Maxsu",
         "text": "My first blog post!",
         "tags": ["mongodb", "python", "pymongo"],
         "date": datetime.datetime.utcnow()}

posts = db.posts
post_id = posts.insert_one(post).inserted_id
print ("post id is ", post_id)
"""
cur_collection = db.collection_names(include_system_collections=False)

print("cur_collection is :", cur_collection)

執行上面程式碼,得到以下結果 -

cur_collection is : ['posts']

7.使用find_one()獲取單個文件

MongoDB中執行的最基本的查詢型別是find_one()。 此方法返回與查詢匹配的單個文件(如果沒有匹配,則返回None)。 當知道只有一個匹配的文件,或只對第一個匹配感興趣時則可考慮使用find_one()方法。下面範例中使用find_one()從貼文(posts)集中獲取第一個文件:

#!/usr/bin/python3
#coding=utf-8

import datetime
import pprint
from pymongo import MongoClient

client = MongoClient()

db = client.pythondb
'''
post = {"author": "Maxsu",
         "text": "My first blog post!",
         "tags": ["mongodb", "python", "pymongo"],
         "date": datetime.datetime.utcnow()}
'''
posts = db.posts
#post_id = posts.insert_one(post).inserted_id
#print ("post id is ", post_id)

pprint.pprint(posts.find_one())

執行上面程式碼,得到以下結果 -

{'_id': ObjectId('595965fe4959eb09c4451091'),
 'author': 'Maxsu',
 'date': datetime.datetime(2017, 7, 2, 21, 30, 38, 402000),
 'tags': ['mongodb', 'python', 'pymongo'],
 'text': 'My first blog post!'}

結果是匹配之前插入的字典格式(Json)。注意: 返回的文件包含一個「_id」,它是在插入時自動新增的。

find_one()方法還支援查詢結果文件必須匹配的特定元素。要查詢作者是「Maxsu」的文件,可以指定查詢的條件,如下所示:

#!/usr/bin/python3
#coding=utf-8

import datetime
import pprint
from pymongo import MongoClient

client = MongoClient()

db = client.pythondb
post = {"author": "Minsu",
         "text": "This blog post belong to Minsu!",
         "tags": ["MySQL", "Oracle", "pymongo"],
         "date": datetime.datetime.utcnow()}

posts = db.posts
post_id = posts.insert_one(post).inserted_id

post = posts.find_one({"author": "Maxsu"})
pprint.pprint(post)
#print (post)

執行上面程式碼,得到以下結果 -

{'_id': ObjectId('595965fe4959eb09c4451091'),
 'author': 'Maxsu',
 'date': datetime.datetime(2017, 7, 2, 21, 30, 38, 402000),
 'tags': ['mongodb', 'python', 'pymongo'],
 'text': 'My first blog post!'}

8.通過ObjectId查詢

也可以通過它的_id找到一個貼文(post),下面的範例子中演示如何根據給定的一個ObjectId查詢資料:

#!/usr/bin/python3
#coding=utf-8

import datetime
import pprint
from pymongo import MongoClient

client = MongoClient()

db = client.pythondb

post = {"_id": 100,
         "author": "Kuber",
         "text": "This is is my first post!",
         "tags": ["Docker", "Shell", "pymongo"],
         "date": datetime.datetime.utcnow()}

posts = db.posts
post_id = posts.insert_one(post).inserted_id

print("post_id is :", post_id)
post = posts.find_one({"_id": post_id})
print("Find By Post ID:")
pprint.pprint(post)
#print (post)

執行上面程式碼,得到以下結果 -

post_id is : 100
Find By Post ID:
{'_id': 100,
 'author': 'Kuber',
 'date': datetime.datetime(2017, 7, 3, 14, 14, 8, 28000),
 'tags': ['Docker', 'Shell', 'pymongo'],
 'text': 'This is is my first post!'}

Web應用程式中的常見任務是從請求URL獲取ObjectId並找到匹配的文件。 在這種情況下,必須將ObjectId從一個字串轉換到find_one()

from bson.objectid import ObjectId

# The web framework gets post_id from the URL and passes it as a string
def get(post_id):
    # Convert from string to ObjectId:
    document = client.db.collection.find_one({'_id': ObjectId(post_id)})

9.關於Unicode字串的注釋

您可能已經注意到,我們先前儲存的常規Python字串在從伺服器檢索時看起來是不同的(例如,u’Mike而不是「Mike」)。一個簡短的解釋是有序的字串。

MongoDB以BSON格式儲存資料。BSON字串是UTF-8編碼的,所以PyMongo必須確保它儲存的任何字串只包含有效的UTF-8資料。 常規字串(<type'str'>)被驗證並儲存不變。 Unicode字串(<type'unicode'>)首先被編碼為UTF-8。 我們的範例字串在Python shell中表示為u'Mike而不是「Mike」的原因是PyMongo將每個BSON字串解碼為Python unicode字串,而不是常規str。

10.批次插入

為了執行更複雜一些的查詢,我們再插入一些文件。 除了插入單個文件外,還可以通過將列表作為第一個引數傳遞給insert_many()來執行批次插入操作。 這將在列表中插入每個文件,只向伺服器傳送一個命令:

#!/usr/bin/python3
#coding=utf-8

import datetime
import pprint
from pymongo import MongoClient

client = MongoClient()

db = client.pythondb

new_posts = [{"_id": 1000,
               "author": "Curry",
               "text": "Another post!",
               "tags": ["bulk", "insert"],
               "date": datetime.datetime(2017, 11, 12, 11, 14)},
              {"_id": 1001,"author": "Maxsu",
               "title": "MongoDB is fun",
               "text": "and pretty easy too!",
               "date": datetime.datetime(2019, 11, 10, 10, 45)}]

posts = db.posts
result = posts.insert_many(new_posts)
print("Bulk Inserts Result is :", result.inserted_ids)
#print (post)

執行上面程式碼,得到以下結果 -

Bulk Inserts Result is : [1000, 1001]

有幾個有趣的事情要注意這個例子:

  • insert_many()的結果現在返回兩個ObjectId範例,每個ID表示插入的一個文件。
  • new_posts[1]具有與其他貼文不同的「形狀」(資料結構) - 沒有「tags」欄位,新增了一個新欄位「title」。MongoDB是無模式的,表示的就是這個意思。

11.查詢多個文件

要查詢獲得超過單個文件作為查詢的結果,可使用find()方法。find()返回一個Cursor範例,它允許遍歷所有匹配的文件。如下範例,遍歷貼文集合中的每個文件:

#!/usr/bin/python3
#coding=utf-8

import datetime
import pprint
from pymongo import MongoClient

client = MongoClient()

db = client.pythondb

posts = db.posts
for post in posts.find():
    pprint.pprint(post)

執行上面程式碼,得到以下結果 -

{'_id': ObjectId('595965fe4959eb09c4451091'),
 'author': 'Maxsu',
 'date': datetime.datetime(2017, 7, 2, 21, 30, 38, 402000),
 'tags': ['mongodb', 'python', 'pymongo'],
 'text': 'My first blog post!'}
{'_id': 100,
 'author': 'Kuber',
 'date': datetime.datetime(2017, 7, 3, 14, 14, 8, 28000),
 'tags': ['Docker', 'Shell', 'pymongo'],
 'text': 'This is is my first post!'}
{'_id': 1000,
 'author': 'Curry',
 'date': datetime.datetime(2017, 11, 12, 11, 14),
 'tags': ['bulk', 'insert'],
 'text': 'Another post!'}
{'_id': 1001,
 'author': 'Maxsu',
 'date': datetime.datetime(2019, 11, 10, 10, 45),
 'text': 'and pretty easy too!',
 'title': 'MongoDB is fun'}

類似使用find_one()一樣,我們可以將文件傳遞給find()來限制返回的結果。 在這裡,只希望得到作者是「Maxsu」的文件:

#!/usr/bin/python3
#coding=utf-8

import datetime
import pprint
from pymongo import MongoClient

client = MongoClient()

db = client.pythondb

posts = db.posts
for post in posts.find({"author": "Maxsu"}):
    pprint.pprint(post)

執行上面的程式碼,得到以下結果 -

{'_id': ObjectId('595965fe4959eb09c4451091'),
 'author': 'Maxsu',
 'date': datetime.datetime(2017, 7, 2, 21, 30, 38, 402000),
 'tags': ['mongodb', 'python', 'pymongo'],
 'text': 'My first blog post!'}
{'_id': 1001,
 'author': 'Maxsu',
 'date': datetime.datetime(2019, 11, 10, 10, 45),
 'text': 'and pretty easy too!',
 'title': 'MongoDB is fun'}

12.計數統計

如果只想知道有多少文件匹配查詢,可以執行count()方法操作,而不是一個完整的查詢。 可以得到一個集合中的所有文件的計數:

#!/usr/bin/python3
#coding=utf-8

import datetime
import pprint
from pymongo import MongoClient

client = MongoClient()

db = client.pythondb
posts = db.posts

print("posts count is = ", posts.count())

print("posts's author is Maxsu count is =", posts.find({"author": "Maxsu"}).count())

執行上面程式碼,得到以下結果 -

posts count is =  4
posts's author is Maxsu count is = 2

13.範圍查詢

MongoDB支援許多不同型別的高階查詢。例如,可以執行一個查詢,將結果限制在位元定日期更早的貼文,而且還可以按作者對結果進行排序:

#!/usr/bin/python3
#coding=utf-8

import datetime
import pprint
from pymongo import MongoClient

client = MongoClient()

db = client.pythondb
posts = db.posts

d = datetime.datetime(2019, 11, 12, 12)
for post in posts.find({"date": {"$lt": d}}).sort("author"):
    pprint.pprint(post)

這裡使用特殊的「$lt」運算子做範圍查詢,並且還可以呼叫sort()來按作者對結果進行排序。

14.索引

新增索引可以幫助加速某些查詢,並且還可以新增額外的功能來查詢和儲存文件。在這個例子中,將演示如何在一個鍵上建立一個唯一的索引,該索引將拒絕已經存在值的文件插入。

首先,我們建立索引:

result = db.profiles.create_index([('user_id', pymongo.ASCENDING)], unique=True)
sorted(list(db.profiles.index_information()))

請注意,現在有兩個索引:一個是MongoDB自動建立的在_id索引,另一個是剛剛建立在user_id上的索引。

現在來設定一些使用者組態檔案:

user_profiles = [{'user_id': 211, 'name': 'Luke'},{'user_id': 212, 'name': 'Ziltoid'}]
result = db.profiles.insert_many(user_profiles)

該索引將阻止 user_id 已經在集合中的文件插入:

new_profile = {'user_id': 213, 'name': 'Drew'}
duplicate_profile = {'user_id': 212, 'name': 'Tommy'}
result = db.profiles.insert_one(new_profile)  # This is fine.
result = db.profiles.insert_one(duplicate_profile)
## 出現錯誤提示...
Traceback (most recent call last):
DuplicateKeyError: E11000 duplicate key error index: test_database.profiles.$user_id_1 dup key: { : 212 }