WeChatMsg_NY/newYear/utils/data_processor.py

371 lines
15 KiB
Python
Raw Normal View History

2025-01-22 17:22:20 +08:00
"""
数据处理工具类,用于处理聊天记录和联系人信息
"""
import os
import re
import sqlite3
from datetime import datetime
from typing import Dict, List, Optional, Tuple
class DataProcessor:
    """Read contacts, chat history and avatars from WeChat's SQLite databases.

    Three database files are used: ``MicroMsg.db`` (queried for the Contact
    tables), ``MSG.db`` (queried for the MSG table) and ``MISC.db`` (queried
    for avatar image data). They are looked up under
    ``<project_root>/app/Database/Msg`` first and, for any that could not be
    opened there, under a few differently-cased fallback directories. A
    database that cannot be opened leaves its ``*_conn`` attribute as ``None``.
    """

    # (connection attribute, path attribute, file name, label used in logs)
    _DB_SPECS = (
        ("micro_msg_conn", "micro_msg_db_path", "MicroMsg.db", "MicroMsg"),
        ("msg_conn", "msg_db_path", "MSG.db", "MSG"),
        ("misc_conn", "misc_db_path", "MISC.db", "MISC"),
    )

    # Characters replaced by "_" to make a contact name safe for file names.
    _UNSAFE_NAME_CHARS = re.compile(r'[\\/:*?"<>|\s\.]')

    def __init__(self):
        """Configure the default database paths and open the connections."""
        # Kept on the instance so init_database() can reuse it for the
        # fallback search (it used to be a local here, causing a NameError).
        self.project_root = self._default_project_root()
        # os.path.join keeps the paths portable across platforms.
        db_dir = os.path.join(self.project_root, "app", "Database", "Msg")
        self.micro_msg_db_path = os.path.join(db_dir, "MicroMsg.db")
        self.msg_db_path = os.path.join(db_dir, "MSG.db")
        self.misc_db_path = os.path.join(db_dir, "MISC.db")
        print("[DataProcessor] 数据库路径配置:")
        print(f"MicroMsg数据库: {self.micro_msg_db_path}")
        print(f"MSG数据库: {self.msg_db_path}")
        print(f"MISC数据库: {self.misc_db_path}")
        self.micro_msg_conn = None
        self.msg_conn = None
        self.misc_conn = None
        self.init_database()

    @staticmethod
    def _default_project_root() -> str:
        """Return the directory two levels above the folder containing this file."""
        return os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

    def _all_connected(self) -> bool:
        """Return True when all three database connections are open."""
        return all(getattr(self, spec[0], None) for spec in self._DB_SPECS)

    def _open_database(self, conn_attr: str, db_path: str, label: str) -> None:
        """Connect to ``db_path`` and store the connection in ``conn_attr``.

        On failure the error is printed and ``conn_attr`` is set to ``None``.
        After a successful MISC connection its table layout is printed.
        """
        try:
            # check_same_thread=False lets the connection be used from threads
            # other than the one that created it.
            conn = sqlite3.connect(db_path, check_same_thread=False)
        except sqlite3.Error as e:
            print(f"[DataProcessor] 连接 {label} 数据库失败: {str(e)}")
            setattr(self, conn_attr, None)
            return
        setattr(self, conn_attr, conn)
        print(f"[DataProcessor] 成功连接到 {label} 数据库: {db_path}")
        if conn_attr == "misc_conn":
            # Diagnostic dump of the MISC schema.
            self.check_misc_tables()

    def init_database(self) -> None:
        """Open the MicroMsg, MSG and MISC databases.

        The configured paths are tried first. If any database is still not
        connected, a list of alternatively-cased directories under the project
        root is searched. Only the databases that are still missing have their
        path attributes redirected and are connected there.
        """
        db_dir = os.path.dirname(self.micro_msg_db_path)
        if not os.path.exists(db_dir):
            print(f"[DataProcessor] 数据库目录不存在,尝试创建: {db_dir}")
            try:
                os.makedirs(db_dir, exist_ok=True)
            except OSError as e:
                print(f"[DataProcessor] 创建数据库目录失败: {str(e)}")

        for conn_attr, path_attr, _file_name, label in self._DB_SPECS:
            db_path = getattr(self, path_attr)
            if os.path.exists(db_path):
                self._open_database(conn_attr, db_path, label)
            else:
                print(f"[DataProcessor] {label} 数据库文件不存在: {db_path}")

        if self._all_connected():
            return

        # Some databases are missing: look in other locations.
        print("[DataProcessor] 尝试从其他位置查找数据库...")
        project_root = getattr(self, "project_root", None) or self._default_project_root()
        alt_db_paths = [
            os.path.join(project_root, "app", "DataBase", "Msg"),  # note the casing
            os.path.join(project_root, "app", "database", "msg"),
            os.path.join(project_root, "app", "Database", "msg"),
            os.path.join(project_root, "app", "database", "Msg"),
        ]
        for alt_dir in alt_db_paths:
            if not os.path.exists(alt_dir):
                continue
            print(f"[DataProcessor] 找到数据库目录: {alt_dir}")
            for conn_attr, path_attr, file_name, label in self._DB_SPECS:
                if getattr(self, conn_attr, None):
                    # Already open: keep its path pointing at the file in use.
                    continue
                db_path = os.path.join(alt_dir, file_name)
                setattr(self, path_attr, db_path)
                if os.path.exists(db_path):
                    self._open_database(conn_attr, db_path, label)
            if self._all_connected():
                print("[DataProcessor] 所有数据库连接成功")
                break

    def check_misc_tables(self) -> None:
        """Print every table in the MISC database with its column names and types.

        SQLite errors are printed, not raised.

        Raises:
            Exception: if the MISC database is not connected.
        """
        if not self.misc_conn:
            raise Exception("MISC 数据库未连接")
        cursor = None
        try:
            cursor = self.misc_conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = cursor.fetchall()
            print("\nMISC数据库中的表:")
            for (table_name,) in tables:
                print(f"\n表名: {table_name}")
                # PRAGMA cannot take bound parameters, so quote the identifier.
                quoted_name = '"' + table_name.replace('"', '""') + '"'
                cursor.execute(f"PRAGMA table_info({quoted_name})")
                for col in cursor.fetchall():
                    print(f" 列名: {col[1]}, 类型: {col[2]}")
        except sqlite3.Error as e:
            print(f"检查表结构失败:{str(e)}")
        finally:
            if cursor is not None:
                cursor.close()

    def check_contact_table(self) -> None:
        """Print the Contact table's columns and one sample row.

        SQLite errors are printed, not raised.

        Raises:
            Exception: if the MicroMsg database is not connected.
        """
        if not self.micro_msg_conn:
            raise Exception("MicroMsg 数据库未连接")
        cursor = None
        try:
            cursor = self.micro_msg_conn.cursor()
            cursor.execute("PRAGMA table_info(Contact)")
            columns = cursor.fetchall()
            print("\nContact表结构:")
            for col in columns:
                print(f"列名: {col[1]}, 类型: {col[2]}")
            cursor.execute("SELECT * FROM Contact LIMIT 1")
            result = cursor.fetchone()
            if result:
                print("\n示例数据:")
                for col, value in zip(columns, result):
                    print(f"{col[1]}: {value}")
        except sqlite3.Error as e:
            print(f"检查表结构失败:{str(e)}")
        finally:
            if cursor is not None:
                cursor.close()

    @staticmethod
    def _contact_query(with_labels: bool) -> str:
        """Build the contact-list SQL, optionally LEFT JOINing ContactLabel."""
        if with_labels:
            label_column = "COALESCE(ContactLabel.LabelName, 'None') AS labelName"
            label_join = "LEFT JOIN ContactLabel ON Contact.LabelIDList = ContactLabel.LabelId"
        else:
            label_column = "'None' as labelName"
            label_join = ""
        return f"""
            SELECT Contact.UserName, Contact.Alias, Contact.Type, Contact.Remark, Contact.NickName,
                   Contact.PYInitial, Contact.RemarkPYInitial,
                   ContactHeadImgUrl.smallHeadImgUrl, ContactHeadImgUrl.bigHeadImgUrl,
                   Contact.ExTraBuf,
                   {label_column}
            FROM Contact
            INNER JOIN ContactHeadImgUrl ON Contact.UserName = ContactHeadImgUrl.usrName
            {label_join}
            WHERE (Type!=4 AND VerifyFlag=0)
              AND NickName != ''
            ORDER BY
                CASE
                    WHEN RemarkPYInitial = '' THEN PYInitial
                    ELSE RemarkPYInitial
                END ASC
        """

    def get_all_contacts(self) -> List[Dict]:
        """Return the contact list, sorted by (remark or nickname) pinyin initials.

        Rows must have Type != 4, VerifyFlag = 0, a non-empty NickName and a
        matching ContactHeadImgUrl row. If the labelled query fails with an
        OperationalError (e.g. no ContactLabel table), a label-less query is
        used and every ``label_name`` is ``'None'``.

        Returns:
            List[Dict]: one dict per contact. ``original_name`` is the remark,
            or the nickname if there is no remark. ``name`` is the same string
            with filename-unsafe characters replaced by ``_``.

        Raises:
            Exception: if MicroMsg is not connected or the query fails.
        """
        if not self.micro_msg_conn:
            raise Exception("MicroMsg 数据库未连接")
        cursor = None
        try:
            cursor = self.micro_msg_conn.cursor()
            try:
                cursor.execute(self._contact_query(with_labels=True))
            except sqlite3.OperationalError:
                # ContactLabel table missing (or otherwise unusable).
                cursor.execute(self._contact_query(with_labels=False))
            results = cursor.fetchall()
            print(f"\n找到 {len(results)} 个联系人")
            contacts = []
            for row in results:
                # Prefer the remark name over the nickname.
                original_name = row[3] or row[4]
                if not original_name:
                    # Skip before sanitising: re.sub would fail on None.
                    continue
                contacts.append({
                    'wxid': row[0],
                    'alias': row[1],
                    'type': row[2],
                    'name': self._UNSAFE_NAME_CHARS.sub('_', original_name),
                    'original_name': original_name,
                    'nickname': row[4],
                    'py_initial': row[5],
                    'remark_py_initial': row[6],
                    'small_avatar_url': row[7],
                    'big_avatar_url': row[8],
                    'extra_buf': row[9],
                    'label_name': row[10],
                })
            return contacts
        except sqlite3.Error as e:
            raise Exception(f"数据库查询失败:{str(e)}") from e
        finally:
            if cursor is not None:
                cursor.close()

    def get_chat_history(self, contact_id: str, start_date: str, end_date: str) -> List[Dict]:
        """Return a contact's text messages (Type = 1) within a time range.

        Args:
            contact_id: the contact's ID, matched against ``MSG.StrTalker``.
            start_date: start date, format YYYY-MM-DD.
            end_date: end date, format YYYY-MM-DD.

        Returns:
            List[Dict]: messages ordered by CreateTime ascending. ``create_time``
            is formatted in the local timezone.

        Raises:
            Exception: if MSG is not connected or the query fails.
        """
        # NOTE(review): strftime('%s', 'YYYY-MM-DD') yields midnight UTC, so
        # messages sent later on end_date are excluded, and the bounds are UTC
        # while create_time is local. Confirm whether callers expect an
        # inclusive, local-time end date.
        if not self.msg_conn:
            raise Exception("MSG 数据库未连接")
        query = """
            SELECT localId, TalkerId, Type, SubType, IsSender, CreateTime, Status,
                   StrContent, strftime('%Y-%m-%d %H:%M:%S', CreateTime, 'unixepoch', 'localtime') as create_time
            FROM MSG
            WHERE StrTalker = ?
              AND CreateTime BETWEEN strftime('%s', ?) AND strftime('%s', ?)
              AND Type = 1 -- 只获取文本消息
            ORDER BY CreateTime ASC
        """
        keys = ('local_id', 'talker_id', 'type', 'sub_type', 'is_sender',
                'timestamp', 'status', 'message', 'create_time')
        cursor = None
        try:
            cursor = self.msg_conn.cursor()
            cursor.execute(query, (contact_id, start_date, end_date))
            return [dict(zip(keys, row)) for row in cursor.fetchall()]
        except sqlite3.Error as e:
            raise Exception(f"数据库查询失败:{str(e)}") from e
        finally:
            if cursor is not None:
                cursor.close()

    def analyze_chat_content(self, chat_history: List[Dict]) -> Dict:
        """Analyse chat content and extract key information (keywords, frequency, ...).

        Args:
            chat_history: list of chat message dicts.

        Returns:
            Dict: analysis result.
        """
        # TODO: not implemented yet; currently returns None.
        pass

    def get_contact_avatar(self, wxid: str) -> Optional[bytes]:
        """Return the contact's avatar image bytes from ContactHeadImg1.smallHeadBuf.

        Best-effort: returns ``None`` when MISC is not connected, no row or
        empty data is found, or the query fails (the reason is printed).

        Args:
            wxid: the contact's WeChat ID (``usrName``).
        """
        print(f"[DataProcessor] 开始获取头像 - 联系人ID: {wxid}")
        if not self.misc_conn:
            print(f"[DataProcessor] 获取头像失败 - 联系人ID: {wxid}, 错误: MISC 数据库未连接")
            return None
        cursor = None
        try:
            cursor = self.misc_conn.cursor()
            cursor.execute("SELECT smallHeadBuf FROM ContactHeadImg1 WHERE usrName=?", (wxid,))
            result = cursor.fetchone()
            if result and result[0]:
                data = result[0]
                print(f"[DataProcessor] 成功获取头像数据 - 联系人ID: {wxid}, 数据大小: {len(data)} 字节")
                return data
            print(f"[DataProcessor] 未找到头像数据 - 联系人ID: {wxid}")
            return None
        except sqlite3.Error as e:
            print(f"[DataProcessor] 获取头像失败 - 联系人ID: {wxid}, 错误: {str(e)}")
            return None
        finally:
            if cursor is not None:
                cursor.close()

    def close(self) -> None:
        """Close every open connection and reset it to None; safe to call repeatedly."""
        for conn_attr, _path_attr, _file_name, _label in self._DB_SPECS:
            # getattr: the instance may be only partially initialised.
            conn = getattr(self, conn_attr, None)
            if conn:
                conn.close()
            setattr(self, conn_attr, None)

    def __del__(self):
        self.close()