# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name:         parsingMSG.py
# Description:  Read and format rows of the MSG table of an MSG.db chat-record database
# Author:       xaoyaoo
# Date:         2024/04/15
# -------------------------------------------------------------------------------
import json
import os
import re

import pandas as pd
import lz4.block
import blackboxprotobuf

from .dbbase import DatabaseBase
from .utils import get_md5, name2typeid, typeid2name, type_converter, timestamp2str, xml2dict, match_BytesExtra


class ParsingMSG(DatabaseBase):
    """Query the MSG table and turn its rows into display-ready dicts."""
    _class_name = "MSG"

    # Columns selected for every message row. The order must match the tuple
    # unpacking at the top of msg_detail. "id" is the 1-based position of the
    # row within the (filtered) result set, ordered by CreateTime.
    _MSG_COLUMNS = (
        "localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, "
        "DisplayContent, CompressContent, BytesExtra, ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id"
    )

    # Typedef handed to blackboxprotobuf.decode_message for the BytesExtra blob.
    # Field "3" decodes as a message of (int "1", str "2"). Its "alt_typedefs"
    # entry lists alternative layouts for the nested field "2".
    # NOTE(review): how blackboxprotobuf chooses among alt_typedefs is up to that
    # library; confirm against its documentation before editing.
    BytesExtra_message_type = {
        "1": {
            "type": "message",
            "message_typedef": {
                "1": {"type": "int", "name": ""},
                "2": {"type": "int", "name": ""}
            },
            "name": "1"
        },
        "3": {
            "type": "message",
            "message_typedef": {
                "1": {"type": "int", "name": ""},
                "2": {"type": "str", "name": ""}
            },
            "name": "3",
            "alt_typedefs": {
                "1": {
                    "1": {"type": "int", "name": ""},
                    "2": {"type": "message", "message_typedef": {}, "name": ""}
                },
                "2": {
                    "1": {"type": "int", "name": ""},
                    "2": {
                        "type": "message",
                        "message_typedef": {
                            "13": {"type": "fixed32", "name": ""},
                            "12": {"type": "fixed32", "name": ""}
                        },
                        "name": ""
                    }
                },
                "3": {
                    "1": {"type": "int", "name": ""},
                    "2": {
                        "type": "message",
                        "message_typedef": {
                            "15": {"type": "fixed64", "name": ""}
                        },
                        "name": ""
                    }
                },
                "4": {
                    "1": {"type": "int", "name": ""},
                    "2": {
                        "type": "message",
                        "message_typedef": {
                            "15": {"type": "int", "name": ""},
                            "14": {"type": "fixed32", "name": ""}
                        },
                        "name": ""
                    }
                },
                "5": {
                    "1": {"type": "int", "name": ""},
                    "2": {
                        "type": "message",
                        "message_typedef": {
                            "12": {"type": "fixed32", "name": ""},
                            "7": {"type": "fixed64", "name": ""},
                            "6": {"type": "fixed64", "name": ""}
                        },
                        "name": ""
                    }
                },
                "6": {
                    "1": {"type": "int", "name": ""},
                    "2": {
                        "type": "message",
                        "message_typedef": {
                            "7": {"type": "fixed64", "name": ""},
                            "6": {"type": "fixed32", "name": ""}
                        },
                        "name": ""
                    }
                },
                "7": {
                    "1": {"type": "int", "name": ""},
                    "2": {
                        "type": "message",
                        "message_typedef": {
                            "12": {"type": "fixed64", "name": ""}
                        },
                        "name": ""
                    }
                },
                "8": {
                    "1": {"type": "int", "name": ""},
                    "2": {
                        "type": "message",
                        "message_typedef": {
                            "6": {"type": "fixed64", "name": ""},
                            "12": {"type": "fixed32", "name": ""}
                        },
                        "name": ""
                    }
                },
                "9": {
                    "1": {"type": "int", "name": ""},
                    "2": {
                        "type": "message",
                        "message_typedef": {
                            "15": {"type": "int", "name": ""},
                            "12": {"type": "fixed64", "name": ""},
                            "6": {"type": "int", "name": ""}
                        },
                        "name": ""
                    }
                },
                "10": {
                    "1": {"type": "int", "name": ""},
                    "2": {
                        "type": "message",
                        "message_typedef": {
                            "6": {"type": "fixed32", "name": ""},
                            "12": {"type": "fixed64", "name": ""}
                        },
                        "name": ""
                    }
                },
            }
        }
    }

    def __init__(self, db_path):
        super().__init__(db_path)

    def decompress_CompressContent(self, data):
        """
        Decompress an MSG.CompressContent blob (LZ4 block format) into text.

        :param data: raw CompressContent value
        :return: the decompressed text decoded as UTF-8, with NUL bytes removed.
                 If LZ4 decompression fails, the raw bytes decoded as UTF-8.
                 None if data is not bytes.
        """
        if not isinstance(data, bytes):
            return None
        try:
            # No size header is passed with the blob, so a destination buffer of
            # 256x the input length is assumed to be large enough.
            dst = lz4.block.decompress(data, uncompressed_size=len(data) << 8)
        except Exception:
            # Best effort: treat the blob as uncompressed text.
            return data.decode('utf-8', errors='ignore')
        # NUL bytes survive decompression; strip them, otherwise the later XML
        # (ElementTree) parse fails.
        dst = dst.replace(b'\x00', b'')
        return dst.decode('utf-8', errors='ignore')

    def get_BytesExtra(self, BytesExtra):
        """
        Decode a BytesExtra protobuf blob using BytesExtra_message_type.

        :param BytesExtra: raw MSG.BytesExtra value
        :return: the decoded message dict, or None when the input is not bytes
                 or decoding fails
        """
        if not isinstance(BytesExtra, bytes):
            return None
        try:
            deserialize_data, _ = blackboxprotobuf.decode_message(BytesExtra, self.BytesExtra_message_type)
        except Exception:
            # Best effort: undecodable blobs are reported as "no extra data".
            return None
        return deserialize_data

    def msg_count(self, wxid: str = ""):
        """
        Count messages per conversation (StrTalker).

        :param wxid: count only this conversation; "" counts every conversation
        :return: {wxid: message count}, ordered by count descending;
                 {} when there are no matching messages
        """
        if wxid:
            # Bind wxid instead of interpolating it into the SQL text. GROUP BY
            # makes an unknown wxid produce no row rather than (NULL, 0).
            sql = "SELECT StrTalker, COUNT(*) FROM MSG WHERE StrTalker=? GROUP BY StrTalker;"
            result = self.execute_sql(sql, (wxid,))
        else:
            sql = "SELECT StrTalker, COUNT(*) FROM MSG GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
            result = self.execute_sql(sql)
        if not result:
            return {}
        df = pd.DataFrame(result, columns=["wxid", "msg_count"])
        df = df.sort_values(by="msg_count", ascending=False)
        return df.set_index("wxid").to_dict()["msg_count"]

    def msg_count_total(self):
        """
        Count all rows of the MSG table.

        :return: total number of messages; 0 when the query returns nothing
        """
        result = self.execute_sql("SELECT COUNT(*) FROM MSG;")
        return result[0][0] if result else 0

    def _FileStorage_path(self, BytesExtra, preferred: str):
        """
        Pick a "FileStorage..." path out of the decoded BytesExtra.

        Paths containing `preferred` are tried first. The first candidate's
        backslash-separated segments are rejoined with os.path.join.

        :return: the relative path, or "" when no candidate is found
        """
        DictExtra_str = str(self.get_BytesExtra(BytesExtra))
        # Paths are scraped from the repr of the decoded dict; each ends at the
        # closing quote of its string value.
        paths = re.findall(r"(FileStorage.*?)'", DictExtra_str)
        # Stable sort: candidates containing `preferred` move to the front and
        # keep their original relative order.
        paths = sorted(paths, key=lambda p: preferred in p, reverse=True)
        if not paths:
            return ""
        # Splitting on single backslashes and dropping empty pieces also copes
        # with the doubled backslashes that repr() produces.
        parts = [part for part in paths[0].replace("'", "").split("\\") if part]
        return os.path.join(*parts)

    def msg_detail(self, row):
        """
        Format one MSG row (in _MSG_COLUMNS order) for display.

        :param row: tuple (localId, IsSender, StrContent, StrTalker, Sequence, Type,
                    SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent,
                    BytesExtra, id)
        :return: dict with keys MsgSvrID, type_name, is_sender, talker, room_name,
                 content ({"src": ..., "msg": ...}), CreateTime, id
        """
        (localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime,
         MsgSvrID, DisplayContent, CompressContent, BytesExtra, row_id) = row
        CreateTime = timestamp2str(CreateTime)
        type_id = (Type, SubType)
        type_name = typeid2name(type_id)

        # Types without a dedicated branch below keep the raw StrContent.
        content = {"src": "", "msg": StrContent}

        if type_id == (1, 0):  # text
            content["msg"] = StrContent

        elif type_id == (3, 0):  # image
            content["src"] = self._FileStorage_path(BytesExtra, "Image")
            content["msg"] = "图片"

        elif type_id == (34, 0):  # voice
            tmp_c = xml2dict(StrContent)
            voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "")
            transtext = tmp_c.get("voicetrans", {}).get("transtext", "")
            if voicelength.isdigit():
                # The stored value is divided by 1000 to display seconds.
                voicelength = f"{int(voicelength) / 1000:.2f}"
            if transtext:
                content["msg"] = f"语音时长:{voicelength}秒\n翻译结果:{transtext}"
            else:
                content["msg"] = f"语音时长:{voicelength}秒"
            content["src"] = os.path.join(
                "audio", f"{StrTalker}",
                f"{CreateTime.replace(':', '-').replace(' ', '_')}_{IsSender}_{MsgSvrID}.wav")

        elif type_id == (43, 0):  # video
            content["src"] = self._FileStorage_path(BytesExtra, "mp4")
            content["msg"] = "视频"

        elif type_id == (47, 0):  # animated sticker
            content_tmp = xml2dict(StrContent)
            cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "")
            if not cdnurl:
                DictExtra = self.get_BytesExtra(BytesExtra)
                cdnurl = match_BytesExtra(DictExtra)
            if cdnurl:
                content = {"src": cdnurl, "msg": "表情"}

        elif type_id == (48, 0):  # location
            content_tmp = xml2dict(StrContent)
            location = content_tmp.get("location", {})
            # Default missing fields to "" instead of raising KeyError, which
            # would abort formatting of the whole page.
            content["msg"] = (f"纬度:【{location.pop('x', '')}】 经度:【{location.pop('y', '')}】\n"
                              f"位置:{location.pop('label', '')} {location.pop('poiname', '')}\n"
                              f"其他信息:{json.dumps(location, ensure_ascii=False, indent=4)}"
                              )
            content["src"] = ""

        elif type_id == (49, 0):  # file
            DictExtra = self.get_BytesExtra(BytesExtra)
            url = match_BytesExtra(DictExtra)
            content["src"] = url
            content["msg"] = os.path.basename(url) if url else ""

        elif type_id == (49, 5):  # shared link card
            CompressContent = self.decompress_CompressContent(CompressContent)
            appmsg = xml2dict(CompressContent).get("appmsg", {})
            title = appmsg.get("title", "")
            des = appmsg.get("des", "")
            url = appmsg.get("url", "")
            content["msg"] = f"{title}\n{des}\n\n{url}"
            content["src"] = url

        elif type_id == (49, 19):  # merged / forwarded chat history
            CompressContent = self.decompress_CompressContent(CompressContent)
            appmsg = xml2dict(CompressContent).get("appmsg", {})
            title = appmsg.get("title", "")
            des = appmsg.get("des", "")
            recorditem = xml2dict(appmsg.get("recorditem", ""))
            content["msg"] = f"{title}\n{des}"
            content["src"] = recorditem

        elif type_id == (49, 57):  # text message quoting another message
            CompressContent = self.decompress_CompressContent(CompressContent)
            appmsg = xml2dict(CompressContent).get("appmsg", {})
            title = appmsg.get("title", "")
            refermsg = appmsg.get("refermsg", {})
            displayname = refermsg.get("displayname", "")
            display_content = refermsg.get("content", "")
            display_createtime = refermsg.get("createtime", "")
            if display_createtime.isdigit():
                display_createtime = timestamp2str(int(display_createtime))
            content["msg"] = f"{title}\n\n[引用]({display_createtime}){displayname}:{display_content}"
            content["src"] = ""

        elif type_id == (49, 2000):  # money transfer
            CompressContent = self.decompress_CompressContent(CompressContent)
            wcpayinfo = xml2dict(CompressContent).get("appmsg", {}).get("wcpayinfo", {})
            paysubtype = wcpayinfo.get("paysubtype", "")  # transfer kind ('3' is shown as received)
            feedesc = wcpayinfo.get("feedesc", "")  # amount
            pay_memo = wcpayinfo.get("pay_memo", "")  # memo
            begintransfertime = wcpayinfo.get("begintransfertime", "")  # transfer start time
            # Same handling as the quote branch: numeric strings become ints before formatting.
            if isinstance(begintransfertime, str) and begintransfertime.isdigit():
                begintransfertime = int(begintransfertime)
            content["msg"] = (f"{'已收款' if paysubtype == '3' else '转账'}:{feedesc}\n"
                              f"转账说明:{pay_memo if pay_memo else ''}\n"
                              f"转账时间:{timestamp2str(begintransfertime)}\n"
                              )
            content["src"] = ""

        elif type_id[0] == 49 and type_id[1] != 0:  # any other app-message subtype
            DictExtra = self.get_BytesExtra(BytesExtra)
            content["src"] = match_BytesExtra(DictExtra)
            content["msg"] = type_name

        elif type_id == (50, 0):  # voice / video call
            content["msg"] = "语音/视频通话[%s]" % DisplayContent

        # Resolve who sent the message.
        if IsSender == 1:
            talker = "我"
        elif StrTalker.endswith("@chatroom"):
            # In group chats the sender id is read from BytesExtra field 3[0]["2"].
            talker = "未知"
            bytes_extra = self.get_BytesExtra(BytesExtra)
            if bytes_extra:
                try:
                    talker = bytes_extra['3'][0]['2']
                    if "publisher-id" in talker:
                        talker = "系统"
                except (KeyError, IndexError, TypeError):
                    # Unexpected BytesExtra layout: keep whatever was resolved so far.
                    pass
        else:
            talker = StrTalker

        return {"MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender,
                "talker": talker, "room_name": StrTalker, "content": content,
                "CreateTime": CreateTime, "id": row_id}

    def msg_list(self, wxid="", start_index=0, page_size=500, msg_type: str = ""):
        """
        Fetch one page of messages ordered by CreateTime and format each with msg_detail.

        :param wxid: only messages whose StrTalker equals this value; "" for all conversations
        :param start_index: row offset of the page (SQL LIMIT offset)
        :param page_size: maximum number of rows on the page (SQL LIMIT count)
        :param msg_type: optional value of the Type column to filter on; "" for no filter
        :return: (list of formatted message dicts, list of distinct "talker" values);
                 ([], []) when the query returns nothing
        """
        conditions = []
        params = []
        if wxid:
            conditions.append("StrTalker=?")
            params.append(wxid)
        if msg_type:
            # Bound as a parameter. The previous string splicing produced
            # "FROM MSG AND Type=..." (invalid SQL) when wxid was empty, and it
            # allowed SQL injection through msg_type.
            try:
                type_value = int(msg_type)
            except (TypeError, ValueError):
                type_value = msg_type
            conditions.append("Type=?")
            params.append(type_value)
        where_clause = f"WHERE {' AND '.join(conditions)} " if conditions else ""
        sql = (f"SELECT {self._MSG_COLUMNS} FROM MSG {where_clause}"
               "ORDER BY CreateTime ASC LIMIT ?,?")
        params.extend((start_index, page_size))
        result1 = self.execute_sql(sql, tuple(params))
        if not result1:
            return [], []

        data = [self.msg_detail(row) for row in result1]
        wxid_list = list({item["talker"] for item in data})
        return data, wxid_list