diff --git a/doc/CHANGELOG.md b/doc/CHANGELOG.md
index 92a94c3..f063842 100644
--- a/doc/CHANGELOG.md
+++ b/doc/CHANGELOG.md
@@ -1,7 +1,18 @@
-## v2.4.63.(待发布)
+## v2.4.71.(待发布)
+
+- 修改数据库匹配规则
+- 增加3.9.10.19支持
+- fix 图片优先显示清晰版本
+- UPDATE README.md
+- 读取ExtraBuf(联系人表)
+- fix 部分情况下视频不能正常显示
+- MSG数量超过10个无法获取最新数据的bug
+
+## v2.4.70
 
 - 增加对引用消息的解析
 - Update README.md
+- UPDATE CHANGELOG.md
 - 读取群聊数据,主要为 wxid,以及对应昵称
 
 ## v2.4.62
diff --git a/pywxdump/dbpreprocess/__init__.py b/pywxdump/dbpreprocess/__init__.py
new file mode 100644
index 0000000..aa2f64d
--- /dev/null
+++ b/pywxdump/dbpreprocess/__init__.py
@@ -0,0 +1,12 @@
+# -*- coding: utf-8 -*-#
+# -------------------------------------------------------------------------------
+# Name: __init__.py
+# Description: Re-exports the database parsing classes of this package.
+# Author: xaoyaoo
+# Date: 2024/04/15
+# -------------------------------------------------------------------------------
+from .parsingMSG import ParsingMSG
+from .parsingMicroMsg import ParsingMicroMsg
+from .parsingMediaMSG import ParsingMediaMSG
+
+
diff --git a/pywxdump/dbpreprocess/dbbase.py b/pywxdump/dbpreprocess/dbbase.py
new file mode 100644
index 0000000..30b8a3c
--- /dev/null
+++ b/pywxdump/dbpreprocess/dbbase.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-#
+# -------------------------------------------------------------------------------
+# Name: dbbase.py
+# Description: SQLite access base class; instances are cached per db_path.
+# Author: xaoyaoo
+# Date: 2024/04/15
+# -------------------------------------------------------------------------------
+import os
+import sqlite3
+import logging
+
+
+class DatabaseBase:
+    _singleton_instances = {}  # maps db_path -> the single cached instance for that path
+
+    def __new__(cls, db_path):
+        if db_path not in cls._singleton_instances:
+            cls._singleton_instances[db_path] = super().__new__(cls)
+        return cls._singleton_instances[db_path]
+
+    def __init__(self, db_path):
+        self._db_path = db_path
+        self._db_connection = self._connect_to_database(db_path)
+
+    @classmethod
+    def _connect_to_database(cls, db_path):
+        if not os.path.exists(db_path):
+            raise FileNotFoundError(f"文件不存在: {db_path}")
+        connection = sqlite3.connect(db_path, check_same_thread=False)
+        logging.info(f"{connection} 连接句柄创建 {db_path}")
+        return connection
+
+    def execute_sql(self, sql, params=None):
+        # Reconnect if close_connection() dropped the handle; the result must be stored.
+        if not self._db_connection:
+            logging.warning(f"重新连接数据库 - {self._db_path}")
+            self._db_connection = self._connect_to_database(self._db_path)
+        connection = self._db_connection
+        try:
+            # First attempt uses the connection's current text_factory.
+            cursor = connection.cursor()
+            if params:
+                cursor.execute(sql, params)
+            else:
+                cursor.execute(sql)
+            return cursor.fetchall()
+        except Exception as e1:
+            try:
+                # Retry returning TEXT as raw bytes (presumably for undecodable text).
+                connection.text_factory = bytes
+                cursor = connection.cursor()
+                # None / empty params means "no bindings", same as the first attempt.
+                cursor.execute(sql, params or ())
+                return cursor.fetchall()
+            except Exception as e2:
+                logging.error(f"**********\nSQL: {sql}\nparams: {params}\n{e1}\n{e2}\n**********")
+                return None
+            finally:
+                # Restore str decoding even when the retry itself failed.
+                connection.text_factory = str
+
+    def close_connection(self):
+        if getattr(self, "_db_connection", None):
+            self._db_connection.close()
+            logging.info(f"关闭数据库 - {self._db_path}")
+            self._db_connection = None
+
+    def show__singleton_instances(self):
+        print(self._singleton_instances)
+
+    def __del__(self):
+        self.close_connection()
+        self._singleton_instances.pop(getattr(self, "_db_path", None), None)
+
+
+if __name__ == '__main__':
+    a = DatabaseBase("test.db")
+    b = DatabaseBase("test1.db")
+
+    d1 = a.execute_sql("select * from sqlite_master;")
+    d2 = b.execute_sql("select * from sqlite_master;")
+    print([i[1] for i in d1])
+    print([i[1] for i in d2])
+
+    a.close_connection()
+    b.close_connection()
diff --git a/pywxdump/dbpreprocess/parsingMSG.py b/pywxdump/dbpreprocess/parsingMSG.py
new file mode 100644
index 0000000..6be1e77
--- /dev/null
+++ b/pywxdump/dbpreprocess/parsingMSG.py
@@ -0,0 +1,272 @@
+# -*- coding: utf-8 -*-#
+# -------------------------------------------------------------------------------
+# Name: parsingMSG.py
+# Description: Reads the MSG table and formats each row into a display dict.
+# Author: xaoyaoo
+# Date: 2024/04/15
+# -------------------------------------------------------------------------------
+import os
+import re
+
+import pandas as pd
+
+from .dbbase import DatabaseBase
+from .utils import get_md5, name2typeid, typeid2name, timestamp2str, xml2dict, match_BytesExtra
+import lz4.block
+import blackboxprotobuf
+
+
+class ParsingMSG(DatabaseBase):
+    def __init__(self, db_path):
+        super().__init__(db_path)
+
+    def decompress_CompressContent(self, data):
+        """
+        Decompress the lz4 block stored in MSG.CompressContent.
+        :param data: raw CompressContent bytes
+        :return: decoded text; None when data is not bytes
+        """
+        if data is None or not isinstance(data, bytes):
+            return None
+        try:
+            dst = lz4.block.decompress(data, uncompressed_size=len(data) << 8)
+            dst = dst.replace(b'\x00', b'')  # drop NUL bytes left after decompression, or later ET parsing fails
+            uncompressed_data = dst.decode('utf-8', errors='ignore')
+            return uncompressed_data
+        except Exception:
+            return data.decode('utf-8', errors='ignore')
+
+    def get_BytesExtra(self, BytesExtra):
+        if BytesExtra is None or not isinstance(BytesExtra, bytes):
+            return None
+        try:
+            deserialize_data, message_type = blackboxprotobuf.decode_message(BytesExtra)
+            return deserialize_data
+        except Exception:
+            return None
+
+    def chat_count(self, wxid: str = ""):
+        """
+        Count messages per talker; with wxid only that talker is counted, otherwise all talkers.
+        :param wxid: talker id to filter on (empty string = all talkers)
+        :return: dict {wxid: chat_count}
+        """
+        if wxid:
+            sql, params = "SELECT StrTalker,COUNT(*) FROM MSG WHERE StrTalker=?;", (wxid,)
+        else:
+            sql, params = "SELECT StrTalker, COUNT(*) FROM MSG GROUP BY StrTalker ORDER BY COUNT(*) DESC;", None
+
+        result = self.execute_sql(sql, params)
+        df = pd.DataFrame(result, columns=["wxid", "chat_count"])
+        # chat_counts : {wxid: chat_count}
+        chat_counts = df.set_index("wxid").to_dict()["chat_count"]
+        return chat_counts
+
+    def chat_count_total(self):
+        """
+        Count all rows of the MSG table.
+        :return: total number of messages (0 when the query yields nothing)
+        """
+        sql = "SELECT COUNT(*) FROM MSG;"
+        result = self.execute_sql(sql)
+        if result and len(result) > 0:
+            chat_counts = result[0][0]
+            return chat_counts
+        return 0
+
+    # def room_user_list(self, selected_talker):
+    #     """
+    #     Get the list of all users that appear in a group chat
+    #     :param MSG_db_path: path of the MSG.db file
+    #     :param selected_talker: wxid of the selected chat
+    #     :return: list of chat users
+    #     """
+    #     sql = (
+    #         "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
+    #         "FROM MSG WHERE StrTalker=? "
+    #         "ORDER BY CreateTime ASC")
+    #
+    #     result1 = self.execute_sql(sql, (selected_talker,))
+    #     user_list = []
+    #     read_user_wx_id = []
+    #     for row in result1:
+    #         localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent, BytesExtra, id = row
+    #         bytes_extra = self.get_BytesExtra(BytesExtra)
+    #         if bytes_extra:
+    #             try:
+    #                 talker = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore')
+    #             except:
+    #                 continue
+    #         if talker in read_user_wx_id:
+    #             continue
+    #         user = get_contact(MSG_db_path, talker)
+    #         if not user:
+    #             continue
+    #         user_list.append(user)
+    #         read_user_wx_id.append(talker)
+    #     return user_list
+
+    # Formatting of a single message row
+    def msg_detail(self, row):
+        """
+        Turn one MSG row (13 columns, as selected by msg_list) into a display dict.
+        """
+        localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent, BytesExtra, id = row
+        CreateTime = timestamp2str(CreateTime)
+
+        type_id = (Type, SubType)
+        type_name = typeid2name(type_id)
+
+        content = {"src": "", "msg": StrContent}
+
+        if type_id == (1, 0):  # text
+            content["msg"] = StrContent
+
+        elif type_id == (3, 0):  # image
+            DictExtra = self.get_BytesExtra(BytesExtra)
+            DictExtra_str = str(DictExtra)
+            img_paths = [i for i in re.findall(r"(FileStorage.*?)'", DictExtra_str)]
+            img_paths = sorted(img_paths, key=lambda p: "Image" in p, reverse=True)
+            if img_paths:
+                img_path = img_paths[0].replace("'", "")
+                img_path = [i for i in img_path.split("\\") if i]
+                img_path = os.path.join(*img_path)
+                content["src"] = img_path
+            else:
+                content["src"] = ""
+            content["msg"] = "图片"
+        elif type_id == (34, 0):  # voice
+            tmp_c = xml2dict(StrContent)
+            voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "")
+            transtext = tmp_c.get("voicetrans", {}).get("transtext", "")
+            if voicelength.isdigit():
+                voicelength = int(voicelength) / 1000
+                voicelength = f"{voicelength:.2f}"
+            content[
+                "msg"] = f"语音时长:{voicelength}秒\n翻译结果:{transtext}" if transtext else f"语音时长:{voicelength}秒"
+            content["src"] = os.path.join("audio", f"{StrTalker}",
+                                          f"{CreateTime.replace(':', '-').replace(' ', '_')}_{IsSender}_{MsgSvrID}.wav")
+        elif type_id == (43, 0):  # video
+            DictExtra = self.get_BytesExtra(BytesExtra)
+            DictExtra = str(DictExtra)
+
+            DictExtra_str = str(DictExtra)
+            video_paths = [i for i in re.findall(r"(FileStorage.*?)'", DictExtra_str)]
+            video_paths = sorted(video_paths, key=lambda p: "mp4" in p, reverse=True)
+            if video_paths:
+                video_path = video_paths[0].replace("'", "")
+                video_path = [i for i in video_path.split("\\") if i]
+                video_path = os.path.join(*video_path)
+                content["src"] = video_path
+            else:
+                content["src"] = ""
+            content["msg"] = "视频"
+
+        elif type_id == (47, 0):  # animated sticker
+            content_tmp = xml2dict(StrContent)
+            cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "")
+            if cdnurl:
+                content = {"src": cdnurl, "msg": "表情"}
+
+        elif type_id == (49, 0):
+            DictExtra = self.get_BytesExtra(BytesExtra)
+            url = match_BytesExtra(DictExtra) or ""  # never hand a non-str to os.path.basename
+            content["src"] = url
+            file_name = os.path.basename(url)
+            content["msg"] = file_name
+
+        elif type_id == (49, 19):  # merged/forwarded chat history
+            CompressContent = self.decompress_CompressContent(CompressContent)
+            content_tmp = xml2dict(CompressContent)
+            title = content_tmp.get("appmsg", {}).get("title", "")
+            des = content_tmp.get("appmsg", {}).get("des", "")
+            recorditem = content_tmp.get("appmsg", {}).get("recorditem", "")
+            recorditem = xml2dict(recorditem)
+            content["msg"] = f"{title}\n{des}"
+            content["src"] = recorditem
+
+        elif type_id == (49, 57):  # text message that quotes another message
+            CompressContent = self.decompress_CompressContent(CompressContent)
+            content_tmp = xml2dict(CompressContent)
+            appmsg = content_tmp.get("appmsg", {})
+            title = appmsg.get("title", "")
+            refermsg = appmsg.get("refermsg", {})
+            displayname = refermsg.get("displayname", "")
+            display_content = refermsg.get("content", "")
+            display_createtime = refermsg.get("createtime", "")
+            display_createtime = timestamp2str(
+                int(display_createtime)) if display_createtime.isdigit() else display_createtime
+            content["msg"] = f"{title}\n\n[引用]({display_createtime}){displayname}:{display_content}"
+            content["src"] = ""
+
+        elif type_id == (49, 2000):  # money transfer message
+            CompressContent = self.decompress_CompressContent(CompressContent)
+            content_tmp = xml2dict(CompressContent)
+            feedesc = content_tmp.get("appmsg", {}).get("wcpayinfo", {}).get("feedesc", "")
+            content["msg"] = f"转账:{feedesc}"
+            content["src"] = ""
+
+        elif type_id[0] == 49 and type_id[1] != 0:
+            DictExtra = self.get_BytesExtra(BytesExtra)
+            url = match_BytesExtra(DictExtra) or ""
+            content["src"] = url
+            content["msg"] = type_name
+
+        elif type_id == (50, 0):  # voice call
+            content["msg"] = "语音/视频通话[%s]" % DisplayContent
+
+        # elif type_id == (10000, 0):
+        #     content["msg"] = StrContent
+        # elif type_id == (10000, 4):
+        #     content["msg"] = StrContent
+        # elif type_id == (10000, 8000):
+        #     content["msg"] = StrContent
+
+        talker = "未知"
+        if IsSender == 1:
+            talker = "我"
+        else:
+            if StrTalker.endswith("@chatroom"):
+                bytes_extra = self.get_BytesExtra(BytesExtra)
+                if bytes_extra:
+                    try:
+                        talker = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore')
+                        if "publisher-id" in talker:
+                            talker = "系统"
+                    except Exception:
+                        pass
+            else:
+                talker = StrTalker
+
+        row_data = {"MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender, "talker": talker,
+                    "room_name": StrTalker, "content": content, "CreateTime": CreateTime, "id": id}
+        return row_data
+
+    def msg_list(self, wxid="", start_index=0, page_size=500):
+        if wxid:
+            sql = (
+                "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
+                "FROM MSG WHERE StrTalker=? "
+                "ORDER BY CreateTime ASC LIMIT ?,?")
+            result1 = self.execute_sql(sql, (wxid, start_index, page_size))
+        else:
+            sql = (
+                "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
+                "FROM MSG ORDER BY CreateTime ASC LIMIT ?,?")
+            result1 = self.execute_sql(sql, (start_index, page_size))
+
+        # df = pd.DataFrame(result1, columns=[
+        #     'localId', 'IsSender', 'StrContent', 'StrTalker', 'Sequence', 'Type', 'SubType', 'CreateTime', 'MsgSvrID',
+        #     'DisplayContent', 'CompressContent', 'BytesExtra', 'id'
+        # ])
+        # df['msg_detail'] = df.apply(lambda row: self.msg_detail(row), axis=1)
+        # return df['msg_detail'].tolist()
+
+        data = []
+        for row in result1 or []:  # execute_sql returns None when the query failed
+            data.append(self.msg_detail(row))
+        return data
+
+        # return rdata
+
+
diff --git a/pywxdump/dbpreprocess/parsingMediaMSG.py b/pywxdump/dbpreprocess/parsingMediaMSG.py
new file mode 100644
index 0000000..70ce116
--- /dev/null
+++ b/pywxdump/dbpreprocess/parsingMediaMSG.py
@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-#
+# -------------------------------------------------------------------------------
+# Name: parsingMediaMSG.py
+# Description: Reads voice buffers from the Media table and decodes them.
+# Author: xaoyaoo
+# Date: 2024/04/15
+# -------------------------------------------------------------------------------
+from .dbbase import DatabaseBase
+from .utils import silk2audio
+
+
+class ParsingMediaMSG(DatabaseBase):
+    def __init__(self, db_path):
+        super().__init__(db_path)
+
+    def get_audio(self, MsgSvrID, is_play=False, is_wave=False, save_path=None, rate=24000):
+        sql = "select Buf from Media where Reserved0=CAST(? AS INTEGER)"  # bound, compared numerically
+        DBdata = self.execute_sql(sql, (MsgSvrID,))
+
+        if not DBdata:  # None (query failed) or no matching row
+            return False
+        data = DBdata[0][0]  # [1:] + b'\xFF\xFF'
+        try:
+            pcm_data = silk2audio(buf_data=data, is_play=is_play, is_wave=is_wave, save_path=save_path, rate=rate)
+            return pcm_data
+        except Exception:
+            return False
diff --git a/pywxdump/dbpreprocess/parsingMicroMsg.py b/pywxdump/dbpreprocess/parsingMicroMsg.py
new file mode 100644
index 0000000..14b613a
--- /dev/null
+++ b/pywxdump/dbpreprocess/parsingMicroMsg.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-#
+# -------------------------------------------------------------------------------
+# Name: parsingMicroMsg.py
+# Description: Reads contacts and chat rooms from the MicroMsg database.
+# Author: xaoyaoo
+# Date: 2024/04/15
+# -------------------------------------------------------------------------------
+from .dbbase import DatabaseBase
+
+
+class ParsingMicroMsg(DatabaseBase):
+    def __init__(self, db_path):
+        super().__init__(db_path)
+
+    def wxid2userinfo(self, wx_id):
+        """
+        Look up a single contact.
+        :param wx_id: WeChat id, matched against Contact.UserName
+        :return: contact dict, or None when no row matches
+        """
+        # wx_id is bound as a parameter rather than formatted into the SQL text
+        sql = ("SELECT A.UserName, A.NickName, A.Remark,A.Alias,A.Reserved6,B.bigHeadImgUrl "
+               "FROM Contact A,ContactHeadImgUrl B "
+               "WHERE A.UserName = ? AND A.UserName = B.usrName "
+               "ORDER BY NickName ASC;")
+        result = self.execute_sql(sql, (wx_id,))
+        if not result:
+            return None
+        result = result[0]
+        return {"wxid": result[0], "nickname": result[1], "remark": result[2], "account": result[3],
+                "describe": result[4], "headImgUrl": result[5]}
+
+    def user_list(self):
+        """
+        List all contacts that have a row in ContactHeadImgUrl.
+        Rows are ordered by NickName.
+        :return: list of contact dicts
+        """
+        users = []
+        sql = ("SELECT A.UserName, A.NickName, A.Remark,A.Alias,A.Reserved6,B.bigHeadImgUrl "
+               "FROM Contact A,ContactHeadImgUrl B "
+               "where UserName==usrName "
+               "ORDER BY NickName ASC;")
+
+        result = self.execute_sql(sql)
+        for row in result or []:  # execute_sql returns None when the query failed
+            # unpack the six selected columns
+            username, nickname, remark, Alias, describe, headImgUrl = row
+            users.append(
+                {"wxid": username, "nickname": nickname, "remark": remark, "account": Alias,
+                 "describe": describe, "headImgUrl": headImgUrl})
+        return users
+
+    def chatroom_list(self):
+        """
+        List all chat rooms that have a row in ChatRoomInfo.
+        Rows are ordered by ChatRoomName.
+        :return: list of chat room dicts
+        """
+        rooms = []
+        # join ChatRoom with ChatRoomInfo on the room name
+        sql = ("SELECT A.ChatRoomName,A.UserNameList, A.DisplayNameList, B.Announcement,B.AnnouncementEditor "
+               "FROM ChatRoom A,ChatRoomInfo B "
+               "where A.ChatRoomName==B.ChatRoomName "
+               "ORDER BY A.ChatRoomName ASC;")
+        result = self.execute_sql(sql)
+        for row in result or []:  # execute_sql returns None when the query failed
+            # NOTE(review): splits on the literal two characters "^G" - confirm against the stored data
+            ChatRoomName, UserNameList, DisplayNameList, Announcement, AnnouncementEditor = row
+            UserNameList = UserNameList.split("^G")
+            DisplayNameList = DisplayNameList.split("^G")
+            rooms.append(
+                {"ChatRoomName": ChatRoomName, "UserNameList": UserNameList, "DisplayNameList": DisplayNameList,
+                 "Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor})
+        return rooms
diff --git a/pywxdump/dbpreprocess/parsingOpenIMContact.py b/pywxdump/dbpreprocess/parsingOpenIMContact.py
new file mode 100644
index 0000000..90f7872
--- /dev/null
+++ b/pywxdump/dbpreprocess/parsingOpenIMContact.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-#
+# -------------------------------------------------------------------------------
+# Name: parsingOpenIMContact.py
+# Description: Reads contacts from the OpenIMContact table.
+# Author: xaoyaoo
+# Date: 2024/04/16
+# -------------------------------------------------------------------------------
+from .dbbase import DatabaseBase
+
+
+class ParsingOpenIMContact(DatabaseBase):
+    def __init__(self, db_path):
+        super().__init__(db_path)
+
+    def user_list(self):
+        """
+        List all rows of OpenIMContact, ordered by NickName.
+        "account" and "describe" are always empty strings here.
+        :return: list of contact dicts
+        """
+        users = []
+        sql = ("SELECT A.UserName, A.NickName, A.Remark,A.BigHeadImgUrl FROM OpenIMContact A "
+               "ORDER BY NickName ASC;")
+        result = self.execute_sql(sql)
+        for row in result or []:  # execute_sql returns None when the query failed
+            # unpack the four selected columns
+            username, nickname, remark, headImgUrl = row
+            users.append(
+                {"wxid": username, "nickname": nickname, "remark": remark, "account": "", "describe": "",
+                 "headImgUrl": headImgUrl})
+        return users
diff --git a/pywxdump/dbpreprocess/utils.py b/pywxdump/dbpreprocess/utils.py
new file mode 100644
index 0000000..6dc142a
--- /dev/null
+++ b/pywxdump/dbpreprocess/utils.py
@@ -0,0 +1,349 @@
+# -*- coding: utf-8 -*-#
+# -------------------------------------------------------------------------------
+# Name: utils.py
+# Description: Helpers for message types, timestamps, XML, images and audio.
+# Author: xaoyaoo
+# Date: 2024/04/15
+# -------------------------------------------------------------------------------
+import hashlib
+import re
+import time
+import wave
+
+import requests
+from io import BytesIO
+import pysilk
+import lxml.etree as ET  # more tolerant than xml.etree.ElementTree, which fails on WeChat's non-standard XML
+
+
+def typeid2name(type_id: tuple):
+    """
+    Map a message type id to its display name.
+    :param type_id: (Type, SubType) tuple, e.g. (1, 0)
+    :return: the type name, or "未知" for an unlisted id
+    """
+    type_name_dict = {
+        (1, 0): "文本",
+        (3, 0): "图片",
+        (34, 0): "语音",
+        (43, 0): "视频",
+        (47, 0): "动画表情",
+
+        (37, 0): "添加好友",  # thanks to https://github.com/zhyc9de
+        (42, 0): "推荐公众号",  # thanks to https://github.com/zhyc9de
+        (48, 0): "地图信息",  # thanks to https://github.com/zhyc9de
+        (49, 40): "分享收藏夹",  # thanks to https://github.com/zhyc9de
+        (49, 53): "接龙",  # thanks to https://github.com/zhyc9de
+
+        (49, 0): "文件",
+        (49, 1): "类似文字消息而不一样的消息",
+        (49, 5): "卡片式链接",
+        (49, 6): "文件",
+        (49, 8): "用户上传的GIF表情",
+        (49, 19): "合并转发的聊天记录",
+        (49, 33): "分享的小程序",
+        (49, 36): "分享的小程序",
+        (49, 57): "带有引用的文本消息",
+        (49, 63): "视频号直播或直播回放等",
+        (49, 87): "群公告",
+        (49, 88): "视频号直播或直播回放等",
+        (49, 2000): "转账消息",
+        (49, 2003): "赠送红包封面",
+
+        (50, 0): "语音通话",
+        (10000, 0): "系统通知",
+        (10000, 4): "拍一拍",
+        (10000, 8000): "系统通知"
+    }
+
+    if type_id in type_name_dict:
+        return type_name_dict[type_id]
+    else:
+        return "未知"
+
+
+def name2typeid(type_name: str):
+    """
+    Reverse lookup of typeid2name: find every type id carrying a given name.
+    :param type_name: display name, e.g. "文件"
+    :return: list of (Type, SubType) tuples; empty when the name is unknown
+    """
+    type_name_dict = {
+        (1, 0): "文本",
+        (3, 0): "图片",
+        (34, 0): "语音",
+        (43, 0): "视频",
+        (47, 0): "动画表情",
+
+        (37, 0): "添加好友",  # thanks to https://github.com/zhyc9de
+        (42, 0): "推荐公众号",  # thanks to https://github.com/zhyc9de
+        (48, 0): "地图信息",  # thanks to https://github.com/zhyc9de
+        (49, 40): "分享收藏夹",  # thanks to https://github.com/zhyc9de
+        (49, 53): "接龙",  # thanks to https://github.com/zhyc9de
+
+        (49, 0): "文件",
+        (49, 1): "类似文字消息而不一样的消息",
+        (49, 5): "卡片式链接",
+        (49, 6): "文件",
+        (49, 8): "用户上传的GIF表情",
+        (49, 19): "合并转发的聊天记录",
+        (49, 33): "分享的小程序",
+        (49, 36): "分享的小程序",
+        (49, 57): "带有引用的文本消息",
+        (49, 63): "视频号直播或直播回放等",
+        (49, 87): "群公告",
+        (49, 88): "视频号直播或直播回放等",
+        (49, 2000): "转账消息",
+        (49, 2003): "赠送红包封面",
+
+        (50, 0): "语音通话",
+        (10000, 0): "系统通知",
+        (10000, 4): "拍一拍",
+        (10000, 8000): "系统通知"
+    }
+    type_tup = []
+    for k, v in type_name_dict.items():
+        if v == type_name:
+            type_tup.append(k)
+    return type_tup
+
+
+def get_md5(data):
+    md5 = hashlib.md5()
+    md5.update(data)
+    return md5.hexdigest()
+
+
+def timestamp2str(timestamp):
+    """
+    Format a timestamp as a local-time string.
+    :param timestamp: seconds since the epoch
+    :return: "YYYY-mm-dd HH:MM:SS" string
+    """
+    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))
+
+
+def dat2img(input_data):
+    """
+    Decode a single-byte-XOR encoded .dat image.
+    :param input_data: path of the .dat file (str) or its raw bytes
+    :return: (extension, md5 hex digest, decoded data), or False when no known header matches
+    """
+    # magic numbers of common image formats
+    img_head = {
+        b"\xFF\xD8\xFF": ".jpg",
+        b"\x89\x50\x4E\x47": ".png",
+        b"\x47\x49\x46\x38": ".gif",
+        b"\x42\x4D": ".BMP",
+        b"\x49\x49": ".TIFF",
+        b"\x4D\x4D": ".TIFF",
+        b"\x00\x00\x01\x00": ".ICO",
+        b"\x52\x49\x46\x46": ".WebP",
+        b"\x00\x00\x00\x18\x66\x74\x79\x70\x68\x65\x69\x63": ".HEIC",
+    }
+
+    if isinstance(input_data, str):
+        with open(input_data, "rb") as f:
+            input_bytes = f.read()
+    else:
+        input_bytes = input_data
+
+    try:
+        import numpy as np
+        input_bytes = np.frombuffer(input_bytes, dtype=np.uint8)
+        for hcode in img_head:  # try every known header
+            t = input_bytes[0] ^ hcode[0]  # candidate XOR key
+            if np.all(t == np.bitwise_xor(np.frombuffer(input_bytes[:len(hcode)], dtype=np.uint8),
+                                          np.frombuffer(hcode, dtype=np.uint8))):  # whole header must share the key
+                fomt = img_head[hcode]  # file extension for this header
+
+                out_bytes = np.bitwise_xor(input_bytes, t)  # vectorised XOR decode of the payload
+                md5 = get_md5(out_bytes)
+                return fomt, md5, out_bytes
+        return False
+    except ImportError:
+        pass
+
+    # Pure-Python fallback, reached only when numpy cannot be imported.
+    for hcode in img_head:
+        t = input_bytes[0] ^ hcode[0]  # candidate XOR key
+        # The key is valid only if every header byte decodes with it, not just one.
+        if len(input_bytes) >= len(hcode) and all(
+                t == input_bytes[i] ^ hcode[i] for i in range(1, len(hcode))):
+            fomt = img_head[hcode]
+            # XOR-decode the whole payload with the recovered key
+            out_bytes = bytearray(b ^ t for b in input_bytes)
+            md5 = get_md5(out_bytes)
+            return fomt, md5, out_bytes
+    return False
+
+
+def xml2dict(xml_string):
+    """
+    Parse an XML string into nested dicts.
+    :param xml_string: XML text to parse
+    :return: parsed dict; None for non-str input; the input itself when parsing raises
+    """
+
+    def parse_xml(element):
+        """
+        Recursively convert one XML element.
+        :param element: element to convert
+        :return: dict of attributes and children, or the element text for a leaf
+        """
+        result = {}
+        # attributes of the current element
+        if element is None or element.attrib is None:  # guard against elements without attributes
+            return result
+        for key, value in element.attrib.items():
+            result[key] = value
+        # children of the current element
+        for child in element:
+            child_result = parse_xml(child)
+            # a repeated child tag is collected into a list
+            if child.tag in result:
+                if not isinstance(result[child.tag], list):
+                    result[child.tag] = [result[child.tag]]
+                result[child.tag].append(child_result)
+            else:
+                result[child.tag] = child_result
+        # no attributes and no children: the text content becomes the value
+        if not result and element.text:
+            result = element.text
+        return result
+
+    if xml_string is None or not isinstance(xml_string, str):
+        return None
+    try:
+        parser = ET.XMLParser(recover=True)  # chat records sometimes hold malformed XML; recover instead of failing
+        root = ET.fromstring(xml_string, parser)
+    except Exception:
+        return xml_string
+    return parse_xml(root)
+
+
+def download_file(url, save_path=None):
+    """
+    Download a file over HTTP.
+    :param url: download address
+    :param save_path: optional path to also write the content to
+    :return: the downloaded bytes, or None when the status code is not 200
+    """
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Linux; Android 10; Redmi K40 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Mobile Safari/537.36"
+    }
+    # requests applies no timeout by default; bound the wait so a dead host cannot hang the caller
+    r = requests.get(url, headers=headers, timeout=30)
+    if r.status_code != 200:
+        return None
+    data = r.content
+    if save_path and isinstance(save_path, str):
+        with open(save_path, "wb") as f:
+            f.write(data)
+    return data
+
+
+def bytes2str(d):
+    """
+    Walk a dict in place and decode every bytes value to str.
+    :param d: dict to convert (nested dicts and lists of dicts/bytes are handled)
+    :return: None, d is modified in place
+    """
+    for k, v in d.items():
+        if isinstance(v, dict):
+            bytes2str(v)
+        elif isinstance(v, list):
+            for idx, item in enumerate(v):
+                if isinstance(item, dict):
+                    bytes2str(item)
+                elif isinstance(item, bytes):
+                    v[idx] = item.decode('utf-8')  # write back; rebinding the loop variable changes nothing
+        elif isinstance(v, bytes):
+            d[k] = v.decode('utf-8')
+
+
+def read_dict_all_values(data):
+    """
+    Collect every leaf value of a nested dict/list structure (recursive).
+    :param data: dict, list or scalar
+    :return: flat list of leaf values; bytes are decoded and ints converted to str
+    """
+    result = []
+    if isinstance(data, list):
+        for item in data:
+            result.extend(read_dict_all_values(item))
+    elif isinstance(data, dict):
+        for key, value in data.items():
+            result.extend(read_dict_all_values(value))
+    else:
+        if isinstance(data, bytes):
+            tmp = data.decode("utf-8", errors="ignore")  # binary fields must not abort the whole walk
+        else:
+            tmp = str(data) if isinstance(data, int) else data
+        result.append(tmp)
+
+    for i in range(len(result)):
+        if isinstance(result[i], bytes):
+            result[i] = result[i].decode("utf-8", errors="ignore")
+    return result
+
+
+def match_BytesExtra(BytesExtra, pattern=r"FileStorage(.*?)'"):
+    """
+    Find the first FileStorage path among the values of a decoded BytesExtra.
+    :param BytesExtra: decoded BytesExtra (nested dict/list), may be empty
+    :param pattern: regular expression searched in the quote-joined values
+    :return: the matched text without quotes, or "" when there is nothing to match
+    """
+    if not BytesExtra:
+        return ""
+    BytesExtra = read_dict_all_values(BytesExtra)
+    BytesExtra = "'" + "'".join(map(str, BytesExtra)) + "'"
+    # print(BytesExtra)
+
+    match = re.search(pattern, BytesExtra)
+    if match:
+        video_path = match.group(0).replace("'", "")
+        return video_path
+    else:
+        return ""
+
+
+def silk2audio(buf_data, is_play=False, is_wave=False, save_path=None, rate=24000):
+    silk_file = BytesIO(buf_data)  # silk input buffer
+    pcm_file = BytesIO()  # pcm output buffer
+
+    pysilk.decode(silk_file, pcm_file, rate)  # decode silk -> pcm
+    pcm_data = pcm_file.getvalue()  # raw pcm bytes
+
+    silk_file.close()  # release the silk buffer
+    pcm_file.close()  # release the pcm buffer
+    if is_play:  # play the audio
+        def play_audio(pcm_data, rate):
+            try:
+                import pyaudio
+            except ImportError:
+                raise ImportError("请先安装pyaudio库[ pip install pyaudio ]")
+
+            p = pyaudio.PyAudio()  # create the pyaudio instance
+            stream = p.open(format=pyaudio.paInt16, channels=1, rate=rate, output=True)  # open an output stream
+            stream.write(pcm_data)  # write the samples to the stream
+            stream.stop_stream()  # stop the stream
+            stream.close()  # close the stream
+            p.terminate()  # shut pyaudio down
+
+        play_audio(pcm_data, rate)
+
+    if is_wave:  # wrap the pcm data in a wav container
+        wave_file = BytesIO()  # wav output buffer
+        with wave.open(wave_file, 'wb') as wf:
+            wf.setparams((1, 2, rate, 0, 'NONE', 'NONE'))  # 1 channel, 2-byte samples, given rate
+            wf.writeframes(pcm_data)  # write the frames
+        rdata = wave_file.getvalue()  # wav bytes
+        wave_file.close()  # release the wav buffer
+        if save_path and isinstance(save_path, str):
+            with open(save_path, "wb") as f:
+                f.write(rdata)
+        return rdata
+
+    return pcm_data