catdata-api-v2/asyncMysql.py
2025-08-01 20:36:20 +08:00

109 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio # 导入 asyncio 库,用于异步编程
import aiomysql # 导入 aiomysql 库,用于异步 MySQL 操作
# from Public_Config.PublicConfig import ConfigYaml # 导入配置文件模块,获取 MySQL 配置信息
MYSQL = {
'host':'119.29.118.68',
'port': 3306,
'database':'jifu5',
'user':'cyonjan',
'password':'livesite',
'charset':'utf8mb4',
}
class AsyncMysql:
def __init__(self):
# 获取 MySQL 配置信息
# mysql = self.load_config(r'D:\Database.yaml') # 从配置文件中读取 MySQL 配置
self.host = MYSQL.get('host') # 主机地址
self.port = int(MYSQL.get('port')) # 端口,转为整数
self.user = MYSQL.get('user') # 用户名
self.password = MYSQL.get('password') # 密码
self.database = MYSQL.get('database') # 数据库名
self.pool = None # 初始化连接池为 None稍后创建连接池
# def load_config(self, config_file): #todo 加载配置文件的方法
# with open(config_file, 'r') as file: #todo 以只读模式打开配置文件
# return yaml.safe_load(file) #todo 使用 yaml.safe_load 解析 YAML 文件内容
async def create_pool(self):
""" 创建 MySQL 连接池 """
try:
# 创建 MySQL 连接池,设置最小连接数 minsize 和最大连接数 maxsize
self.pool = await aiomysql.create_pool(
host=self.host, # 主机
port=self.port, # 端口
user=self.user, # 用户名
password=self.password, # 密码
db=self.database, # 数据库名
autocommit=True, # 自动提交事务
minsize=2, # 设置最小连接数
maxsize=10 # 设置最大连接数
)
except Exception as e:
# 如果创建连接池失败,捕获异常并输出错误信息
print(f"Failed to create MySQL connection pool: {str(e)}")
async def execute_query(self, query):
""" 执行增删改操作 """
try:
# 从连接池中获取一个连接对象
async with self.pool.acquire() as conn:
# 使用连接对象创建游标,用于执行 SQL 查询
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute(query) # 执行 SQL 查询
return True # 返回执行成功标志
except Exception as e:
# 如果执行 SQL 失败,捕获异常并输出错误信息
print(f"Failed to execute query: {query}")
print(f"Error: {str(e)}")
return False # 执行失败时返回 False
async def execute_select(self, query):
""" 执行查询操作 """
try:
# 从连接池中获取一个连接对象
async with self.pool.acquire() as conn:
# 使用连接对象创建游标,用于执行 SQL 查询
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute(query) # 执行 SQL 查询
results = await cursor.fetchall() # 获取所有查询结果
return results # 返回查询结果
except Exception as e:
# 如果执行查询失败,捕获异常并输出错误信息
print(f"Failed to execute select query: {query}")
print(f"Error: {str(e)}")
return None # 执行失败时返回 None
async def close_pool(self):
""" 关闭 MySQL 连接池 """
if self.pool:
self.pool.close() # 关闭连接池
await self.pool.wait_closed() # 等待连接池完全关闭
print("MySQL connection pool closed.") # 输出连接池已关闭的信息
# 测试异步操作
async def main():
mysql = AsyncMysql() # 创建 AsyncMysql 实例
await mysql.create_pool() # 异步创建连接池
# 定义 SQL 查询语句
query1 = "SELECT * FROM EDB_USER LIMIT 5" # 查询 USERLOGINLOG 表的前 5 条记录
query2 = "SELECT COUNT(*) FROM EDB_USER" # 查询 USERLOGINLOG 表的记录总数
# 使用 asyncio.gather 同时执行多个查询操作
results = await asyncio.gather(
mysql.execute_select(query1), # 执行第一个查询
mysql.execute_select(query2) # 执行第二个查询
)
# 打印查询结果
print("Query 1 results:", results[0]) # 输出第一个查询的结果
print("Query 2 results:", results[1]) # 输出第二个查询的结果
# 关闭连接池
await mysql.close_pool() # 异步关闭连接池
# 运行异步任务
if __name__ == '__main__':
asyncio.run(main()) # 启动并运行 main 异步任务