"""
Data processing utilities for chat history and contact information.
"""
import os
import sqlite3
from typing import List, Dict, Tuple, Optional
from datetime import datetime
import re


class DataProcessor:
    """Access layer over the MicroMsg, MSG and MISC SQLite databases.

    Connections are opened in ``__init__``. A database whose file cannot be
    found stays ``None``. Methods that need a missing connection either raise
    or return ``None``, as documented on each method.
    """

    # Characters replaced with "_" when deriving a filename-safe contact name.
    _UNSAFE_NAME_CHARS = re.compile(r'[\\/:*?"<>|\s\.]')

    # (connection attribute, path attribute, label used in log messages)
    _DB_SPECS = (
        ("micro_msg_conn", "micro_msg_db_path", "MicroMsg"),
        ("msg_conn", "msg_db_path", "MSG"),
        ("misc_conn", "misc_db_path", "MISC"),
    )

    # Keys for the columns selected by get_chat_history, in SELECT order.
    _CHAT_KEYS = (
        'local_id', 'talker_id', 'type', 'sub_type', 'is_sender',
        'timestamp', 'status', 'message', 'create_time',
    )

    def __init__(self):
        """Resolve the default database paths and open the available connections."""
        db_dir = os.path.join(self._default_project_root(), "app", "Database", "Msg")
        self._set_db_dir(db_dir)

        print(f"[DataProcessor] 数据库路径配置:")
        print(f"MicroMsg数据库: {self.micro_msg_db_path}")
        print(f"MSG数据库: {self.msg_db_path}")
        print(f"MISC数据库: {self.misc_db_path}")

        self.micro_msg_conn: Optional[sqlite3.Connection] = None
        self.msg_conn: Optional[sqlite3.Connection] = None
        self.misc_conn: Optional[sqlite3.Connection] = None

        self.init_database()

    @staticmethod
    def _default_project_root() -> str:
        """Return the grandparent of the directory containing this file."""
        return os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Quote *name* as an SQLite identifier (double quotes, embedded quotes doubled)."""
        return '"' + name.replace('"', '""') + '"'

    def _set_db_dir(self, db_dir: str) -> None:
        """Point all three database paths at the standard file names inside *db_dir*."""
        self.micro_msg_db_path = os.path.join(db_dir, "MicroMsg.db")
        self.msg_db_path = os.path.join(db_dir, "MSG.db")
        self.misc_db_path = os.path.join(db_dir, "MISC.db")

    @staticmethod
    def _open_connection(path: str, label: str) -> Optional[sqlite3.Connection]:
        """Open an SQLite connection to *path*, logging the outcome.

        Returns:
            The connection, or None if sqlite3 raised an error.
        """
        try:
            # check_same_thread=False lets the connection be used from threads
            # other than the one that created it.
            conn = sqlite3.connect(path, check_same_thread=False)
        except sqlite3.Error as e:
            print(f"[DataProcessor] 连接 {label} 数据库失败: {str(e)}")
            return None
        print(f"[DataProcessor] 成功连接到 {label} 数据库: {path}")
        return conn

    def _connect_missing(self, report_missing: bool) -> None:
        """Open every database that is not yet connected and whose file exists.

        Args:
            report_missing: if True, print a message for each database file
                that does not exist at its current path.
        """
        for conn_attr, path_attr, label in self._DB_SPECS:
            if getattr(self, conn_attr, None) is not None:
                continue  # already connected; don't leak it by reconnecting
            path = getattr(self, path_attr)
            if not os.path.exists(path):
                if report_missing:
                    print(f"[DataProcessor] {label} 数据库文件不存在: {path}")
                continue
            conn = self._open_connection(path, label)
            setattr(self, conn_attr, conn)
            if conn is not None and conn_attr == "misc_conn":
                # Print the MISC table layout for diagnostics.
                self.check_misc_tables()

    def _all_connected(self) -> bool:
        """Return True when all three database connections are open."""
        return all(getattr(self, conn_attr, None) is not None for conn_attr, _, _ in self._DB_SPECS)

    def check_misc_tables(self):
        """Print every table in the MISC database together with its columns.

        Raises:
            Exception: if the MISC database is not connected.
        """
        if not self.misc_conn:
            raise Exception("MISC 数据库未连接")

        cursor = None
        try:
            cursor = self.misc_conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = cursor.fetchall()

            print("\nMISC数据库中的表:")
            for (table_name,) in tables:
                print(f"\n表名: {table_name}")
                # PRAGMA arguments cannot be bound parameters, so quote the name.
                cursor.execute(f"PRAGMA table_info({self._quote_identifier(table_name)})")
                for col in cursor.fetchall():
                    print(f" 列名: {col[1]}, 类型: {col[2]}")
        except sqlite3.Error as e:
            print(f"检查表结构失败:{str(e)}")
        finally:
            if cursor is not None:
                cursor.close()

    def init_database(self):
        """Connect to the databases, falling back to alternative directory spellings.

        Steps:
            1. Create the configured database directory if it is missing.
            2. Connect to each database file that exists there.
            3. If any connection is still missing, probe differently-cased
               ``app/<Database>/<Msg>`` directories under the project root.
               Stop at the first directory that completes the set.
        """
        db_dir = os.path.dirname(self.micro_msg_db_path)
        if not os.path.exists(db_dir):
            print(f"[DataProcessor] 数据库目录不存在,尝试创建: {db_dir}")
            try:
                os.makedirs(db_dir, exist_ok=True)
            except OSError as e:
                print(f"[DataProcessor] 创建数据库目录失败: {str(e)}")

        self._connect_missing(report_missing=True)
        if self._all_connected():
            return

        print("[DataProcessor] 尝试从其他位置查找数据库...")
        # The project root is not stored on the instance, so recompute it here.
        project_root = self._default_project_root()
        alt_db_dirs = [
            os.path.join(project_root, "app", "DataBase", "Msg"),  # note the capitalisation
            os.path.join(project_root, "app", "database", "msg"),
            os.path.join(project_root, "app", "Database", "msg"),
            os.path.join(project_root, "app", "database", "Msg"),
        ]
        for alt_dir in alt_db_dirs:
            if not os.path.exists(alt_dir):
                continue
            print(f"[DataProcessor] 找到数据库目录: {alt_dir}")
            self._set_db_dir(alt_dir)
            self._connect_missing(report_missing=False)
            if self._all_connected():
                print("[DataProcessor] 所有数据库连接成功")
                break

    def check_contact_table(self):
        """Print the column layout of the Contact table and one sample row.

        Raises:
            Exception: if the MicroMsg database is not connected.
        """
        if not self.micro_msg_conn:
            raise Exception("MicroMsg 数据库未连接")

        cursor = None
        try:
            cursor = self.micro_msg_conn.cursor()
            cursor.execute("PRAGMA table_info(Contact)")
            columns = cursor.fetchall()

            print("\nContact表结构:")
            for col in columns:
                print(f"列名: {col[1]}, 类型: {col[2]}")

            cursor.execute("SELECT * FROM Contact LIMIT 1")
            result = cursor.fetchone()
            if result:
                print("\n示例数据:")
                for i, col in enumerate(columns):
                    print(f"{col[1]}: {result[i]}")
        except sqlite3.Error as e:
            print(f"检查表结构失败:{str(e)}")
        finally:
            if cursor is not None:
                cursor.close()

    def get_all_contacts(self) -> List[Dict]:
        """Return the contact list, ordered by pinyin initial.

        Rows come from Contact joined with ContactHeadImgUrl, restricted to
        ``Type != 4 AND VerifyFlag = 0`` with a non-empty NickName. If the
        ContactLabel join fails with an OperationalError (e.g. the table does
        not exist), the query is retried without labels and ``label_name`` is
        the string 'None'.

        Returns:
            List[Dict]: one dict per contact. ``name`` is the remark (or the
            nickname when the remark is empty) with filename-unsafe characters
            replaced by "_". ``original_name`` is the unmodified value.

        Raises:
            Exception: if the MicroMsg database is not connected or a query fails.
        """
        if not self.micro_msg_conn:
            raise Exception("MicroMsg 数据库未连接")

        cursor = None
        try:
            cursor = self.micro_msg_conn.cursor()
            query = """
                SELECT Contact.UserName, Contact.Alias, Contact.Type, Contact.Remark,
                       Contact.NickName, Contact.PYInitial, Contact.RemarkPYInitial,
                       ContactHeadImgUrl.smallHeadImgUrl, ContactHeadImgUrl.bigHeadImgUrl,
                       Contact.ExTraBuf, COALESCE(ContactLabel.LabelName, 'None') AS labelName
                FROM Contact
                INNER JOIN ContactHeadImgUrl ON Contact.UserName = ContactHeadImgUrl.usrName
                LEFT JOIN ContactLabel ON Contact.LabelIDList = ContactLabel.LabelId
                WHERE (Type!=4 AND VerifyFlag=0) AND NickName != ''
                ORDER BY CASE WHEN RemarkPYInitial = '' THEN PYInitial ELSE RemarkPYInitial END ASC
            """
            try:
                cursor.execute(query)
                results = cursor.fetchall()
            except sqlite3.OperationalError:
                # Fall back to a label-less query (e.g. ContactLabel table missing).
                query = """
                    SELECT Contact.UserName, Contact.Alias, Contact.Type, Contact.Remark,
                           Contact.NickName, Contact.PYInitial, Contact.RemarkPYInitial,
                           ContactHeadImgUrl.smallHeadImgUrl, ContactHeadImgUrl.bigHeadImgUrl,
                           Contact.ExTraBuf, 'None' as labelName
                    FROM Contact
                    INNER JOIN ContactHeadImgUrl ON Contact.UserName = ContactHeadImgUrl.usrName
                    WHERE (Type!=4 AND VerifyFlag=0) AND NickName != ''
                    ORDER BY CASE WHEN RemarkPYInitial = '' THEN PYInitial ELSE RemarkPYInitial END ASC
                """
                cursor.execute(query)
                results = cursor.fetchall()

            print(f"\n找到 {len(results)} 个联系人")

            contacts = []
            for row in results:
                # Prefer the remark; fall back to the nickname when the remark is empty.
                original_name = row[3] if row[3] else row[4]
                if not original_name:
                    # Check before re.sub, which would raise TypeError on None.
                    continue
                contacts.append({
                    'wxid': row[0],
                    'alias': row[1],
                    'type': row[2],
                    # Sanitised for use in file names and similar contexts.
                    'name': self._UNSAFE_NAME_CHARS.sub('_', original_name),
                    'original_name': original_name,
                    'nickname': row[4],
                    'py_initial': row[5],
                    'remark_py_initial': row[6],
                    'small_avatar_url': row[7],
                    'big_avatar_url': row[8],
                    'extra_buf': row[9],
                    'label_name': row[10],
                })
            return contacts
        except sqlite3.Error as e:
            raise Exception(f"数据库查询失败:{str(e)}") from e
        finally:
            if cursor is not None:
                cursor.close()

    def get_chat_history(self, contact_id: str, start_date: str, end_date: str) -> List[Dict]:
        """Return the text messages (Type = 1) exchanged with a contact in a date range.

        Both bounds are local calendar dates. The range is inclusive: it runs
        from local midnight of *start_date* up to, but excluding, local
        midnight of the day after *end_date*. This matches the 'localtime'
        rendering used for ``create_time``.

        Args:
            contact_id: value matched against MSG.StrTalker.
            start_date: first day, format YYYY-MM-DD.
            end_date: last day (inclusive), format YYYY-MM-DD.

        Returns:
            List[Dict]: messages ordered by CreateTime ascending, keyed by
            local_id, talker_id, type, sub_type, is_sender, timestamp, status,
            message and create_time.

        Raises:
            Exception: if the MSG database is not connected or the query fails.
        """
        if not self.msg_conn:
            raise Exception("MSG 数据库未连接")

        cursor = None
        try:
            cursor = self.msg_conn.cursor()
            # 'utc' interprets the date as local time and converts it to UTC
            # seconds. '+1 day' with '<' makes end_date inclusive.
            query = """
                SELECT localId, TalkerId, Type, SubType, IsSender, CreateTime, Status, StrContent,
                       strftime('%Y-%m-%d %H:%M:%S', CreateTime, 'unixepoch', 'localtime') AS create_time
                FROM MSG
                WHERE StrTalker = ?
                  AND CreateTime >= CAST(strftime('%s', ?, 'utc') AS INTEGER)
                  AND CreateTime < CAST(strftime('%s', ?, '+1 day', 'utc') AS INTEGER)
                  AND Type = 1  -- text messages only
                ORDER BY CreateTime ASC
            """
            cursor.execute(query, (contact_id, start_date, end_date))
            return [dict(zip(self._CHAT_KEYS, row)) for row in cursor.fetchall()]
        except sqlite3.Error as e:
            raise Exception(f"数据库查询失败:{str(e)}") from e
        finally:
            if cursor is not None:
                cursor.close()

    def analyze_chat_content(self, chat_history: List[Dict]) -> Optional[Dict]:
        """Analyse chat content and extract key information (not implemented).

        Args:
            chat_history: message dicts as returned by get_chat_history.

        Returns:
            Intended to be a dict with keywords, chat frequency, etc.
            Currently always returns None.
        """
        # TODO: implement chat content analysis.
        return None

    def get_contact_avatar(self, wxid: str) -> Optional[bytes]:
        """Return the avatar image bytes for a contact.

        Reads ContactHeadImg1.smallHeadBuf from the MISC database.

        Args:
            wxid: the contact's WeChat ID (matched against usrName).

        Returns:
            The raw avatar bytes, or None when there is no data, the MISC
            database is not connected, or the query fails.
        """
        print(f"[DataProcessor] 开始获取头像 - 联系人ID: {wxid}")
        if not self.misc_conn:
            print(f"[DataProcessor] 获取头像失败 - 联系人ID: {wxid}, 错误: MISC 数据库未连接")
            return None

        cursor = None
        try:
            cursor = self.misc_conn.cursor()
            cursor.execute("SELECT smallHeadBuf FROM ContactHeadImg1 WHERE usrName=?", (wxid,))
            result = cursor.fetchone()

            if result and result[0]:
                data = result[0]
                print(f"[DataProcessor] 成功获取头像数据 - 联系人ID: {wxid}, 数据大小: {len(data)} 字节")
                return data

            print(f"[DataProcessor] 未找到头像数据 - 联系人ID: {wxid}")
            return None
        except sqlite3.Error as e:
            print(f"[DataProcessor] 获取头像失败 - 联系人ID: {wxid}, 错误: {str(e)}")
            return None
        finally:
            if cursor is not None:
                cursor.close()

    def close(self):
        """Close every open database connection and reset it to None.

        Safe to call repeatedly, and safe on a partially initialised instance.
        """
        for conn_attr, _, _ in self._DB_SPECS:
            conn = getattr(self, conn_attr, None)
            if conn is not None:
                conn.close()
                setattr(self, conn_attr, None)

    def __del__(self):
        self.close()