Python:操作MongoDB工具实例

前提

Python 连接MongoDB.
目前实现:
增删改查(大部分已经实例运行过).
对于聚合函数的实现并没有细化(没有提供相关示例).
官方提供了更多的方法,目前只集成了常用的部分函数(其余可以去官方网站参考).

python Logo
image-2366

源码

代码已经托管在Github[访问].

PS:2015.12.19 更新//修复数据库表创建的问题//修复数据库集合的使用问题,下面的源码已经替换为最新.
PS:2015.12.28 更新//修复查询分页的问题.
PS:2016.1.6 更新//优化查询方式,简化原有代码结构.此处使用了find()方法.原有方式不再使用.但兼容原有数据.并增加一项功能:设定返回列,或排除返回列.
(文件名:DaoUtil.py)


#!/usr/bin/env python3
# !-*-coding=utf-8-*-

"""

Python 连接MongoDB.

目前实现:
增删改查(大部分已经实例运行过).
对于聚合函数的实现并没有细化(没有提供相关示例).
官方提供了更多的方法,目前只集成了常用的部分函数(其余可以去官方网站参考).

依赖:
Python3+
MongoDB3+
pymongo3+

参考:
MongoDB官方文档:
https://docs.mongodb.org/getting-started/python/

pymongo下载提示页面:
https://pypi.python.org/pypi/pymongo/

相关运行信息:
运行平台:Linux
MongoDB版本:3.03
Python版本:Python 3.4.3
pymongo(Python连接MongoDB的驱动)版本:3.2

author : prd
version : 0.9 2015.12.11
2015.12.18 修复几个问题:
修改原有的获取数据库名称方式不正确.
修改原有获取数据库集合方式的问题.
2015.12.28 修复一个问题:
在查询时,进行分页之前的方式,无法进行正常分页.
之前的错误原因(现在已经修复):满足其一条件将不再往下执行,现在已修改方式.
举例:
错误的:
if page:
pass
if pageSize:
pass
if page and pageSize:
pass
# 若传递page和pageSize也无法正常获取数据.因为判断完第一个将不再继续执行.

正确的:
if page and pageSize:
pass
if page:
pass
if pageSize:
pass
2016.1.6 更新:
优化查询方式,简化原有代码结构.此处使用了find()方法.原有方式不再使用.但兼容原有数据.
并增加一项功能:设定返回列,或排除返回列.
更多细节:http://api.mongodb.org/python/current/api/pymongo/collection.html?highlight=find#pymongo.collection.Collection.find
2016.1.10 v2.0.0 更新:
为更新操作,新增参数(可选,若文档不存在时,是否进行新增).
查询时的参数:dataLimit,修改默认为0(之前默认为None会导致错误!).

email : pruidong#gmail.com

"""
import pymongo

class PyDaoUtil(object):
dbname = None
collection = None

def __init__(self, dbname, collection):
self.dbname = dbname
self.collection = collection

""" 返回MongoDB客户端对象. """

def getClient(self):
return pymongo.MongoClient("localhost", 27017)

""" 返回MongoDB的一个数据库 """

def getDB(self):
client = self.getClient()
# 修改原有的获取数据库名称方式不正确. TODO 2015.12.18
dbdatabase = client['%s' % (self.dbname)]
return dbdatabase

"""
添加数据

可新增单条或者多条.

单条:json对象,
多条:json数组.

"""

def insertData(self, bsonData):
if self.dbname:
db = self.getDB()
collections = self.collection
if isinstance(bsonData, list):
# 修改原有获取数据库集合方式的问题. TODO 2015.12.18 get_collection(collections)
result = db.get_collection(collections).insert_many(bsonData)
return result.inserted_ids
return db.get_collection(collections).insert_one(bsonData).inserted_id
else:
return None

"""
删除数据!!!!

关键字参数:
oneDeleteFilter->单条删除
manyDeleteFilter->多条删除

"""

def deleteData(self, **kwargs):
if self.dbname:
collections = self.collection
db = self.getDB()

def deleteOne(self, oneDeleteFilter=None): # 单个删除
result = db.get_collection(collections).delete_one(oneDeleteFilter)
return result.deleted_count

def deleteMany(self, manyDeleteFilter=None): # 全部删除
result = db.get_collection(collections).delete_many(manyDeleteFilter)
return result.deleted_count

onedel = kwargs.get("oneDeleteFilter", "")
manydel = kwargs.get("manyDeleteFilter", "")
if onedel:
return deleteOne(self, **kwargs)
elif manydel:
return deleteMany(self, **kwargs)

"""
更新数据

oldData:过滤原有数据,

关键字参数:

oneUpdate->单条更新
oneUpsert->True:如果文档不存在,执行插入.
(默认)False:如果文档不存在不执行执行操作.
manyUpdate->多条更新(如果过滤条件结果存在多条)
manUpsert->True:如果文档不存在,执行插入.
(默认)False:如果文档不存在不执行执行操作.

"""

def updateData(self, oldData=None, **kwargs):
if self.dbname:
collections = self.collection
db = self.getDB()

def updateOne(self, oneOldData=None, oneUpdate=None,oneUpsert=False): # 单个更新
result = db.get_collection(collections).update_one(filter=oneOldData,update=oneUpdate,upsert=oneUpsert)
return result.matched_count

def updateMany(self, manyOldData, manyUpdate=None,manUpsert=False): # 全部更新
result = db.get_collection(collections).update_many(filter=manyOldData,update=manyUpdate,upsert=manUpsert)
return result.matched_count

if oldData:
oneup = kwargs.get("oneUpdate", "")
manyup = kwargs.get("manyUpdate", "")
if oneup:
return updateOne(self, oldData, **kwargs)
elif manyup:
return updateMany(self, oldData, **kwargs)

"""
查询数据

关键字参数:

dataLimit - > 限定最多返回多少条.
dataSkip - > 跳过多少条.
dataQuery - > 查询限定条件
dataSortQuery -> 排序条件
dataProjection -> 返回指定的列或者排除指定的列

----------------------------------------------------------------

oneDataQuery -> 查询单条条件 -- 弃用!!!!

"""

def findAll(self, **kwargs):
if self.dbname:
collections = self.collection
db = self.getDB()

def findAllDataQuery(self, dataLimit=0, dataSkip=0, dataQuery=None, dataSortQuery=None,
dataProjection=None):
'''
TODO :
2016.1.6 更新:
优化查询方式,简化原有代码结构.此处使用了find()方法.原有方式不再使用.但兼容原有数据.
并增加一项功能:设定返回列,或排除返回列.
更多细节:http://api.mongodb.org/python/current/api/pymongo/collection.html?highlight=find#pymongo.collection.Collection.find
'''
return db.get_collection(collections).find(filter=dataQuery, projection=dataProjection, skip=dataSkip,
limit=dataLimit, sort=dataSortQuery)

return findAllDataQuery(self, **kwargs)

"""
聚合函数.

具体参考:
https://docs.mongodb.org/manual/meta/aggregation-quick-reference/

"""

def aggregation(self, aggreg):
if self.dbname:
collections = self.collection
db = self.getDB()
return db.get_collection(collections).aggregate(aggreg)

"""
统计数据库中的数量,

关键字参数参考:
http://api.mongodb.org/python/current/api
/pymongo/collection.html
?_ga=1.21101243.1811534224.1449817089#pymongo.collection.Collection.count

"""

def countData(self, countQuery=None, **kwargs):
if self.dbname:
collections = self.collection
db = self.getDB()
if countQuery and kwargs:
return db.get_collection(collections).count(countQuery, kwargs)
elif countQuery:
return db.get_collection(collections).count(countQuery)
elif kwargs:
return db.get_collection(collections).count(filter=None, **kwargs)
else:
return db.get_collection(collections).count()

""" 删除集合中所有数据!!!!谨慎调用!!!! """

def dropAllData(self, dataPassword=None):
if self.dbname:
collections = self.collection
db = self.getDB()
if dataPassword and isinstance(dataPassword, list):
db.get_collection(collections).drop()

if __name__ == '__main__':
# dao = PyDaoUtil("test", "my_collection")
# # 新增数据.
# dao.insertData(
# {
# "title": "MongoDB Overview",
# "description": "MongoDB is no sql database",
# "by": "百度一下你就知道",
# "url": "http://www.baidu.com",
# "tags": [
# "mongodb",
# "database",
# "NoSQL"
# ],
# "likes": 100
# })#单个新增.

# print(dao.insertData([{"a": "67"}, {"a": "67"}, {"x": "5"}])) # 批量新增.

# 新增数据.END.

# 查询数据

# print(dao.findAll())
# print(dao.findAll(dataLimit=2, dataSkip=1))
# print(dao.findAll(dataSkip=1))
# print(dao.findAll(dataLimit=2))
# print(dao.findAll(oneDataQuery={"x": "67"}))
# print(dao.findAll(dataSortQuery=[("x", "55")]))
# print(dao.findAll(dataQuery={"x": "55"}, dataSortQuery=[("x", "55")]))

# 查询数据END.

# 删除数据

# print(dao.deleteData(manyDeleteFilter={"a": "10000"}))
# print(dao.deleteData(oneDeleteFilter={"a": "10000"}))

# 删除数据END.

# 更新数据

# print(dao.insertData([{"a": "67"}, {"a": "67"}, {"x": "5"}])) # 批量新增.
# print(dao.updateData(oldData={"x": "5"}, oneUpdate={'$set': {"x": "300"}}))
# print(dao.updateData(oldData={"a": "67"}, manyUpdate={'$set': {"a": "10000"}}))

# 更新数据END.

# 清除所有数据[不限定条件,直接删除]
"""

警告:会删除所有数据,请谨慎调用!!!!!!!!!!!!!!!!!!

"""
# 清除所有数据END.
# dao.dropAllData(dataPassword=[123])
# 查询显示所有数据.
# print(dao.insertData([{"测试": "测试"}, {"a": "67"}, {"x": "5"}]))

# print(dao.findAll())
# 获得数据库中一共有多少条数据.
# print(dao.countData())
# print(dao.countData(countQuery={"a":"10000"}))

《Python:操作MongoDB工具实例》上有1条评论

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据