python自建通用数据库连接池

it2025-09-25  1

python自建通用数据库连接池

使用场景初始化连接池基类 使用pymongo创建连接从队列中获取mongodb连接关闭连接连接池基类完整代码创建mongodb连接池插入数据基本查询分页查询分组查询更新删除mongodb连接池完整代码 使用永无止境

使用场景

可以作为python各种orm插件的替代品,同时通用性更好,也更好控制和优化。 如果项目中使用了不同的数据库,例如使用了mysql和mongodb, 使用自建连接池只需要熟悉原生的方法而不需要熟悉各种orm参数和api,在控制方面可以说能做到随心所欲。 随着你能力的提升,还可以对连接池作出各种扩展,扩展出属于自己的orm。

初始化连接池基类

def __init__(self, **kwargs): '''初始化线程池,默认连接限制为10,不指定则会默认创建10个连接入队列''' self.size = kwargs.get("size", 10) # 连接池大小,默认为10 self.kwargs = kwargs self.conn_queue = queue.Queue(maxsize=self.size) for i in range(self.size): self.conn_queue.put(self._create_new_conn())

使用pymongo创建连接

这里mongodb地址从配置文件导入,实际项目中可以从环境变量获取

def _create_new_conn(self): """创建新连接""" return pymongo.MongoClient(Config.MONGODB_HOST, serverSelectionTimeoutMS=5000, socketTimeoutMS=5000)

从队列中获取mongodb连接

如果队列中无连接,则新建连接

def _get_conn(self): """获取连接""" conn = self.conn_queue.get() if conn is None: self._create_new_conn() return conn

关闭连接

def __close__(self): """关闭连接:如果取到的队列中的连接已经为空了,直接pass""" try: while True: conn = self.conn_queue.get_nowait() if conn: conn.close() # queue.Empty的作用是,如果队列为空,返回True,如果不为空,返回False except queue.Empty: pass

连接池基类完整代码

import queue, pymongo from loguru import logger class DbPool(object): """基本连接池""" def __init__(self, **kwargs): self.size = kwargs.get("size", 10) # 连接池大小,默认为10 self.kwargs = kwargs self.conn_queue = queue.Queue(maxsize=self.size) for i in range(self.size): self.conn_queue.put(self._create_new_conn()) def _create_new_conn(self): """创建新连接""" return pymongo.MongoClient(Config.MONGODB_HOST, serverSelectionTimeoutMS=5000, socketTimeoutMS=5000) def _put_conn(self, conn): self.conn_queue.put(conn) def _get_conn(self): """获取连接""" conn = self.conn_queue.get() if conn is None: self._create_new_conn() return conn def __close__(self): """关闭连接:如果取到的队列中的连接已经为空了,直接pass""" try: while True: conn = self.conn_queue.get_nowait() if conn: conn.close() # queue.Empty的作用是,如果队列为空,返回True,如果不为空,返回False except queue.Empty: pass

创建mongodb连接池

创建mondodb连接池对象继承基本连接池,如果有多个数据库则分别创建对象继承DbPool,继承后为了方便后续使用最好封装一些通用的crud方法,这里以mongodb为例

插入数据

插入多条数据无需指定,插入一条必须指定insert_one参数

def insert(self, col_name, insert_data, **kwargs): """ 插入数据通用, **kwargs insert_one, insert_many """ conn = self._get_conn() try: if kwargs.get("insert_one"): conn[database][col_name].insert_one(insert_data) else: conn[database][col_name].insert_many(insert_data) except Exception as e: logger.error(e) finally: self._put_conn(conn)

基本查询

col_name参数为表名,如果指定find_one参数则使用pymongo的find_one函数返回一条数据,否则返回查询的所有数据

def find(self, col_name, match=None, **kwargs): """基本查询""" conn = self._get_conn() try: col = conn[database][col_name] match = {} if match is None else match if kwargs.get("find_one"): return col.find_one(match) else: return col.find(match) except Exception as e: logger.error(e) finally: self._put_conn(conn)

分页查询

分页查询至少需要传入一个参数(表名),不指定则默认分页大小为10,默认查询第一页,match为查询条件,sort为排序,如果不懂可以查看pymongo文档

def find_by_page(self, col_name, **kwargs): """分页查询""" conn = self._get_conn() try: page_size = kwargs.get("page_size", 10) page_no = kwargs.get("page_no", 1) match = kwargs.get("match", {}) sort = kwargs.get("sort", None) skip = page_size * (page_no - 1) col = conn[database][col_name] count = col.count_documents(match) result = col.find(match).sort(sort).skip(skip).limit(page_size) if sort else col.find(match).skip( skip).limit(page_size) return result, count except Exception as e: logger.error(e) finally: self._put_conn(conn)

分组查询

group参数为分组条件

def aggregate_query(self, col_name, **kwargs): """分组查询""" conn = self._get_conn() try: match = kwargs.get("match", {}) group = kwargs.get("group", {}) pipeline = [ {"$match": match}, {"$group": group} ] return conn[database][col_name].aggregate(pipeline) except Exception as e: logger.error(e) finally: self._put_conn(conn)

更新

更新数据如果只更新一条数据,指定update_one数据即可,否则更新多条数据

def update(self, col_name, update_data, **kwargs): """更新数据通用""" conn = self._get_conn() try: match = kwargs.get("filter", {}) logger.info(match) if kwargs.get("update_one"): conn[database][col_name].update_one(match, {"$set", update_data}, upsert=True) else: conn[database][col_name].update_many(match, {"$set", update_data}, upsert=True) except Exception as e: logger.error(e) finally: self._put_conn(conn)

删除

指定del_one参数则只删除一条数据,否则删除匹配的所有数据

def del(self, col_name, **kwargs): """删除数据通用""" conn = self._get_conn() try: match = kwargs.get("match", {}) if kwargs.get("del_one"): conn[database][col_name].delete_one(match) else: conn[database][col_name].delete_many(match) except Exception as e: logger.error(e) finally: self._put_conn(conn)

mongodb连接池完整代码

这里只是按照我个人习惯封装了一些方法,这里仅供参考,最好根据实际项目进行扩展,如果是其他数据库也可以参照该类进行处理。

class MongodbPool(DbPool): """mongodb连接池""" def find(self, col_name, match=None, **kwargs): """基本查询""" conn = self._get_conn() try: col = conn[database][col_name] match = {} if match is None else match if kwargs.get("find_one"): return col.find_one(match) else: return col.find(match) except Exception as e: logger.error(e) finally: self._put_conn(conn) def find_by_page(self, col_name, **kwargs): """分页查询""" conn = self._get_conn() try: page_size = kwargs.get("page_size", 10) page_no = kwargs.get("page_no", 1) match = kwargs.get("match", {}) sort = kwargs.get("sort", None) skip = page_size * (page_no - 1) col = conn[database][col_name] count = col.count_documents(match) result = col.find(match).sort(sort).skip(skip).limit(page_size) if sort else col.find(match).skip( skip).limit(page_size) return result, count except Exception as e: logger.error(e) finally: self._put_conn(conn) def update(self, col_name, update_data, **kwargs): """更新数据通用""" conn = self._get_conn() try: # update_data.pop("_id") if update_data.get("_id") else update_data match = kwargs.get("filter", {}) logger.info(match) if kwargs.get("update_one"): conn[database][col_name].update_one(match, {"$set", update_data}, upsert=True) else: conn[database][col_name].update_many(match, {"$set", update_data}, upsert=True) except Exception as e: logger.error(e) finally: self._put_conn(conn) def aggregate_query(self, col_name, **kwargs): """分组查询""" conn = self._get_conn() try: match = kwargs.get("match", {}) group = kwargs.get("group", {}) pipeline = [ {"$match": match}, {"$group": group} ] return conn[database][col_name].aggregate(pipeline) except Exception as e: logger.error(e) finally: self._put_conn(conn) def del(self, col_name, **kwargs): """删除数据通用""" conn = self._get_conn() try: match = kwargs.get("match", {}) if kwargs.get("del_one"): conn[database][col_name].delete_one(match) else: conn[database][col_name].delete_many(match) except Exception as e: logger.error(e) finally: self._put_conn(conn) def insert(self, col_name, insert_data, **kwargs): """ 插入数据通用, 插入多条无需指定,插入一条必须指定 **kwargs insert_one, insert_many """ conn = self._get_conn() try: if kwargs.get("insert_one"): conn[database][col_name].insert_one(insert_data) else: conn[database][col_name].insert_many(insert_data) except Exception as e: logger.error(e) finally: self._put_conn(conn)

使用

这里以在flask中使用为例,使用时尽量不要到处创建对象,在初始化app时创建db对象

app.py from main import create_app app = create_app("product") if __name__ == "__main__": app.run(host='0.0.0.0', port='5000')

代码全部放在main文件夹下面,在main下面的__init__.py中创建db和create_app,这样其他模块只需要引入该db就行

init.py 部分代码 # mongodb db = MongodbPool() # 日志 logger.add(sys.stderr, format="{time} {level} {message}", filter="my_module", level="DEBUG") logger.add("./log/log.log", retention="10 days", encoding="utf-8", rotation="100 MB") def create_app(config_name): """"根据配置文件名创建flask对象""" app = Flask(__name__) config = config_map.get(config_name) app.config.from_object(config) 使用示例 from main import db from main.models import FwNetworkConfig if __name__ == '__main__': data = db.find(FwNetworkConfig.col_name, find_one=True) print(data)

永无止境

随着你能力的提升,可以不断对连接池进行改进,封装数据库模型(models),提升连接池性能,逐步就可以取代orm了。由于目前python各种生态还处于初级阶段,你完全可以为python生态贡献出自己的一分努力…

最新回复(0)