diff --git a/pywxdump/analyzer/export_chat.py b/pywxdump/analyzer/export_chat.py index 199810b..5f09047 100644 --- a/pywxdump/analyzer/export_chat.py +++ b/pywxdump/analyzer/export_chat.py @@ -17,41 +17,91 @@ import sqlite3 import os import json import time -from .utils import get_md5 +from functools import wraps + +from .utils import get_md5, detach_databases, attach_databases, execute_sql from .db_parsing import read_img_dat, decompress_CompressContent, read_audio, parse_xml_string from flask import Flask, request, render_template, g, Blueprint -def get_user_list(MSG_ALL_db_path, MicroMsg_db_path): +def get_contact_list(MicroMsg_db_path): + """ + 获取联系人列表 + :param MicroMsg_db_path: MicroMsg.db 文件路径 + :return: 联系人列表 + """ users = [] - # 连接 MSG_ALL.db 数据库,并执行查询 - db1 = sqlite3.connect(MSG_ALL_db_path) - cursor1 = db1.cursor() - cursor1.execute("SELECT StrTalker, COUNT(*) AS ChatCount FROM MSG GROUP BY StrTalker ORDER BY ChatCount DESC") - result = cursor1.fetchall() + # 连接 MicroMsg.db 数据库,并执行查询 + db = sqlite3.connect(MicroMsg_db_path) + cursor = db.cursor() + cursor.execute( + "SELECT A.UserName, A.NickName, A.Remark,B.bigHeadImgUrl FROM Contact A,ContactHeadImgUrl B ORDER BY NickName ASC") + result = cursor.fetchall() for row in result: # 获取用户名、昵称、备注和聊天记录数量 - db2 = sqlite3.connect(MicroMsg_db_path) - cursor2 = db2.cursor() - cursor2.execute("SELECT UserName, NickName, Remark FROM Contact WHERE UserName=?", (row[0],)) - result2 = cursor2.fetchone() - if result2: - username, nickname, remark = result2 - chat_count = row[1] - - # 拼接四列数据为元组 - row_data = {"username": username, "nickname": nickname, "remark": remark, "chat_count": chat_count, - "isChatRoom": username.startswith("@chatroom")} - users.append(row_data) - cursor2.close() - db2.close() - cursor1.close() - db1.close() + username, nickname, remark, headImgUrl = row + users.append({"username": username, "nickname": nickname, "remark": remark, "headImgUrl": headImgUrl}) + cursor.close() + db.close() return users +def msg_db_connect(func): + @wraps(func) + def wrapper(MSG_db_path, *args, **kwargs): + # 连接 MSG.db 数据库,并执行查询 + if isinstance(MSG_db_path, list): + # alias, file_path + databases = {f"MSG{i}": db_path for i, db_path in enumerate(MSG_db_path)} + elif isinstance(MSG_db_path, str): + databases = {"MSG": MSG_db_path} + else: + raise TypeError("MSG_db_path 类型错误") + + # 连接 MSG_ALL.db 数据库,并执行查询 + if len(databases) > 1: + db = sqlite3.connect(":memory:") + attach_databases(db, databases) + else: + db = sqlite3.connect(list(databases.values())[0]) + + result = func("", db=db, databases=databases, *args, **kwargs) + + # 断开数据库连接 + if len(databases) > 1: + for alias in databases: + db.execute(f"DETACH DATABASE {alias}") + db.close() + + return result + + return wrapper + + +@msg_db_connect +def get_chat_count(MSG_db_path: [str, list], db=None, databases=None): + """ + 获取聊天记录数量 + :param MSG_db_path: MSG.db 文件路径 + :return: 聊天记录数量列表 + """ + # 构造 SQL 查询,使用 UNION ALL 联合不同数据库的 MSG 表 + union_sql = " UNION ALL ".join( + f"SELECT StrTalker, COUNT(*) AS ChatCount FROM {alias}.MSG GROUP BY StrTalker" for alias in databases) + + sql = f"SELECT StrTalker, SUM(ChatCount) AS TotalChatCount FROM ({union_sql}) GROUP BY StrTalker ORDER BY TotalChatCount DESC" + + chat_counts = [] + result = execute_sql(db, sql) + for row in result: + username, chat_count = row + row_data = {"username": username, "chat_count": chat_count} + chat_counts.append(row_data) + return chat_counts + + def load_base64_audio_data(MsgSvrID, MediaMSG_all_db_path): wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=MediaMSG_all_db_path) if not wave_data: diff --git a/pywxdump/analyzer/utils.py b/pywxdump/analyzer/utils.py index acd334b..7ab16d1 100644 --- a/pywxdump/analyzer/utils.py +++ b/pywxdump/analyzer/utils.py @@ -18,5 +18,50 @@ def get_md5(data): md5.update(data) return md5.hexdigest() + +def attach_databases(connection, databases): + """ + 将多个数据库附加到给定的SQLite连接。 + 参数: + -连接:SQLite连接 + -数据库:包含数据库别名和文件路径的词典 + """ + cursor = connection.cursor() + for alias, file_path in databases.items(): + attach_command = f"ATTACH DATABASE '{file_path}' AS {alias};" + cursor.execute(attach_command) + connection.commit() + + +def detach_databases(connection, aliases): + """ + 从给定的 SQLite 连接中分离多个数据库。 + + 参数: + - connection: SQLite连接 + - aliases:要分离的数据库别名列表 + """ + cursor = connection.cursor() + for alias in aliases: + detach_command = f"DETACH DATABASE {alias};" + cursor.execute(detach_command) + connection.commit() + + +def execute_sql(connection, sql, params=None): + """ + 执行给定的SQL语句,返回结果。 + 参数: + - connection: SQLite连接 + - sql:要执行的SQL语句 + - params:SQL语句中的参数 + """ + cursor = connection.cursor() + if params: + cursor.execute(sql, params) + else: + cursor.execute(sql) + return cursor.fetchall() + if __name__ == '__main__': pass