分离检查数据库中表是否存在,在每个函数运行前检查表是否存在

This commit is contained in:
xaoyaoo 2024-08-13 12:53:53 +08:00
parent bc3c9baa99
commit 0e675f6629
10 changed files with 137 additions and 119 deletions

View File

@ -205,7 +205,7 @@ def msg_count():
db_config = get_conf(g.caf, my_wxid, "db_config")
db = DBHandler(db_config)
chat_count = db.get_msg_count(wxid)
chat_count1 = db.get_plc_msg_count(wxid) if db.PublicMsg_exist else {}
chat_count1 = db.get_plc_msg_count(wxid)
# 合并两个字典相同key则将value相加
count = {k: chat_count.get(k, 0) + chat_count1.get(k, 0) for k in
list(set(list(chat_count.keys()) + list(chat_count1.keys())))}
@ -235,11 +235,9 @@ def get_msgs():
return ReJson(1002, body=f"start or limit is not int {start} {limit}")
db = DBHandler(db_config)
msgs, wxid_list = db.get_msg_list(wxid=wxid, start_index=start, page_size=limit)
if not msgs and db.PublicMsg_exist:
msgs, wxid_list = db.get_plc_msg_list(wxid=wxid, start_index=start, page_size=limit)
msgs, wxid_list = db.get_msgs(wxid=wxid, start_index=start, page_size=limit)
wxid_list.append(my_wxid)
user = db.get_user_list(wxids=wxid_list)
user = db.get_user(wxids=wxid_list)
return ReJson(0, {"msg_list": msgs, "user_list": user})

View File

@ -27,25 +27,41 @@ class DBHandler(MicroHandler, MediaHandler, OpenIMContactHandler, PublicMsgHandl
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.MSG_exist = self.Msg_tables_exist()
self.Micro_exist = self.Micro_tables_exist()
self.Media_exist = self.Media_tables_exist()
self.OpenIMContact_exist = self.OpenIMContact_tables_exist()
self.PublicMsg_exist = self.PublicMSG_tables_exist()
self.OpenIMMedia_exist = self.OpenIMMedia_tables_exist()
self.Favorite_exist = self.Favorite_tables_exist()
if self.MSG_exist: # 添加索引
self.Msg_add_index()
if self.PublicMsg_exist: # 添加索引
self.PublicMsg_add_index()
if self.Micro_exist: # 添加索引
self.Micro_add_index()
self.Micro_add_index()
self.Msg_add_index()
self.PublicMsg_add_index()
def get_user(self, word=None, wxids=None, labels=None):
"""
获取联系人列表
"""
users = self.get_user_list(word=word, wxids=wxids, label_ids=labels) if self.Micro_exist else {}
if self.OpenIMContact_exist: users.update(self.get_im_user_list(word=word, wxids=wxids))
users = self.get_user_list(word=word, wxids=wxids, label_ids=labels)
users.update(self.get_im_user_list(word=word, wxids=wxids))
return users
def get_msgs(self, wxid="", start_index=0, page_size=500, msg_type: str = "", msg_sub_type: str = "",
start_createtime=None, end_createtime=None):
"""
获取聊天记录列表
:param wxid: wxid
:param start_index: 起始索引
:param page_size: 页大小
:param msg_type: 消息类型
:param msg_sub_type: 消息子类型
:param start_createtime: 开始时间
:param end_createtime: 结束时间
:return: 聊天记录列表 {"id": _id, "MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender,
"talker": talker, "room_name": StrTalker, "msg": msg, "src": src, "extra": {},
"CreateTime": CreateTime, }
"""
msgs0, wxid_list0 = self.get_msg_list(wxid=wxid, start_index=start_index, page_size=page_size,
msg_type=msg_type,
msg_sub_type=msg_sub_type, start_createtime=start_createtime,
end_createtime=end_createtime)
msgs1, wxid_list1 = self.get_plc_msg_list(wxid=wxid, start_index=start_index, page_size=page_size,
msg_type=msg_type,
msg_sub_type=msg_sub_type, start_createtime=start_createtime,
end_createtime=end_createtime)
msgs = msgs0 + msgs1
wxid_list = wxid_list0 + wxid_list1
return msgs, wxid_list

View File

@ -22,16 +22,12 @@ class FavoriteHandler(DatabaseBase):
_class_name = "Favorite"
Favorite_required_tables = ["FavItems", "FavDataItem", "FavTagDatas", "FavBindTagDatas"]
def Favorite_tables_exist(self):
"""
判断该类所需要的表是否存在
"""
return self.check_tables_exist(self.Favorite_required_tables)
def get_tags(self, LocalID):
"""
return: {LocalID: TagName}
"""
if not self.tables_exist("FavTagDatas"):
return {}
if LocalID is None:
sql = "select LocalID, TagName from FavTagDatas order by ServerSeq"
else:
@ -108,6 +104,9 @@ class FavoriteHandler(DatabaseBase):
"Rerserved7": "保留字段7"
}
if not self.tables_exist(["FavItems", "FavDataItem"]):
return False
sql1 = "select " + ",".join(FavItemsFields.keys()) + " from FavItems order by UpdateTime desc"
sql2 = "select " + ",".join(FavDataItemFields.keys()) + " from FavDataItem B order by B.RecId asc"

View File

@ -23,23 +23,16 @@ class MsgHandler(DatabaseBase):
_class_name = "MSG"
MSG_required_tables = ["MSG"]
def Msg_tables_exist(self):
"""
判断该类所需要的表是否存在
"""
return self.check_tables_exist(self.MSG_required_tables)
def Msg_add_index(self):
"""
添加索引,加快查询速度
"""
# 检查是否存在索引
sql = "CREATE INDEX IF NOT EXISTS idx_MSG_StrTalker ON MSG(StrTalker);"
self.execute(sql)
sql = "CREATE INDEX IF NOT EXISTS idx_MSG_CreateTime ON MSG(CreateTime);"
self.execute(sql)
sql = "CREATE INDEX IF NOT EXISTS idx_MSG_StrTalker_CreateTime ON MSG(StrTalker, CreateTime);"
self.execute(sql)
if not self.tables_exist("MSG"):
return
self.execute("CREATE INDEX IF NOT EXISTS idx_MSG_StrTalker ON MSG(StrTalker);")
self.execute("CREATE INDEX IF NOT EXISTS idx_MSG_CreateTime ON MSG(CreateTime);")
self.execute("CREATE INDEX IF NOT EXISTS idx_MSG_StrTalker_CreateTime ON MSG(StrTalker, CreateTime);")
@db_error
def get_msg_count(self, wxids: list = ""):
@ -57,6 +50,8 @@ class MsgHandler(DatabaseBase):
sql = f"SELECT StrTalker, COUNT(*) FROM MSG GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
sql_total = f"SELECT COUNT(*) FROM MSG;"
if not self.tables_exist("MSG"):
return {}
result = self.execute(sql)
total_ret = self.execute(sql_total)
@ -271,11 +266,8 @@ class MsgHandler(DatabaseBase):
"talker": talker, "room_name": StrTalker, "msg": msg, "src": src, "extra": {},
"CreateTime": CreateTime, }
"""
sql_base = ("SELECT localId,TalkerId,MsgSvrID,Type,SubType,CreateTime,IsSender,Sequence,StatusEx,FlagEx,Status,"
"MsgSequence,StrContent,MsgServerSeq,StrTalker,DisplayContent,Reserved0,Reserved1,Reserved3,"
"Reserved4,Reserved5,Reserved6,CompressContent,BytesExtra,BytesTrans,Reserved2,"
"ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
"FROM MSG ")
if not self.tables_exist("MSG"):
return [], []
param = ()
sql_wxid, param = ("AND StrTalker=? ", param + (wxid,)) if wxid else ("", param)
@ -286,7 +278,11 @@ class MsgHandler(DatabaseBase):
sql_end_createtime, param = ("AND CreateTime<=? ", param + (end_createtime,)) if end_createtime else ("", param)
sql = (
f"{sql_base} WHERE 1=1 "
"SELECT localId,TalkerId,MsgSvrID,Type,SubType,CreateTime,IsSender,Sequence,StatusEx,FlagEx,Status,"
"MsgSequence,StrContent,MsgServerSeq,StrTalker,DisplayContent,Reserved0,Reserved1,Reserved3,"
"Reserved4,Reserved5,Reserved6,CompressContent,BytesExtra,BytesTrans,Reserved2,"
"ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
"FROM MSG WHERE 1=1 "
f"{sql_wxid}"
f"{sql_type}"
f"{sql_sub_type}"
@ -310,6 +306,8 @@ class MsgHandler(DatabaseBase):
"""
获取每日聊天记录数量包括发送者数量接收者数量和总数
"""
if not self.tables_exist("MSG"):
return {}
if isinstance(start_time, str) and start_time.isdigit():
start_time = int(start_time)
if isinstance(end_time, str) and end_time.isdigit():
@ -355,6 +353,8 @@ class MsgHandler(DatabaseBase):
"""
获取聊天记录数量最多的联系人,他们聊天记录数量
"""
if not self.tables_exist("MSG"):
return {}
if isinstance(start_time, str) and start_time.isdigit():
start_time = int(start_time)
if isinstance(end_time, str) and end_time.isdigit():

View File

@ -13,13 +13,10 @@ class MediaHandler(DatabaseBase):
_class_name = "MediaMSG"
Media_required_tables = ["Media"]
def Media_tables_exist(self):
"""
判断该类所需要的表是否存在
"""
return self.check_tables_exist(self.Media_required_tables)
def get_audio(self, MsgSvrID, is_play=False, is_wave=False, save_path=None, rate=24000):
if not self.tables_exist("Media"):
return False
sql = "select Buf from Media where Reserved0=? "
DBdata = self.execute(sql, (MsgSvrID,))
if not DBdata:

View File

@ -18,39 +18,41 @@ class MicroHandler(DatabaseBase):
Micro_required_tables = ["ContactLabel", "Contact", "ContactHeadImgUrl", "Session", "ChatInfo", "ChatRoom",
"ChatRoomInfo"]
def Micro_tables_exist(self):
"""
判断该类所需要的表是否存在
"""
return self.check_tables_exist(self.Micro_required_tables)
def Micro_add_index(self):
"""
添加索引, 加快查询速度
"""
# 为 Session 表添加索引
self.execute("CREATE INDEX IF NOT EXISTS idx_Session_strUsrName_nTime ON Session(strUsrName, nTime);")
self.execute("CREATE INDEX IF NOT EXISTS idx_Session_nOrder ON Session(nOrder);")
self.execute("CREATE INDEX IF NOT EXISTS idx_Session_nTime ON Session(nTime);")
if self.tables_exist("Session"):
self.execute("CREATE INDEX IF NOT EXISTS idx_Session_strUsrName_nTime ON Session(strUsrName, nTime);")
self.execute("CREATE INDEX IF NOT EXISTS idx_Session_nOrder ON Session(nOrder);")
self.execute("CREATE INDEX IF NOT EXISTS idx_Session_nTime ON Session(nTime);")
# 为 Contact 表添加索引
self.execute("CREATE INDEX IF NOT EXISTS idx_Contact_UserName ON Contact(UserName);")
if self.tables_exist("Contact"):
self.execute("CREATE INDEX IF NOT EXISTS idx_Contact_UserName ON Contact(UserName);")
# 为 ContactHeadImgUrl 表添加索引
self.execute("CREATE INDEX IF NOT EXISTS idx_ContactHeadImgUrl_usrName ON ContactHeadImgUrl(usrName);")
if self.tables_exist('ContactHeadImgUrl'):
self.execute("CREATE INDEX IF NOT EXISTS idx_ContactHeadImgUrl_usrName ON ContactHeadImgUrl(usrName);")
# 为 ChatInfo 表添加索引
self.execute("CREATE INDEX IF NOT EXISTS idx_ChatInfo_Username_LastReadedCreateTime "
"ON ChatInfo(Username, LastReadedCreateTime);")
self.execute("CREATE INDEX IF NOT EXISTS idx_ChatInfo_LastReadedCreateTime ON ChatInfo(LastReadedCreateTime);")
if self.tables_exist('ChatInfo'):
self.execute("CREATE INDEX IF NOT EXISTS idx_ChatInfo_Username_LastReadedCreateTime "
"ON ChatInfo(Username, LastReadedCreateTime);")
self.execute(
"CREATE INDEX IF NOT EXISTS idx_ChatInfo_LastReadedCreateTime ON ChatInfo(LastReadedCreateTime);")
# 为 Contact 表添加复合索引
self.execute("CREATE INDEX IF NOT EXISTS idx_Contact_search "
"ON Contact(UserName, NickName, Remark, Alias, QuanPin, PYInitial, RemarkQuanPin, RemarkPYInitial);")
if self.tables_exist('Contact'):
self.execute("CREATE INDEX IF NOT EXISTS idx_Contact_search "
"ON Contact(UserName, NickName, Remark, Alias, QuanPin, PYInitial, RemarkQuanPin, RemarkPYInitial);")
# 为 ChatRoom 和 ChatRoomInfo 表添加索引
self.execute("CREATE INDEX IF NOT EXISTS idx_ChatRoom_ChatRoomName ON ChatRoom(ChatRoomName);")
self.execute("CREATE INDEX IF NOT EXISTS idx_ChatRoomInfo_ChatRoomName ON ChatRoomInfo(ChatRoomName);")
if self.tables_exist(['ChatRoomInfo', "ChatRoom"]):
self.execute("CREATE INDEX IF NOT EXISTS idx_ChatRoom_ChatRoomName ON ChatRoom(ChatRoomName);")
self.execute("CREATE INDEX IF NOT EXISTS idx_ChatRoomInfo_ChatRoomName ON ChatRoomInfo(ChatRoomName);")
@db_error
def get_labels(self, id_is_key=True):
@ -59,12 +61,13 @@ class MicroHandler(DatabaseBase):
:param id_is_key: id_is_key: True: id作为keyFalse: name作为key
:return:
"""
if not self.table_exist.get("ContactLabel", False):
return {}
labels = {}
if not self.tables_exist("ContactLabel"):
return labels
sql = "SELECT LabelId, LabelName FROM ContactLabel ORDER BY LabelName ASC;"
result = self.execute(sql)
if not result:
return []
return labels
if id_is_key:
labels = {row[0]: row[1] for row in result}
else:
@ -78,6 +81,8 @@ class MicroHandler(DatabaseBase):
:return: 会话列表
"""
sessions = {}
if not self.tables_exist(["Session", "Contact", "ContactHeadImgUrl"]):
return sessions
sql = (
"SELECT S.strUsrName,S.nOrder,S.nUnReadCount, S.strNickName, S.nStatus, S.nIsSend, S.strContent, "
"S.nMsgLocalID, S.nMsgStatus, S.nTime, S.nMsgType, S.Reserved2 AS nMsgSubType, C.UserName, C.Alias, "
@ -124,6 +129,8 @@ class MicroHandler(DatabaseBase):
:return: 最近聊天的联系人
"""
users = []
if not self.tables_exist(["ChatInfo"]):
return users
sql = (
"SELECT A.Username, LastReadedCreateTime, LastReadedSvrId "
"FROM ( SELECT Username, MAX(LastReadedCreateTime) AS MaxLastReadedCreateTime FROM ChatInfo "
@ -159,7 +166,8 @@ class MicroHandler(DatabaseBase):
label_ids = [label_ids]
users = {}
if not self.tables_exist(["Contact", "ContactHeadImgUrl"]):
return users
sql = (
"SELECT A.UserName, A.Alias, A.DelFlag, A.Type, A.VerifyFlag, A.Reserved1, A.Reserved2,"
"A.Remark, A.NickName, A.LabelIDList, A.ChatRoomType, A.ChatRoomNotify, A.Reserved5,"
@ -220,6 +228,9 @@ class MicroHandler(DatabaseBase):
if isinstance(roomwxids, str):
roomwxids = [roomwxids]
rooms = {}
if not self.tables_exist(["ChatRoom", "ChatRoomInfo"]):
return rooms
sql = (
"SELECT A.ChatRoomName,A.UserNameList,A.DisplayNameList,A.ChatRoomFlag,A.IsShowName,"
"A.SelfDisplayName,A.Reserved2,A.RoomData, "
@ -232,7 +243,6 @@ class MicroHandler(DatabaseBase):
if roomwxids:
sql = sql.replace(";", f"AND A.UserName IN ('" + "','".join(roomwxids) + "') ;")
rooms = {}
result = self.execute(sql)
if not result:
return rooms

View File

@ -13,12 +13,6 @@ class OpenIMContactHandler(DatabaseBase):
_class_name = "OpenIMContact"
OpenIMContact_required_tables = ["OpenIMContact"]
def OpenIMContact_tables_exist(self):
"""
判断该类所需要的表是否存在
"""
return self.check_tables_exist(self.OpenIMContact_required_tables)
def get_im_user_list(self, word=None, wxids=None):
"""
获取联系人列表
@ -27,8 +21,10 @@ class OpenIMContactHandler(DatabaseBase):
:param wxids: 微信id列表
:return: 联系人字典
"""
if not self.tables_exist("OpenIMContact"):
return []
if not wxids:
wxids = []
wxids = {}
if isinstance(wxids, str):
wxids = [wxids]
sql = ("SELECT UserName,NickName,Type,Remark,BigHeadImgUrl,CustomInfoDetail,CustomInfoDetailVisible,"
@ -49,7 +45,7 @@ class OpenIMContactHandler(DatabaseBase):
result = self.execute(sql)
if not result:
return []
return {}
users = {}
for row in result:

View File

@ -13,13 +13,9 @@ class OpenIMMediaHandler(DatabaseBase):
_class_name = "OpenIMMedia"
OpenIMMedia_required_tables = ["OpenIMMedia"]
def OpenIMMedia_tables_exist(self):
"""
判断该类所需要的表是否存在
"""
return self.check_tables_exist(self.OpenIMMedia_required_tables)
def get_im_audio(self, MsgSvrID, is_play=False, is_wave=False, save_path=None, rate=24000):
if not self.tables_exist("OpenIMMedia"):
return False
sql = "select Buf from OpenIMMedia where Reserved0=? "
DBdata = self.execute(sql, (MsgSvrID,))
if not DBdata:

View File

@ -32,23 +32,18 @@ class PublicMsgHandler(MsgHandler):
_class_name = "PublicMSG"
PublicMSG_required_tables = ["PublicMsg"]
@db_error
def PublicMSG_tables_exist(self):
"""
判断该类所需要的表是否存在
"""
return self.check_tables_exist(self.PublicMSG_required_tables)
def PublicMsg_add_index(self):
"""
添加索引,加快查询速度
"""
# 检查是否存在索引
sql = "CREATE INDEX IF NOT EXISTS idx_PublicMsg_StrTalker ON MSG(StrTalker);"
if not self.tables_exist("PublicMsg"):
return
sql = "CREATE INDEX IF NOT EXISTS idx_PublicMsg_StrTalker ON PublicMsg(StrTalker);"
self.execute(sql)
sql = "CREATE INDEX IF NOT EXISTS idx_PublicMsg_CreateTime ON MSG(CreateTime);"
sql = "CREATE INDEX IF NOT EXISTS idx_PublicMsg_CreateTime ON PublicMsg(CreateTime);"
self.execute(sql)
sql = "CREATE INDEX IF NOT EXISTS idx_PublicMsg_StrTalker_CreateTime ON MSG(StrTalker, CreateTime);"
sql = "CREATE INDEX IF NOT EXISTS idx_PublicMsg_StrTalker_CreateTime ON PublicMsg(StrTalker, CreateTime);"
self.execute(sql)
@db_error
@ -58,6 +53,8 @@ class PublicMsgHandler(MsgHandler):
:param wxids: wxid list
:return: 聊天记录数量列表 {wxid: chat_count}
"""
if not self.tables_exist("PublicMsg"):
return {}
if isinstance(wxids, str):
wxids = [wxids]
if wxids:
@ -96,11 +93,8 @@ class PublicMsgHandler(MsgHandler):
"talker": talker, "room_name": StrTalker, "msg": msg, "src": src, "extra": {},
"CreateTime": CreateTime, }
"""
sql_base = ("SELECT localId,TalkerId,MsgSvrID,Type,SubType,CreateTime,IsSender,Sequence,StatusEx,FlagEx,Status,"
"MsgSequence,StrContent,MsgServerSeq,StrTalker,DisplayContent,Reserved0,Reserved1,Reserved3,"
"Reserved4,Reserved5,Reserved6,CompressContent,BytesExtra,BytesTrans,Reserved2,"
"ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
"FROM PublicMsg ")
if not self.tables_exist("PublicMsg"):
return [], []
param = ()
sql_wxid, param = ("AND StrTalker=? ", param + (wxid,)) if wxid else ("", param)
@ -111,7 +105,11 @@ class PublicMsgHandler(MsgHandler):
sql_end_createtime, param = ("AND CreateTime<=? ", param + (end_createtime,)) if end_createtime else ("", param)
sql = (
f"{sql_base} WHERE 1=1 "
"SELECT localId,TalkerId,MsgSvrID,Type,SubType,CreateTime,IsSender,Sequence,StatusEx,FlagEx,Status,"
"MsgSequence,StrContent,MsgServerSeq,StrTalker,DisplayContent,Reserved0,Reserved1,Reserved3,"
"Reserved4,Reserved5,Reserved6,CompressContent,BytesExtra,BytesTrans,Reserved2,"
"ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
"FROM PublicMsg WHERE 1=1 "
f"{sql_wxid}"
f"{sql_type}"
f"{sql_sub_type}"

View File

@ -79,7 +79,7 @@ class DatabaseSingletonBase:
class DatabaseBase(DatabaseSingletonBase):
_class_name = "DatabaseBase"
table_exist = {}
existed_tables = []
def __init__(self, db_config):
"""
@ -91,6 +91,28 @@ class DatabaseBase(DatabaseSingletonBase):
"""
self.config = db_config
self.pool = self.connect(self.config)
self.__get_existed_tables()
def __get_existed_tables(self):
sql = "SELECT tbl_name FROM sqlite_master WHERE type = 'table' and tbl_name!='sqlite_sequence';"
existing_tables = self.execute(sql)
self.existed_tables = [row[0].lower() for row in existing_tables]
return self.existed_tables
def tables_exist(self, required_tables: str or list):
"""
判断该类所需要的表是否存在
Check if all required tables exist in the database.
Args:
required_tables (list or str): A list of table names or a single table name string.
Returns:
bool: True if all required tables exist, False otherwise.
"""
if isinstance(required_tables, str):
required_tables = [required_tables]
rbool = all(table.lower() in self.existed_tables for table in (required_tables or []))
if not rbool: db_loger.warning(f"{required_tables=}\n{self.existed_tables=}\n{rbool=}\n")
return rbool
def execute(self, sql, params=None):
"""
@ -125,20 +147,6 @@ class DatabaseBase(DatabaseSingletonBase):
finally:
connection.close()
def check_tables_exist(self, required_tables):
"""
判断该类所需要的表是否存在
"""
required_tables = required_tables or []
required_tables_str = "'" + "','".join(required_tables) + "'"
sql = (f"SELECT tbl_name FROM sqlite_master "
f"WHERE type='table' AND tbl_name in ({required_tables_str});")
existing_tables = self.execute(sql)
existing_tables = [row[0] for row in existing_tables] # 将查询结果转换为列表
self.table_exist = {table: table in existing_tables for table in required_tables}
# 检查所有必需的表是否都在现有表中
return all(table in existing_tables for table in required_tables)
def close(self):
self.pool.close()
db_loger.info(f"关闭数据库 - {self.config}")