Python 驱动 MongoDB 的两种推荐方式 - MongoDB Python Drivers:
这里简单汇总下 Motor 异步驱动 MongoDB. 文档可参考 Motor(Async Driver).
Motor 是一个异步 MongoDB 驱动,支持异步读写 MongoDB。通常用在基于 Tornado 的异步 Web 服务器中;同时也支持以 asyncio(Python 3.4 及以上的标准库)作为异步模型,使用起来十分方便。
1. motor 安装
pip install motor
1.1. 连接 MongoDB Atlas
连接 MongoDB Atlas 集群,如,
# Import the submodule explicitly so that motor.motor_tornado is guaranteed to
# be bound, instead of relying on what `import motor` happens to pull in.
import motor.motor_tornado

# Replace the <...> placeholders with the Atlas user, password and cluster host.
client = motor.motor_tornado.MotorClient(
    "mongodb+srv://<username>:<password>@<cluster-url>/test?retryWrites=true&w=majority")
db = client.test
2. Motor With Tornado
https://motor.readthedocs.io/en/stable/tutorial-tornado.html
pip install tornado motor
类似于 PyMongo,Motor 将数据表示为 4 层对象结构:
[1] - MotorClient
[2] - MotorDatabase
[3] - MotorCollection
[4] - MotorCursor
2.1. Client 客户端
import motor.motor_tornado
# Alternative ways to create a client: library defaults, explicit host and
# port, or a MongoDB URI.
client = motor.motor_tornado.MotorClient()
client = motor.motor_tornado.MotorClient('localhost', 27017)
client = motor.motor_tornado.MotorClient('mongodb://localhost:27017')
# Replica set: list the hosts and name the set in the URI.
client = motor.motor_tornado.MotorClient('mongodb://host1,host2/?replicaSet=my-replicaset-name')
2.2. Database 数据库
# A database handle can be obtained by attribute access ...
db = client.test_database
# ... or by dictionary-style access with the database name.
db = client['test_database']
2.3. Tornado 应用程序启动
class MainHandler(tornado.web.RequestHandler):
    """Request handler that reads the shared database handle from the app settings."""

    def get(self):
        # 'db' is the keyword passed to tornado.web.Application(...) below.
        db = self.settings['db']


# Per the Motor tutorial, constructing a MotorClient does not connect to the
# server yet; the connection is initiated by the first operation.
db = motor.motor_tornado.MotorClient().test_database

# The handler must already be defined here, and everything must be set up
# before IOLoop.start(), which blocks until the loop is stopped.
application = tornado.web.Application([
    (r'/', MainHandler)
], db=db)

application.listen(8888)
tornado.ioloop.IOLoop.current().start()
或,以多进程方式(每个 CPU 一个进程)启动时,
# Create the application before creating a MotorClient.
# No client is created at this point: it must not exist before the fork.
application = tornado.web.Application([
    (r'/', MainHandler)
])

server = tornado.httpserver.HTTPServer(application)
server.bind(8888)

# Forks one process per CPU.
server.start(0)

# Now, in each child process, create a MotorClient.
application.settings['db'] = motor.motor_tornado.MotorClient().test_database
tornado.ioloop.IOLoop.current().start()
2.4. Collection
# A collection handle can be obtained by attribute access ...
collection = db.test_collection
# ... or by dictionary-style access with the collection name.
collection = db['test_collection']
2.5. 插入数据
async def do_insert():
    """Insert one document into ``test_collection`` and print its ``inserted_id``."""
    document = {'key': 'value'}
    result = await db.test_collection.insert_one(document)
    print('result %s' % repr(result.inserted_id))


# run_sync starts the loop, runs the coroutine to completion, then stops the loop.
IOLoop.current().run_sync(do_insert)
示例,
async def do_insert():
    """Insert 2000 documents ``{'i': 0}`` .. ``{'i': 1999}`` one at a time."""
    for i in range(2000):
        # Each insert is awaited before the next one is issued.
        await db.test_collection.insert_one({'i': i})


IOLoop.current().run_sync(do_insert)
效率更高的方式,
async def do_insert():
    """Insert 2000 documents with a single ``insert_many`` call and print how many were inserted."""
    result = await db.test_collection.insert_many(
        [{'i': i} for i in range(2000)])
    print('inserted %d docs' % (len(result.inserted_ids),))


IOLoop.current().run_sync(do_insert)
2.6. 查询单条数据 find_one
async def do_find_one():
    """Fetch a single document whose ``i`` is less than 1 and pretty-print it."""
    document = await db.test_collection.find_one({'i': {'$lt': 1}})
    pprint.pprint(document)


IOLoop.current().run_sync(do_find_one)
2.7. 查询多条数据 find
如,
async def do_find():
    """Pretty-print the documents with ``i < 5``, sorted by ``i``."""
    # find() only builds the cursor; nothing is awaited until to_list().
    cursor = db.test_collection.find({'i': {'$lt': 5}}).sort('i')
    # length caps how many documents are requested in one call.
    for document in await cursor.to_list(length=100):
        pprint.pprint(document)


IOLoop.current().run_sync(do_find)
2.8. 统计数据 count
async def do_count():
    """Print the total document count and the count of documents with ``i > 1000``."""
    # An empty filter matches every document in the collection.
    n = await db.test_collection.count_documents({})
    print('%s documents in collection' % n)
    n = await db.test_collection.count_documents({'i': {'$gt': 1000}})
    print('%s documents where i > 1000' % n)


IOLoop.current().run_sync(do_count)
2.9. 更新数据
替换:
async def do_replace():
    """Replace the document where ``i == 50`` and print it before and after."""
    coll = db.test_collection
    old_document = await coll.find_one({'i': 50})
    if old_document is None:
        # Nothing matched, e.g. on a second run: the replacement document
        # written below has no 'i' field, so the filter no longer finds it.
        print('no document with i == 50 found')
        return
    print('found document: %s' % pprint.pformat(old_document))
    _id = old_document['_id']
    result = await coll.replace_one({'_id': _id}, {'key': 'value'})
    print('replaced %s document' % result.modified_count)
    # Look the document up by _id, since its 'i' field is gone.
    new_document = await coll.find_one({'_id': _id})
    print('document is now %s' % pprint.pformat(new_document))


IOLoop.current().run_sync(do_replace)
更新:
async def do_update():
    """Set ``key`` to ``'value'`` on the document where ``i == 51`` and print the result."""
    coll = db.test_collection
    # $set changes only the named field; the rest of the document is kept.
    result = await coll.update_one({'i': 51}, {'$set': {'key': 'value'}})
    print('updated %s document' % result.modified_count)
    new_document = await coll.find_one({'i': 51})
    print('document is now %s' % pprint.pformat(new_document))


IOLoop.current().run_sync(do_update)

# To update every matching document instead of one, call update_many inside a
# coroutine:
#     await coll.update_many({'i': {'$gt': 100}},
#                            {'$set': {'key': 'value'}})
2.10. 删除数据
async def do_delete_many():
    """Delete every document with ``i >= 1000`` and print the counts before and after."""
    coll = db.test_collection
    n = await coll.count_documents({})
    print('%s documents before calling delete_many()' % n)
    # The DeleteResult is not used here, so it is not bound to a name.
    await coll.delete_many({'i': {'$gte': 1000}})
    print('%s documents after' % (await coll.count_documents({})))


IOLoop.current().run_sync(do_delete_many)
3. Motor With asyncio
https://motor.readthedocs.io/en/stable/tutorial-asyncio.html
类似于 PyMongo,Motor 将数据表示为 4 层对象结构:
[1] - AsyncIOMotorClient
[2] - AsyncIOMotorDatabase
[3] - AsyncIOMotorCollection
[4] - AsyncIOMotorCursor
3.1. Client 客户端
import motor.motor_asyncio
# Alternative ways to create a client: library defaults, explicit host and
# port, or a MongoDB URI.
client = motor.motor_asyncio.AsyncIOMotorClient()
client = motor.motor_asyncio.AsyncIOMotorClient('localhost', 27017)
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
# Replica set: list the hosts and name the set in the URI.
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://host1,host2/?replicaSet=my-replicaset-name')
3.2. Database 数据库
# A database handle can be obtained by attribute access ...
db = client.test_database
# ... or by dictionary-style access with the database name.
db = client['test_database']
3.3. Collection
# A collection handle can be obtained by attribute access ...
collection = db.test_collection
# ... or by dictionary-style access with the collection name.
collection = db['test_collection']
3.4. 插入数据
插入单条数据,
import asyncio


async def do_insert():
    """Insert one document into ``test_collection`` and print its ``inserted_id``."""
    document = {'key': 'value'}
    result = await db.test_collection.insert_one(document)
    print('result %s' % repr(result.inserted_id))


# Drive the coroutine to completion on the event loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(do_insert())
插入多条数据,
async def do_insert():
    """Insert 2000 documents with a single ``insert_many`` call and print how many were inserted."""
    result = await db.test_collection.insert_many(
        [{'i': i} for i in range(2000)])
    print('inserted %d docs' % (len(result.inserted_ids),))


loop = asyncio.get_event_loop()
loop.run_until_complete(do_insert())
3.5. 查询单条数据 find_one
async def do_find_one():
    """Fetch a single document whose ``i`` is less than 1 and pretty-print it."""
    document = await db.test_collection.find_one({'i': {'$lt': 1}})
    pprint.pprint(document)


loop = asyncio.get_event_loop()
loop.run_until_complete(do_find_one())
3.6. 查询多条数据 find
async def do_find():
    """Pretty-print the documents with ``i < 5``, sorted by ``i``."""
    # find() only builds the cursor; nothing is awaited until to_list().
    cursor = db.test_collection.find({'i': {'$lt': 5}}).sort('i')
    # length caps how many documents are requested in one call.
    for document in await cursor.to_list(length=100):
        pprint.pprint(document)


loop = asyncio.get_event_loop()
loop.run_until_complete(do_find())
3.7. 统计数据
async def do_count():
    """Print the total document count and the count of documents with ``i > 1000``."""
    # An empty filter matches every document in the collection.
    n = await db.test_collection.count_documents({})
    print('%s documents in collection' % n)
    n = await db.test_collection.count_documents({'i': {'$gt': 1000}})
    print('%s documents where i > 1000' % n)


loop = asyncio.get_event_loop()
loop.run_until_complete(do_count())
3.8. 更新数据
async def do_replace():
    """Replace the document where ``i == 50`` and print it before and after."""
    coll = db.test_collection
    old_document = await coll.find_one({'i': 50})
    if old_document is None:
        # Nothing matched, e.g. on a second run: the replacement document
        # written below has no 'i' field, so the filter no longer finds it.
        print('no document with i == 50 found')
        return
    print('found document: %s' % pprint.pformat(old_document))
    _id = old_document['_id']
    result = await coll.replace_one({'_id': _id}, {'key': 'value'})
    print('replaced %s document' % result.modified_count)
    # Look the document up by _id, since its 'i' field is gone.
    new_document = await coll.find_one({'_id': _id})
    print('document is now %s' % pprint.pformat(new_document))


loop = asyncio.get_event_loop()
loop.run_until_complete(do_replace())

# To update every matching document instead of one, call update_many inside a
# coroutine:
#     await coll.update_many({'i': {'$gt': 100}},
#                            {'$set': {'key': 'value'}})
3.9. 删除数据
async def do_delete_many():
    """Delete every document with ``i >= 1000`` and print the counts before and after."""
    coll = db.test_collection
    n = await coll.count_documents({})
    print('%s documents before calling delete_many()' % n)
    # The DeleteResult is not used here, so it is not bound to a name.
    await coll.delete_many({'i': {'$gte': 1000}})
    print('%s documents after' % (await coll.count_documents({})))


loop = asyncio.get_event_loop()
loop.run_until_complete(do_delete_many())
3.10. 示例 - 插入数据
import asyncio
import motor.motor_asyncio

client = motor.motor_asyncio.AsyncIOMotorClient('localhost', 27017)
# Select the database to work with.
db = client.test_database


async def do_insert():
    """Insert 20 documents ``{'i': 0}`` .. ``{'i': 19}`` into the ``lx`` collection."""
    # insert_many takes one or more documents; they are passed as a list here.
    result = await db.lx.insert_many(
        [{'i': i} for i in range(20)])
    print('inserted %d docs' % (len(result.inserted_ids),))


loop = asyncio.get_event_loop()
loop.run_until_complete(do_insert())
4. pymongo 和 motor with asyncio 对比
4.1. pymongo
import time
from pymongo import MongoClient

# The timer covers client creation as well as the update loop.
start = time.time()
connection = MongoClient('127.0.0.1', 27017)
db = connection['test_database']

# Walk every document and set end_idx to 1, one blocking update per document.
for doc in db.test_collection.find({}, ['_id', 'start_time', 'end_idx']):
    db.test_collection.update_one({'_id': doc.get('_id')}, {
        '$set': {
            'end_idx': 1
        }
    })

print("[INFO]Timecost:", time.time() - start)
4.2. motor with asyncio
import time
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

# The timer covers client creation as well as the update loop.
start = time.time()
connection = AsyncIOMotorClient('127.0.0.1', 27017)
db = connection['test_database']


async def run():
    """Set ``end_idx`` to 0 on every document in ``LiePin_Analysis1``."""
    # NOTE(review): the pymongo script uses test_collection and sets end_idx
    # to 1; confirm both scripts run against comparable data before comparing
    # the timings.
    async for doc in db.LiePin_Analysis1.find({}, ['_id', 'start_time', 'end_idx']):
        # Await each update. Without the await, run() can return before the
        # writes have completed, so the measured time would not include them
        # and any write error would go unnoticed.
        await db.LiePin_Analysis1.update_one(
            {'_id': doc.get('_id')}, {'$set': {'end_idx': 0}})


asyncio.get_event_loop().run_until_complete(run())
print("[INFO]Timecost:", time.time() - start)