这里mongodb地址从配置文件导入,实际项目中可以从环境变量获取
def _create_new_conn(self): """创建新连接""" return pymongo.MongoClient(Config.MONGODB_HOST, serverSelectionTimeoutMS=5000, socketTimeoutMS=5000)如果队列中无连接,则新建连接
def _get_conn(self): """获取连接""" conn = self.conn_queue.get() if conn is None: self._create_new_conn() return conn创建mondodb连接池对象继承基本连接池,如果有多个数据库则分别创建对象继承DbPool,继承后为了方便后续使用最好封装一些通用的crud方法,这里以mongodb为例
插入多条数据无需指定,插入一条必须指定insert_one参数
def insert(self, col_name, insert_data, **kwargs): """ 插入数据通用, **kwargs insert_one, insert_many """ conn = self._get_conn() try: if kwargs.get("insert_one"): conn[database][col_name].insert_one(insert_data) else: conn[database][col_name].insert_many(insert_data) except Exception as e: logger.error(e) finally: self._put_conn(conn)col_name参数为表名,如果指定find_one参数则使用pymongo的find_one函数返回一条数据,否则返回查询的所有数据
def find(self, col_name, match=None, **kwargs): """基本查询""" conn = self._get_conn() try: col = conn[database][col_name] match = {} if match is None else match if kwargs.get("find_one"): return col.find_one(match) else: return col.find(match) except Exception as e: logger.error(e) finally: self._put_conn(conn)分页查询至少需要传入一个参数(表名),不指定则默认分页大小为10,默认查询第一页,match为查询条件,sort为排序,如果不懂可以查看pymongo文档
def find_by_page(self, col_name, **kwargs): """分页查询""" conn = self._get_conn() try: page_size = kwargs.get("page_size", 10) page_no = kwargs.get("page_no", 1) match = kwargs.get("match", {}) sort = kwargs.get("sort", None) skip = page_size * (page_no - 1) col = conn[database][col_name] count = col.count_documents(match) result = col.find(match).sort(sort).skip(skip).limit(page_size) if sort else col.find(match).skip( skip).limit(page_size) return result, count except Exception as e: logger.error(e) finally: self._put_conn(conn)group参数为分组条件
def aggregate_query(self, col_name, **kwargs): """分组查询""" conn = self._get_conn() try: match = kwargs.get("match", {}) group = kwargs.get("group", {}) pipeline = [ {"$match": match}, {"$group": group} ] return conn[database][col_name].aggregate(pipeline) except Exception as e: logger.error(e) finally: self._put_conn(conn)更新数据如果只更新一条数据,指定update_one数据即可,否则更新多条数据
def update(self, col_name, update_data, **kwargs): """更新数据通用""" conn = self._get_conn() try: match = kwargs.get("filter", {}) logger.info(match) if kwargs.get("update_one"): conn[database][col_name].update_one(match, {"$set", update_data}, upsert=True) else: conn[database][col_name].update_many(match, {"$set", update_data}, upsert=True) except Exception as e: logger.error(e) finally: self._put_conn(conn)指定del_one参数则只删除一条数据,否则删除匹配的所有数据
def del(self, col_name, **kwargs): """删除数据通用""" conn = self._get_conn() try: match = kwargs.get("match", {}) if kwargs.get("del_one"): conn[database][col_name].delete_one(match) else: conn[database][col_name].delete_many(match) except Exception as e: logger.error(e) finally: self._put_conn(conn)这里只是按照我个人习惯封装了一些方法,这里仅供参考,最好根据实际项目进行扩展,如果是其他数据库也可以参照该类进行处理。
class MongodbPool(DbPool): """mongodb连接池""" def find(self, col_name, match=None, **kwargs): """基本查询""" conn = self._get_conn() try: col = conn[database][col_name] match = {} if match is None else match if kwargs.get("find_one"): return col.find_one(match) else: return col.find(match) except Exception as e: logger.error(e) finally: self._put_conn(conn) def find_by_page(self, col_name, **kwargs): """分页查询""" conn = self._get_conn() try: page_size = kwargs.get("page_size", 10) page_no = kwargs.get("page_no", 1) match = kwargs.get("match", {}) sort = kwargs.get("sort", None) skip = page_size * (page_no - 1) col = conn[database][col_name] count = col.count_documents(match) result = col.find(match).sort(sort).skip(skip).limit(page_size) if sort else col.find(match).skip( skip).limit(page_size) return result, count except Exception as e: logger.error(e) finally: self._put_conn(conn) def update(self, col_name, update_data, **kwargs): """更新数据通用""" conn = self._get_conn() try: # update_data.pop("_id") if update_data.get("_id") else update_data match = kwargs.get("filter", {}) logger.info(match) if kwargs.get("update_one"): conn[database][col_name].update_one(match, {"$set", update_data}, upsert=True) else: conn[database][col_name].update_many(match, {"$set", update_data}, upsert=True) except Exception as e: logger.error(e) finally: self._put_conn(conn) def aggregate_query(self, col_name, **kwargs): """分组查询""" conn = self._get_conn() try: match = kwargs.get("match", {}) group = kwargs.get("group", {}) pipeline = [ {"$match": match}, {"$group": group} ] return conn[database][col_name].aggregate(pipeline) except Exception as e: logger.error(e) finally: self._put_conn(conn) def del(self, col_name, **kwargs): """删除数据通用""" conn = self._get_conn() try: match = kwargs.get("match", {}) if kwargs.get("del_one"): conn[database][col_name].delete_one(match) else: conn[database][col_name].delete_many(match) except Exception as e: logger.error(e) finally: self._put_conn(conn) def insert(self, col_name, insert_data, **kwargs): """ 插入数据通用, 插入多条无需指定,插入一条必须指定 **kwargs insert_one, insert_many """ conn = self._get_conn() try: if kwargs.get("insert_one"): conn[database][col_name].insert_one(insert_data) else: conn[database][col_name].insert_many(insert_data) except Exception as e: logger.error(e) finally: self._put_conn(conn)这里以在flask中使用为例,使用时尽量不要到处创建对象,在初始化app时创建db对象
app.py from main import create_app app = create_app("product") if __name__ == "__main__": app.run(host='0.0.0.0', port='5000')代码全部放在main文件夹下面,在main下面的__init__.py中创建db和create_app,这样其他模块只需要引入该db就行
init.py 部分代码 # mongodb db = MongodbPool() # 日志 logger.add(sys.stderr, format="{time} {level} {message}", filter="my_module", level="DEBUG") logger.add("./log/log.log", retention="10 days", encoding="utf-8", rotation="100 MB") def create_app(config_name): """"根据配置文件名创建flask对象""" app = Flask(__name__) config = config_map.get(config_name) app.config.from_object(config) 使用示例 from main import db from main.models import FwNetworkConfig if __name__ == '__main__': data = db.find(FwNetworkConfig.col_name, find_one=True) print(data)随着你能力的提升,可以不断对连接池进行改进,封装数据库模型(models),提升连接池性能,逐步就可以取代orm了。由于目前python各种生态还处于初级阶段,你完全可以为python生态贡献出自己的一分努力…