python中单例模式应用
- 互联网
- 2025-09-21 15:18:02

数据库连接池单例模式 1. 为什么使用单例模式
创建数据库连接是一个昂贵的过程(涉及网络通信、认证等)。单例模式的连接池可以在程序启动时初始化一组连接,并在整个生命周期中重用这些连接,而不是每次请求都新建连接。同时还可以控制连接数量,防止资源耗尽。
2.代码下面这段代码实现了一个数据库连接池,并且通过单例模式确保整个程序中只有一个连接池实例,包含扩容机制。避免频繁创建和销毁数据库连接,通过单例复用连接池:
通过单例模式确保全局只有一个连接池实例。 使用线程锁和条件变量实现线程安全,处理多线程环境下的并发访问。 提供连接的创建、获取、归还和关闭功能。 通过多线程测试验证连接池的并发性能。
import pymysql import threading from pymysql import Error class DatabasePool: _instance = None def __new__(cls): if not cls._instance: cls._instance = super().__new__(cls) # 数据库配置 cls._instance.config = { 'host': 'localhost', 'user': 'root', 'password': '111111', 'database': 'test', 'charset': 'utf8mb4', 'cursorclass': pymysql.cursors.DictCursor } # 使用可重入锁和条件变量 cls._instance.lock = threading.RLock() cls._instance.condition = threading.Condition(cls._instance.lock) cls._instance.max_connections = 10 cls._instance.connections = [] # 未使用的连接 cls._instance.in_use = set() # 正在使用的连接 cls._instance.init_pool() return cls._instance # 返回唯一的连接池实例 def init_pool(self): """初始化连接池(线程安全)""" with self.lock: for _ in range(self.max_connections): self.add_connection() def create_connection(self): """创建单个数据库连接(无需加锁)""" try: conn = pymysql.connect(**self.config) print(f"成功创建连接:{conn._sock.getsockname()}") return conn except Error as e: print(f"连接创建失败: {e}") return None def add_connection(self): """向连接池添加连接(线程安全)""" if len(self.connections) + len(self.in_use) < self.max_connections: conn = self.create_connection() if conn: self.connections.append(conn) def get_connection(self): """获取连接(线程安全)""" with self.condition: while not self.connections: if len(self.in_use) < self.max_connections: self.add_connection() else: print("连接池已满,等待连接归还...") self.condition.wait() # 等待连接归还 conn = self.connections.pop() self.in_use.add(conn) print(f"获取连接:{conn._sock.getsockname()}") return conn def release_connection(self, conn): """归还连接(线程安全)""" with self.condition: if conn in self.in_use: self.in_use.remove(conn) if conn.open: self.connections.append(conn) print(f"归还连接:{conn._sock.getsockname()}") self.condition.notify() # 通知等待的线程 else: print("警告:连接已关闭,直接丢弃") def close_pool(self): """关闭所有连接(线程安全)""" with self.lock: for conn in self.connections + list(self.in_use): if conn.open: conn.close() self.connections.clear() self.in_use.clear() print("所有数据库连接已关闭") # 多线程测试示例 if __name__ == "__main__": import concurrent.futures def worker(thread_id): pool = DatabasePool() conn = pool.get_connection() try: with conn.cursor() as cursor: cursor.execute("SELECT SLEEP(1)") # 模拟耗时操作 print(f"线程 {thread_id} 执行查询") finally: pool.release_connection(conn) # 创建连接池 pool = DatabasePool() # 使用15个线程并发测试,超过最大连接数 with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor: futures = [executor.submit(worker, i) for i in range(15)] for future in concurrent.futures.as_completed(futures): future.result() pool.close_pool() 3. 核心:__new__ 方法 class DatabasePool: _instance = None # 类变量,用于保存唯一的实例 def __new__(cls): if not cls._instance: cls._instance = super().__new__(cls) # 数据库配置 cls._instance.config = { 'host': 'localhost', 'user': 'root', 'password': '111111', 'database': 'test', 'charset': 'utf8mb4', 'cursorclass': pymysql.cursors.DictCursor } # 使用可重入锁和条件变量 cls._instance.lock = threading.RLock() cls._instance.condition = threading.Condition(cls._instance.lock) cls._instance.max_connections = 10 cls._instance.connections = [] # 未使用的连接 cls._instance.in_use = set() # 正在使用的连接 cls._instance.init_pool() return cls._instance # 返回唯一的连接池实例 作用:确保无论创建多少次 DatabasePool(),都只会生成同一个实例。示例: pool1 = DatabasePool() # 第一次创建,初始化连接池 pool2 = DatabasePool() # 直接返回 pool1 的实例 print(pool1 is pool2) # 输出 True 3. 总结我们可以把连接池想象成一个共享铅笔盒(全班共享):
锁:管理员(锁)确保一次只有一个人能拿铅笔。连接:铅笔盒里的铅笔(初始10支)。动态扩容:当铅笔用完时,管理员临时制作新铅笔。归还机制:用完后必须归还,否则其他人无法使用。python中单例模式应用由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“python中单例模式应用”