前提
Python 连接MongoDB.
目前实现:
增删改查(大部分已经实例运行过).
对于聚合函数的实现并没有细化(没有提供相关示例).
官方提供了更多的方法,目前只集成了常用的部分函数(其余可以去官方网站参考).
源码
代码已经托管在Github[访问].
PS:2015.12.19 更新//修复数据库表创建的问题//修复数据库集合的使用问题,下面的源码已经替换为最新.
PS:2015.12.28 更新//修复查询分页的问题.
PS:2016.1.6 更新//优化查询方式,简化原有代码结构.此处使用了find()方法.原有方式不再使用.但兼容原有数据.并增加一项功能:设定返回列,或排除返回列.
(文件名:DaoUtil.py)
#!/usr/bin/env python3
# !-*-coding=utf-8-*-
"""
Python 连接MongoDB.
目前实现:
增删改查(大部分已经实例运行过).
对于聚合函数的实现并没有细化(没有提供相关示例).
官方提供了更多的方法,目前只集成了常用的部分函数(其余可以去官方网站参考).
依赖:
Python3+
MongoDB3+
pymongo3+
参考:
MongoDB官方文档:
https://docs.mongodb.org/getting-started/python/
pymongo下载提示页面:
https://pypi.python.org/pypi/pymongo/
相关运行信息:
运行平台:Linux
MongoDB版本:3.03
Python版本:Python 3.4.3
pymongo(Python连接MongoDB的驱动)版本:3.2
author : prd
version : 0.9 2015.12.11
2015.12.18 修复几个问题:
修改原有的获取数据库名称方式不正确.
修改原有获取数据库集合方式的问题.
2015.12.28 修复一个问题:
在查询时,进行分页之前的方式,无法进行正常分页.
之前的错误原因(现在已经修复):满足其一条件将不再往下执行,现在已修改方式.
举例:
错误的:
if page:
pass
if pageSize:
pass
if page and pageSize:
pass
# 若传递page和pageSize也无法正常获取数据.因为判断完第一个将不再继续执行.
正确的:
if page and pageSize:
pass
if page:
pass
if pageSize:
pass
2016.1.6 更新:
优化查询方式,简化原有代码结构.此处使用了find()方法.原有方式不再使用.但兼容原有数据.
并增加一项功能:设定返回列,或排除返回列.
更多细节:http://api.mongodb.org/python/current/api/pymongo/collection.html?highlight=find#pymongo.collection.Collection.find
2016.1.10 v2.0.0 更新:
为更新操作,新增参数(可选,若文档不存在时,是否进行新增).
查询时的参数:dataLimit,修改默认为0(之前默认为None会导致错误!).
email : pruidong#gmail.com
"""
import pymongo
class PyDaoUtil(object):
dbname = None
collection = None
def __init__(self, dbname, collection):
self.dbname = dbname
self.collection = collection
""" 返回MongoDB客户端对象. """
def getClient(self):
return pymongo.MongoClient("localhost", 27017)
""" 返回MongoDB的一个数据库 """
def getDB(self):
client = self.getClient()
# 修改原有的获取数据库名称方式不正确. TODO 2015.12.18
dbdatabase = client['%s' % (self.dbname)]
return dbdatabase
"""
添加数据
可新增单条或者多条.
单条:json对象,
多条:json数组.
"""
def insertData(self, bsonData):
if self.dbname:
db = self.getDB()
collections = self.collection
if isinstance(bsonData, list):
# 修改原有获取数据库集合方式的问题. TODO 2015.12.18 get_collection(collections)
result = db.get_collection(collections).insert_many(bsonData)
return result.inserted_ids
return db.get_collection(collections).insert_one(bsonData).inserted_id
else:
return None
"""
删除数据!!!!
关键字参数:
oneDeleteFilter->单条删除
manyDeleteFilter->多条删除
"""
def deleteData(self, **kwargs):
if self.dbname:
collections = self.collection
db = self.getDB()
def deleteOne(self, oneDeleteFilter=None): # 单个删除
result = db.get_collection(collections).delete_one(oneDeleteFilter)
return result.deleted_count
def deleteMany(self, manyDeleteFilter=None): # 全部删除
result = db.get_collection(collections).delete_many(manyDeleteFilter)
return result.deleted_count
onedel = kwargs.get("oneDeleteFilter", "")
manydel = kwargs.get("manyDeleteFilter", "")
if onedel:
return deleteOne(self, **kwargs)
elif manydel:
return deleteMany(self, **kwargs)
"""
更新数据
oldData:过滤原有数据,
关键字参数:
oneUpdate->单条更新
oneUpsert->True:如果文档不存在,执行插入.
(默认)False:如果文档不存在不执行执行操作.
manyUpdate->多条更新(如果过滤条件结果存在多条)
manUpsert->True:如果文档不存在,执行插入.
(默认)False:如果文档不存在不执行执行操作.
"""
def updateData(self, oldData=None, **kwargs):
if self.dbname:
collections = self.collection
db = self.getDB()
def updateOne(self, oneOldData=None, oneUpdate=None,oneUpsert=False): # 单个更新
result = db.get_collection(collections).update_one(filter=oneOldData,update=oneUpdate,upsert=oneUpsert)
return result.matched_count
def updateMany(self, manyOldData, manyUpdate=None,manUpsert=False): # 全部更新
result = db.get_collection(collections).update_many(filter=manyOldData,update=manyUpdate,upsert=manUpsert)
return result.matched_count
if oldData:
oneup = kwargs.get("oneUpdate", "")
manyup = kwargs.get("manyUpdate", "")
if oneup:
return updateOne(self, oldData, **kwargs)
elif manyup:
return updateMany(self, oldData, **kwargs)
"""
查询数据
关键字参数:
dataLimit - > 限定最多返回多少条.
dataSkip - > 跳过多少条.
dataQuery - > 查询限定条件
dataSortQuery -> 排序条件
dataProjection -> 返回指定的列或者排除指定的列
----------------------------------------------------------------
oneDataQuery -> 查询单条条件 -- 弃用!!!!
"""
def findAll(self, **kwargs):
if self.dbname:
collections = self.collection
db = self.getDB()
def findAllDataQuery(self, dataLimit=0, dataSkip=0, dataQuery=None, dataSortQuery=None,
dataProjection=None):
'''
TODO :
2016.1.6 更新:
优化查询方式,简化原有代码结构.此处使用了find()方法.原有方式不再使用.但兼容原有数据.
并增加一项功能:设定返回列,或排除返回列.
更多细节:http://api.mongodb.org/python/current/api/pymongo/collection.html?highlight=find#pymongo.collection.Collection.find
'''
return db.get_collection(collections).find(filter=dataQuery, projection=dataProjection, skip=dataSkip,
limit=dataLimit, sort=dataSortQuery)
return findAllDataQuery(self, **kwargs)
"""
聚合函数.
具体参考:
https://docs.mongodb.org/manual/meta/aggregation-quick-reference/
"""
def aggregation(self, aggreg):
if self.dbname:
collections = self.collection
db = self.getDB()
return db.get_collection(collections).aggregate(aggreg)
"""
统计数据库中的数量,
关键字参数参考:
http://api.mongodb.org/python/current/api
/pymongo/collection.html
?_ga=1.21101243.1811534224.1449817089#pymongo.collection.Collection.count
"""
def countData(self, countQuery=None, **kwargs):
if self.dbname:
collections = self.collection
db = self.getDB()
if countQuery and kwargs:
return db.get_collection(collections).count(countQuery, kwargs)
elif countQuery:
return db.get_collection(collections).count(countQuery)
elif kwargs:
return db.get_collection(collections).count(filter=None, **kwargs)
else:
return db.get_collection(collections).count()
""" 删除集合中所有数据!!!!谨慎调用!!!! """
def dropAllData(self, dataPassword=None):
if self.dbname:
collections = self.collection
db = self.getDB()
if dataPassword and isinstance(dataPassword, list):
db.get_collection(collections).drop()
if __name__ == '__main__':
# dao = PyDaoUtil("test", "my_collection")
# # 新增数据.
# dao.insertData(
# {
# "title": "MongoDB Overview",
# "description": "MongoDB is no sql database",
# "by": "百度一下你就知道",
# "url": "http://www.baidu.com",
# "tags": [
# "mongodb",
# "database",
# "NoSQL"
# ],
# "likes": 100
# })#单个新增.
# print(dao.insertData([{"a": "67"}, {"a": "67"}, {"x": "5"}])) # 批量新增.
# 新增数据.END.
# 查询数据
# print(dao.findAll())
# print(dao.findAll(dataLimit=2, dataSkip=1))
# print(dao.findAll(dataSkip=1))
# print(dao.findAll(dataLimit=2))
# print(dao.findAll(oneDataQuery={"x": "67"}))
# print(dao.findAll(dataSortQuery=[("x", "55")]))
# print(dao.findAll(dataQuery={"x": "55"}, dataSortQuery=[("x", "55")]))
# 查询数据END.
# 删除数据
# print(dao.deleteData(manyDeleteFilter={"a": "10000"}))
# print(dao.deleteData(oneDeleteFilter={"a": "10000"}))
# 删除数据END.
# 更新数据
# print(dao.insertData([{"a": "67"}, {"a": "67"}, {"x": "5"}])) # 批量新增.
# print(dao.updateData(oldData={"x": "5"}, oneUpdate={'$set': {"x": "300"}}))
# print(dao.updateData(oldData={"a": "67"}, manyUpdate={'$set': {"a": "10000"}}))
# 更新数据END.
# 清除所有数据[不限定条件,直接删除]
"""
警告:会删除所有数据,请谨慎调用!!!!!!!!!!!!!!!!!!
"""
# 清除所有数据END.
# dao.dropAllData(dataPassword=[123])
# 查询显示所有数据.
# print(dao.insertData([{"测试": "测试"}, {"a": "67"}, {"x": "5"}]))
# print(dao.findAll())
# 获得数据库中一共有多少条数据.
# print(dao.countData())
# print(dao.countData(countQuery={"a":"10000"}))
《Python:操作MongoDB工具实例》上有1条评论