UPDATE CHANGELOG.md

This commit is contained in:
xaoyaoo 2024-04-16 23:16:22 +08:00
parent 48a9efe95e
commit dc7cdab520
8 changed files with 864 additions and 1 deletions

View File

@ -1,7 +1,18 @@
## v2.4.63.(待发布)
## v2.4.71.(待发布)
- 修改数据库匹配规则
- 增加3.9.10.19支持
- fix 图片优先显示清晰版本
- UPDATE README.md
- 读取ExtraBuf联系人表
- fix 部分情况下视频不能正常显示
- MSG数量超过10个无法获取最新数据的bug
## v2.4.70
- 增加对引用消息的解析
- Update README.md
- UPDATE CHANGELOG.md
- 读取群聊数据,主要为 wxid以及对应昵称
## v2.4.62

View File

@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: __init__.py
# Description:
# Author: xaoyaoo
# Date: 2024/04/15
# -------------------------------------------------------------------------------
from .parsingMSG import ParsingMSG
from .parsingMicroMsg import ParsingMicroMsg
from .parsingMediaMSG import ParsingMediaMSG

View File

@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: dbbase.py
# Description:
# Author: xaoyaoo
# Date: 2024/04/15
# -------------------------------------------------------------------------------
import os
import sqlite3
import logging
class DatabaseBase:
    """Small sqlite3 wrapper that hands out one shared instance per database path.

    Calling the class twice with the same ``db_path`` returns the same object,
    so every caller shares a single connection to that file. The registry is a
    class attribute looked up through ``cls``, so subclasses that do not
    override it share the same registry.
    """

    # Maps db_path -> the single instance created for that path.
    _singleton_instances = {}

    def __new__(cls, db_path):
        if db_path not in cls._singleton_instances:
            cls._singleton_instances[db_path] = super().__new__(cls)
        return cls._singleton_instances[db_path]

    def __init__(self, db_path):
        """Open the database at *db_path*.

        :param db_path: path of the sqlite database file
        :raises FileNotFoundError: if *db_path* does not exist
        """
        # __init__ runs on every ``DatabaseBase(db_path)`` call, including the
        # ones that return the cached instance. Keep the connection that is
        # already open instead of replacing (and leaking) it.
        if getattr(self, "_db_connection", None) is not None:
            return
        self._db_path = db_path
        self._db_connection = None
        self._db_connection = self._connect_to_database(db_path)

    @classmethod
    def _connect_to_database(cls, db_path):
        """Return a new sqlite3 connection to *db_path*.

        :raises FileNotFoundError: if *db_path* does not exist
        """
        if not os.path.exists(db_path):
            raise FileNotFoundError(f"文件不存在: {db_path}")
        connection = sqlite3.connect(db_path, check_same_thread=False)
        logging.info(f"{connection} 连接句柄创建 {db_path}")
        return connection

    @staticmethod
    def _fetch_all(connection, sql, params):
        """Run *sql* on *connection* and return every resulting row."""
        cursor = connection.cursor()
        if params:
            cursor.execute(sql, params)
        else:
            cursor.execute(sql)
        return cursor.fetchall()

    def execute_sql(self, sql, params=None):
        """Execute *sql* and return all rows.

        The query is first run with the default text factory. If that raises,
        it is retried once with ``text_factory = bytes`` so text columns come
        back as raw bytes.

        :param sql: SQL statement
        :param params: optional bound parameters for the statement
        :return: list of row tuples, or None when both attempts fail
        """
        # Reopen the connection if it was closed via close_connection().
        if not self._db_connection:
            logging.warning(f"重新连接数据库 - {self._db_path}")
            self._db_connection = self._connect_to_database(self._db_path)
        connection = self._db_connection
        try:
            return self._fetch_all(connection, sql, params)
        except Exception as e1:
            try:
                connection.text_factory = bytes
                return self._fetch_all(connection, sql, params)
            except Exception as e2:
                logging.error(f"**********\nSQL: {sql}\nparams: {params}\n{e1}\n{e2}\n**********")
                return None
            finally:
                # Always restore the default so a failed retry does not leave
                # the shared connection returning bytes for later queries.
                connection.text_factory = str

    def close_connection(self):
        """Close the connection if it is open; safe to call repeatedly."""
        connection = getattr(self, "_db_connection", None)
        if connection:
            connection.close()
            logging.info(f"关闭数据库 - {self._db_path}")
            self._db_connection = None

    def show__singleton_instances(self):
        """Print the registry of live instances."""
        print(self._singleton_instances)

    def __del__(self):
        self.close_connection()
        path = getattr(self, "_db_path", None)
        # Only drop the registry entry when it still refers to this object;
        # a missing entry must not raise inside __del__.
        if self._singleton_instances.get(path) is self:
            del self._singleton_instances[path]
if __name__ == '__main__':
    # Manual smoke test: open two databases, list the names recorded in
    # sqlite_master for each, then close both connections.
    handles = [DatabaseBase(path) for path in ("test.db", "test1.db")]
    master_rows = [handle.execute_sql("select * from sqlite_master;") for handle in handles]
    for rows in master_rows:
        print([row[1] for row in rows])
    for handle in handles:
        handle.close_connection()

View File

@ -0,0 +1,272 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: parsingMSG.py
# Description:
# Author: xaoyaoo
# Date: 2024/04/15
# -------------------------------------------------------------------------------
import os
import re
import pandas as pd
from .dbbase import DatabaseBase
from .utils import get_md5, name2typeid, typeid2name, timestamp2str, xml2dict, match_BytesExtra
import lz4.block
import blackboxprotobuf
class ParsingMSG(DatabaseBase):
    """Query helpers for the ``MSG`` table of a WeChat message database."""

    def __init__(self, db_path):
        super().__init__(db_path)

    def decompress_CompressContent(self, data):
        """Decompress a ``CompressContent`` blob into text.

        :param data: CompressContent value (bytes)
        :return: decoded text; None when *data* is not bytes. If lz4
                 decompression fails, *data* itself is decoded instead.
        """
        if data is None or not isinstance(data, bytes):
            return None
        try:
            dst = lz4.block.decompress(data, uncompressed_size=len(data) << 8)
        except Exception:
            return data.decode('utf-8', errors='ignore')
        # Strip NUL bytes left after decompression; they break XML parsing later.
        dst = dst.replace(b'\x00', b'')
        return dst.decode('utf-8', errors='ignore')

    def get_BytesExtra(self, BytesExtra):
        """Decode a ``BytesExtra`` protobuf blob with blackboxprotobuf.

        :param BytesExtra: raw bytes from the MSG row
        :return: decoded message dict, or None if input is not bytes or decoding fails
        """
        if BytesExtra is None or not isinstance(BytesExtra, bytes):
            return None
        try:
            deserialize_data, _message_type = blackboxprotobuf.decode_message(BytesExtra)
            return deserialize_data
        except Exception:
            return None

    def chat_count(self, wxid: str = ""):
        """Count messages per talker.

        :param wxid: when given, only that talker is counted; otherwise all talkers
        :return: dict mapping StrTalker -> message count
        """
        if wxid:
            # Bind wxid as a parameter instead of interpolating it into the SQL.
            sql = "SELECT StrTalker,COUNT(*) FROM MSG WHERE StrTalker=?;"
            result = self.execute_sql(sql, (wxid,))
        else:
            sql = "SELECT StrTalker, COUNT(*) FROM MSG GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
            result = self.execute_sql(sql)
        # execute_sql returns None on failure; treat that as "no rows".
        return {talker: count for talker, count in result or []}

    def chat_count_total(self):
        """Return the total number of rows in MSG (0 if the query fails)."""
        sql = "SELECT COUNT(*) FROM MSG;"
        result = self.execute_sql(sql)
        if result and len(result) > 0:
            chat_counts = result[0][0]
            return chat_counts
        return 0

    # NOTE: disabled draft kept from the original source. It references
    # ``get_contact`` and ``MSG_db_path``, neither of which is defined here.
    # def room_user_list(self, selected_talker):
    #     """
    #     Get every user that appears in a group chat
    #     :param selected_talker: wxid of the selected chat
    #     :return: list of chat users
    #     """
    #     sql = (
    #         "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
    #         "FROM MSG WHERE StrTalker=? "
    #         "ORDER BY CreateTime ASC")
    #
    #     result1 = self.execute_sql(sql, (selected_talker,))
    #     user_list = []
    #     read_user_wx_id = []
    #     for row in result1:
    #         localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime, MsgSvrID, DisplayContent, CompressContent, BytesExtra, id = row
    #         bytes_extra = self.get_BytesExtra(BytesExtra)
    #         if bytes_extra:
    #             try:
    #                 talker = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore')
    #             except:
    #                 continue
    #         if talker in read_user_wx_id:
    #             continue
    #         user = get_contact(MSG_db_path, talker)
    #         if not user:
    #             continue
    #         user_list.append(user)
    #         read_user_wx_id.append(talker)
    #     return user_list

    @staticmethod
    def _xml_as_dict(xml_text):
        """Parse *xml_text* with xml2dict; return ``{}`` unless the result is a dict."""
        parsed = xml2dict(xml_text)
        return parsed if isinstance(parsed, dict) else {}

    @staticmethod
    def _child_dict(mapping, key):
        """Return ``mapping[key]`` when it is a dict, otherwise ``{}``."""
        value = mapping.get(key, {})
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _storage_path(extra, marker):
        """Pick a ``FileStorage...`` path out of a decoded BytesExtra value.

        Candidates are found in ``str(extra)``; the first one containing
        *marker* wins, otherwise the first candidate. Returns "" if none.
        """
        candidates = re.findall(r"(FileStorage.*?)'", str(extra))
        if not candidates:
            return ""
        # max() returns the first maximal element, i.e. the first candidate
        # containing the marker, or simply the first candidate.
        best = max(candidates, key=lambda p: marker in p)
        # Backslashes appear escaped in the repr, so drop empty segments.
        parts = [part for part in best.replace("'", "").split("\\") if part]
        return os.path.join(*parts)

    # Single-message handling
    def msg_detail(self, row):
        """Turn one MSG row into a display-oriented dict.

        :param row: 13-tuple in the column order selected by ``msg_list``
        :return: dict with keys MsgSvrID, type_name, is_sender, talker,
                 room_name, content ({"src", "msg"}), CreateTime and id
        """
        (localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType, CreateTime,
         MsgSvrID, DisplayContent, CompressContent, BytesExtra, row_id) = row
        CreateTime = timestamp2str(CreateTime)
        type_id = (Type, SubType)
        type_name = typeid2name(type_id)
        content = {"src": "", "msg": StrContent}

        if type_id == (1, 0):  # text
            content["msg"] = StrContent
        elif type_id == (3, 0):  # image
            content["src"] = self._storage_path(self.get_BytesExtra(BytesExtra), "Image")
            content["msg"] = "图片"
        elif type_id == (34, 0):  # voice
            tmp_c = self._xml_as_dict(StrContent)
            voicelength = self._child_dict(tmp_c, "voicemsg").get("voicelength", "")
            transtext = self._child_dict(tmp_c, "voicetrans").get("transtext", "")
            if isinstance(voicelength, str) and voicelength.isdigit():
                # The stored value is divided by 1000 for display.
                voicelength = f"{int(voicelength) / 1000:.2f}"
            if transtext:
                content["msg"] = f"语音时长:{voicelength}\n翻译结果:{transtext}"
            else:
                content["msg"] = f"语音时长:{voicelength}"
            content["src"] = os.path.join(
                "audio", f"{StrTalker}",
                f"{CreateTime.replace(':', '-').replace(' ', '_')}_{IsSender}_{MsgSvrID}.wav")
        elif type_id == (43, 0):  # video
            content["src"] = self._storage_path(self.get_BytesExtra(BytesExtra), "mp4")
            content["msg"] = "视频"
        elif type_id == (47, 0):  # animated sticker
            cdnurl = self._child_dict(self._xml_as_dict(StrContent), "emoji").get("cdnurl", "")
            if cdnurl:
                content = {"src": cdnurl, "msg": "表情"}
        elif type_id == (49, 0):  # file
            # match_BytesExtra returns False for empty input; normalise to ""
            # so os.path.basename does not raise TypeError.
            url = match_BytesExtra(self.get_BytesExtra(BytesExtra)) or ""
            content["src"] = url
            content["msg"] = os.path.basename(url)
        elif type_id == (49, 19):  # merged/forwarded chat history
            appmsg = self._child_dict(
                self._xml_as_dict(self.decompress_CompressContent(CompressContent)), "appmsg")
            title = appmsg.get("title", "")
            des = appmsg.get("des", "")
            recorditem = xml2dict(appmsg.get("recorditem", ""))
            content["msg"] = f"{title}\n{des}"
            content["src"] = recorditem
        elif type_id == (49, 57):  # text message quoting another message
            appmsg = self._child_dict(
                self._xml_as_dict(self.decompress_CompressContent(CompressContent)), "appmsg")
            title = appmsg.get("title", "")
            refermsg = self._child_dict(appmsg, "refermsg")
            displayname = refermsg.get("displayname", "")
            display_content = refermsg.get("content", "")
            display_createtime = refermsg.get("createtime", "")
            if isinstance(display_createtime, str) and display_createtime.isdigit():
                display_createtime = timestamp2str(int(display_createtime))
            content["msg"] = f"{title}\n\n[引用]({display_createtime}){displayname}:{display_content}"
            content["src"] = ""
        elif type_id == (49, 2000):  # money transfer
            appmsg = self._child_dict(
                self._xml_as_dict(self.decompress_CompressContent(CompressContent)), "appmsg")
            feedesc = self._child_dict(appmsg, "wcpayinfo").get("feedesc", "")
            content["msg"] = f"转账:{feedesc}"
            content["src"] = ""
        elif type_id[0] == 49 and type_id[1] != 0:  # any other type-49 message
            content["src"] = match_BytesExtra(self.get_BytesExtra(BytesExtra)) or ""
            content["msg"] = type_name
        elif type_id == (50, 0):  # voice call
            content["msg"] = "语音/视频通话[%s]" % DisplayContent
        # elif type_id == (10000, 0):
        #     content["msg"] = StrContent
        # elif type_id == (10000, 4):
        #     content["msg"] = StrContent
        # elif type_id == (10000, 8000):
        #     content["msg"] = StrContent

        talker = "未知"
        if IsSender == 1:
            talker = ""
        elif StrTalker.endswith("@chatroom"):
            # In a group chat the sender id is read from the decoded BytesExtra.
            bytes_extra = self.get_BytesExtra(BytesExtra)
            if bytes_extra:
                try:
                    talker = bytes_extra['3'][0]['2'].decode('utf-8', errors='ignore')
                    if "publisher-id" in talker:
                        talker = "系统"
                except (KeyError, IndexError, TypeError, AttributeError):
                    # Unexpected BytesExtra layout: keep the placeholder talker.
                    pass
        else:
            talker = StrTalker

        row_data = {"MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender, "talker": talker,
                    "room_name": StrTalker, "content": content, "CreateTime": CreateTime, "id": row_id}
        return row_data

    def msg_list(self, wxid="", start_index=0, page_size=500):
        """Return one page of formatted messages ordered by CreateTime.

        :param wxid: restrict to this talker when non-empty
        :param start_index: row offset passed to LIMIT
        :param page_size: maximum number of rows
        :return: list of dicts produced by ``msg_detail`` (empty if the query fails)
        """
        if wxid:
            sql = (
                "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
                "FROM MSG WHERE StrTalker=? "
                "ORDER BY CreateTime ASC LIMIT ?,?")
            result1 = self.execute_sql(sql, (wxid, start_index, page_size))
        else:
            sql = (
                "SELECT localId, IsSender, StrContent, StrTalker, Sequence, Type, SubType,CreateTime,MsgSvrID,DisplayContent,CompressContent,BytesExtra,ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
                "FROM MSG ORDER BY CreateTime ASC LIMIT ?,?")
            result1 = self.execute_sql(sql, (start_index, page_size))
        # df = pd.DataFrame(result1, columns=[
        #     'localId', 'IsSender', 'StrContent', 'StrTalker', 'Sequence', 'Type', 'SubType', 'CreateTime', 'MsgSvrID',
        #     'DisplayContent', 'CompressContent', 'BytesExtra', 'id'
        # ])
        # df['msg_detail'] = df.apply(lambda row: self.msg_detail(row), axis=1)
        # return df['msg_detail'].tolist()
        # execute_sql returns None on failure; treat that as an empty page.
        return [self.msg_detail(row) for row in result1 or []]

View File

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: parsingMediaMSG.py
# Description:
# Author: xaoyaoo
# Date: 2024/04/15
# -------------------------------------------------------------------------------
from .dbbase import DatabaseBase
from .utils import silk2audio
class ParsingMediaMSG(DatabaseBase):
    """Query helper for the ``Media`` table that stores voice-message buffers."""

    def __init__(self, db_path):
        super().__init__(db_path)

    def get_audio(self, MsgSvrID, is_play=False, is_wave=False, save_path=None, rate=24000):
        """Fetch and decode the audio buffer stored for one message.

        :param MsgSvrID: value matched against ``Media.Reserved0``
        :param is_play: forwarded to ``silk2audio``
        :param is_wave: forwarded to ``silk2audio``
        :param save_path: forwarded to ``silk2audio``
        :param rate: sample rate forwarded to ``silk2audio``
        :return: whatever ``silk2audio`` returns, or False when no row is
                 found, the query fails, or decoding raises
        """
        # The original interpolated MsgSvrID as a bare SQL literal. Convert
        # numeric strings to int so the bound parameter compares the same way.
        key = MsgSvrID
        if isinstance(key, str):
            try:
                key = int(key)
            except ValueError:
                pass
        sql = "select Buf from Media where Reserved0=?"
        DBdata = self.execute_sql(sql, (key,))
        # execute_sql returns None on failure and [] when nothing matches.
        if not DBdata:
            return False
        data = DBdata[0][0]  # [1:] + b'\xFF\xFF'
        try:
            pcm_data = silk2audio(buf_data=data, is_play=is_play, is_wave=is_wave, save_path=save_path, rate=rate)
            return pcm_data
        except Exception:
            return False

View File

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: parsingMicroMsg.py
# Description:
# Author: xaoyaoo
# Date: 2024/04/15
# -------------------------------------------------------------------------------
from .dbbase import DatabaseBase
class ParsingMicroMsg(DatabaseBase):
    """Query helpers for the contact and chat-room tables of MicroMsg.db."""

    def __init__(self, db_path):
        super().__init__(db_path)

    def wxid2userinfo(self, wx_id):
        """Look up a single contact.

        :param wx_id: WeChat id, matched against ``Contact.UserName``
        :return: dict with wxid, nickname, remark, account, describe and
                 headImgUrl, or None when no row matches or the query fails
        """
        # Bind wx_id as a parameter instead of interpolating it into the SQL.
        sql = ("SELECT A.UserName, A.NickName, A.Remark,A.Alias,A.Reserved6,B.bigHeadImgUrl "
               "FROM Contact A,ContactHeadImgUrl B "
               "WHERE A.UserName = ? AND A.UserName = B.usrName "
               "ORDER BY NickName ASC;")
        result = self.execute_sql(sql, (wx_id,))
        if not result:
            return None
        result = result[0]
        return {"wxid": result[0], "nickname": result[1], "remark": result[2], "account": result[3],
                "describe": result[4], "headImgUrl": result[5]}

    def user_list(self):
        """List all contacts that have a head-image row.

        :return: list of dicts with wxid, nickname, remark, account,
                 describe and headImgUrl (empty if the query fails)
        """
        sql = ("SELECT A.UserName, A.NickName, A.Remark,A.Alias,A.Reserved6,B.bigHeadImgUrl "
               "FROM Contact A,ContactHeadImgUrl B "
               "where UserName==usrName "
               "ORDER BY NickName ASC;")
        result = self.execute_sql(sql)
        users = []
        # execute_sql returns None on failure; treat that as "no rows".
        for username, nickname, remark, Alias, describe, headImgUrl in result or []:
            users.append(
                {"wxid": username, "nickname": nickname, "remark": remark, "account": Alias,
                 "describe": describe, "headImgUrl": headImgUrl})
        return users

    def chatroom_list(self):
        """List all chat rooms with their member and announcement data.

        :return: list of dicts with ChatRoomName, UserNameList,
                 DisplayNameList, Announcement and AnnouncementEditor
                 (empty if the query fails)
        """
        sql = ("SELECT A.ChatRoomName,A.UserNameList, A.DisplayNameList, B.Announcement,B.AnnouncementEditor "
               "FROM ChatRoom A,ChatRoomInfo B "
               "where A.ChatRoomName==B.ChatRoomName "
               "ORDER BY A.ChatRoomName ASC;")
        result = self.execute_sql(sql)
        rooms = []
        for ChatRoomName, UserNameList, DisplayNameList, Announcement, AnnouncementEditor in result or []:
            # Lists are split on the two-character text "^G".
            # NOTE(review): confirm this is not meant to be the BEL control character.
            # A NULL column yields an empty list instead of raising.
            UserNameList = UserNameList.split("^G") if UserNameList is not None else []
            DisplayNameList = DisplayNameList.split("^G") if DisplayNameList is not None else []
            rooms.append(
                {"ChatRoomName": ChatRoomName, "UserNameList": UserNameList, "DisplayNameList": DisplayNameList,
                 "Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor})
        return rooms

View File

@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: parsingOpenIMContact.py
# Description:
# Author: xaoyaoo
# Date: 2024/04/16
# -------------------------------------------------------------------------------
from .dbbase import DatabaseBase
class ParsingOpenIMContact(DatabaseBase):
    """Query helper for the ``OpenIMContact`` table."""

    def __init__(self, db_path):
        super().__init__(db_path)

    def user_list(self):
        """List all OpenIM contacts ordered by nickname.

        :return: list of dicts with wxid, nickname, remark, account,
                 describe and headImgUrl; ``account`` and ``describe`` are
                 always "" here. Empty list if the query fails.
        """
        sql = ("SELECT A.UserName, A.NickName, A.Remark,A.BigHeadImgUrl FROM OpenIMContact A "
               "ORDER BY NickName ASC;")
        result = self.execute_sql(sql)
        # execute_sql returns None on failure; treat that as "no rows".
        return [
            {"wxid": username, "nickname": nickname, "remark": remark, "account": "", "describe": "",
             "headImgUrl": headImgUrl}
            for username, nickname, remark, headImgUrl in result or []
        ]

View File

@ -0,0 +1,349 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: utils.py
# Description:
# Author: xaoyaoo
# Date: 2024/04/15
# -------------------------------------------------------------------------------
import hashlib
import re
import time
import wave
import requests
from io import BytesIO
import pysilk
import lxml.etree as ET # 这个模块更健壮些微信XML格式有时有非标格式会导致xml.etree.ElementTree处理失败
def typeid2name(type_id: tuple):
    """
    Map a message type id to its display name.
    :param type_id: (Type, SubType) tuple, e.g. (1, 0)
    :return: the display name, or "未知" when the id is not in the table
    """
    names_by_type = {
        (1, 0): "文本",
        (3, 0): "图片",
        (34, 0): "语音",
        (43, 0): "视频",
        (47, 0): "动画表情",

        (37, 0): "添加好友",  # thanks to https://github.com/zhyc9de
        (42, 0): "推荐公众号",  # thanks to https://github.com/zhyc9de
        (48, 0): "地图信息",  # thanks to https://github.com/zhyc9de
        (49, 40): "分享收藏夹",  # thanks to https://github.com/zhyc9de
        (49, 53): "接龙",  # thanks to https://github.com/zhyc9de

        (49, 0): "文件",
        (49, 1): "类似文字消息而不一样的消息",
        (49, 5): "卡片式链接",
        (49, 6): "文件",
        (49, 8): "用户上传的GIF表情",
        (49, 19): "合并转发的聊天记录",
        (49, 33): "分享的小程序",
        (49, 36): "分享的小程序",
        (49, 57): "带有引用的文本消息",
        (49, 63): "视频号直播或直播回放等",
        (49, 87): "群公告",
        (49, 88): "视频号直播或直播回放等",
        (49, 2000): "转账消息",
        (49, 2003): "赠送红包封面",

        (50, 0): "语音通话",
        (10000, 0): "系统通知",
        (10000, 4): "拍一拍",
        (10000, 8000): "系统通知",
    }
    return names_by_type.get(type_id, "未知")
def name2typeid(type_name: str):
    """
    Reverse lookup: find every message type id that carries a display name.
    :param type_name: display name, e.g. "文件"
    :return: list of (Type, SubType) tuples in table order; empty if none match
    """
    names_by_type = {
        (1, 0): "文本",
        (3, 0): "图片",
        (34, 0): "语音",
        (43, 0): "视频",
        (47, 0): "动画表情",

        (37, 0): "添加好友",  # thanks to https://github.com/zhyc9de
        (42, 0): "推荐公众号",  # thanks to https://github.com/zhyc9de
        (48, 0): "地图信息",  # thanks to https://github.com/zhyc9de
        (49, 40): "分享收藏夹",  # thanks to https://github.com/zhyc9de
        (49, 53): "接龙",  # thanks to https://github.com/zhyc9de

        (49, 0): "文件",
        (49, 1): "类似文字消息而不一样的消息",
        (49, 5): "卡片式链接",
        (49, 6): "文件",
        (49, 8): "用户上传的GIF表情",
        (49, 19): "合并转发的聊天记录",
        (49, 33): "分享的小程序",
        (49, 36): "分享的小程序",
        (49, 57): "带有引用的文本消息",
        (49, 63): "视频号直播或直播回放等",
        (49, 87): "群公告",
        (49, 88): "视频号直播或直播回放等",
        (49, 2000): "转账消息",
        (49, 2003): "赠送红包封面",

        (50, 0): "语音通话",
        (10000, 0): "系统通知",
        (10000, 4): "拍一拍",
        (10000, 8000): "系统通知",
    }
    return [type_id for type_id, name in names_by_type.items() if name == type_name]
def get_md5(data):
    """Return the hexadecimal MD5 digest of *data* (a bytes-like object)."""
    return hashlib.md5(data).hexdigest()
def timestamp2str(timestamp):
    """
    Format a Unix timestamp as local time.
    :param timestamp: seconds since the epoch
    :return: string formatted as "%Y-%m-%d %H:%M:%S"
    """
    local_time = time.localtime(timestamp)
    return time.strftime("%Y-%m-%d %H:%M:%S", local_time)
def dat2img(input_data):
    """
    Decode a single-byte-XOR obfuscated image (.dat).

    The XOR key is recovered by assuming the data starts with one of the
    known image headers: the key derived from the first byte must reproduce
    the WHOLE header, not just one byte of it.

    :param input_data: path of the file (str) or the raw data (bytes-like)
    :return: (format suffix, md5 hex digest, decoded data), or False when no
             known header matches. The decoded data is a numpy uint8 array
             when numpy is installed, otherwise a bytearray.
    """
    # Magic numbers of common image formats.
    img_head = {
        b"\xFF\xD8\xFF": ".jpg",
        b"\x89\x50\x4E\x47": ".png",
        b"\x47\x49\x46\x38": ".gif",
        b"\x42\x4D": ".BMP",
        b"\x49\x49": ".TIFF",
        b"\x4D\x4D": ".TIFF",
        b"\x00\x00\x01\x00": ".ICO",
        b"\x52\x49\x46\x46": ".WebP",
        b"\x00\x00\x00\x18\x66\x74\x79\x70\x68\x65\x69\x63": ".HEIC",
    }

    if isinstance(input_data, str):
        with open(input_data, "rb") as f:
            input_bytes = f.read()
    else:
        input_bytes = input_data

    # Nothing to inspect: avoids IndexError on the first-byte lookup below.
    if not input_bytes:
        return False

    for hcode, fomt in img_head.items():
        if len(input_bytes) < len(hcode):
            continue
        t = input_bytes[0] ^ hcode[0]  # candidate XOR key
        # Every remaining header byte must decode with the same key.
        if any(input_bytes[i] ^ hcode[i] != t for i in range(1, len(hcode))):
            continue
        try:
            import numpy as np
            # Vectorised XOR over the whole buffer.
            out_bytes = np.bitwise_xor(np.frombuffer(input_bytes, dtype=np.uint8), np.uint8(t))
        except ImportError:
            out_bytes = bytearray(b ^ t for b in input_bytes)
        md5 = hashlib.md5(out_bytes).hexdigest()
        return fomt, md5, out_bytes
    return False
def xml2dict(xml_string):
    """
    Parse an XML string into nested dicts.
    :param xml_string: XML text to parse
    :return: None when the input is not a str; the input itself when parsing
             raises; otherwise the converted root element
    """

    def _convert(node):
        """
        Recursively convert one XML element.
        :param node: element to convert
        :return: dict of attributes and children, or the element text when
                 the element has neither
        """
        # Some nodes come back without attributes; return an empty result.
        if node is None or node.attrib is None:
            return {}

        # Attributes first ...
        converted = dict(node.attrib.items())

        # ... then children; a repeated tag is collected into a list.
        for sub in node:
            sub_converted = _convert(sub)
            if sub.tag not in converted:
                converted[sub.tag] = sub_converted
                continue
            if not isinstance(converted[sub.tag], list):
                converted[sub.tag] = [converted[sub.tag]]
            converted[sub.tag].append(sub_converted)

        # With no attributes and no children, the text content is the value.
        if not converted and node.text:
            return node.text
        return converted

    if not isinstance(xml_string, str):
        return None
    try:
        # recover=True makes the parser tolerate malformed XML found in chat records.
        lenient_parser = ET.XMLParser(recover=True)
        root = ET.fromstring(xml_string, lenient_parser)
    except Exception:
        return xml_string
    return _convert(root)
def download_file(url, save_path=None, timeout=30):
    """
    Download a file over HTTP.
    :param url: download address
    :param save_path: optional path; when given as a str the data is also written there
    :param timeout: seconds passed to ``requests.get`` so a stalled server
                    cannot block the caller forever
    :return: the response body as bytes, or None when the status code is not 200
    :raises requests.RequestException: on connection errors or timeout
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 10; Redmi K40 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Mobile Safari/537.36"
    }
    r = requests.get(url, headers=headers, timeout=timeout)
    if r.status_code != 200:
        return None
    data = r.content
    if save_path and isinstance(save_path, str):
        with open(save_path, "wb") as f:
            f.write(data)
    return data
def bytes2str(d):
    """
    Walk a dict in place and decode every bytes value to str (utf-8).

    Nested dicts are processed recursively. Lists are scanned one level
    deep: dict items are processed recursively and bytes items are decoded.
    :param d: dict to convert; modified in place
    :return: None
    :raises UnicodeDecodeError: if a bytes value is not valid utf-8
    """
    for k, v in d.items():
        if isinstance(v, dict):
            bytes2str(v)
        elif isinstance(v, list):
            for index, item in enumerate(v):
                if isinstance(item, dict):
                    bytes2str(item)
                elif isinstance(item, bytes):
                    # Write back through the index: rebinding the loop
                    # variable would leave the list unchanged.
                    v[index] = item.decode('utf-8')
        elif isinstance(v, bytes):
            d[k] = v.decode('utf-8')
def read_dict_all_values(data):
    """
    Collect every leaf value of a nested dict/list structure, depth first.

    bytes leaves are decoded as utf-8 with undecodable bytes dropped, which
    matches the ``errors='ignore'`` decoding used elsewhere in this project
    and keeps binary values from raising. int leaves are converted with
    ``str``. Any other leaf is returned unchanged.
    :param data: dict, list, or a single leaf value
    :return: flat list of leaf values
    """
    if isinstance(data, list):
        result = []
        for item in data:
            result.extend(read_dict_all_values(item))
        return result
    if isinstance(data, dict):
        result = []
        for value in data.values():
            result.extend(read_dict_all_values(value))
        return result
    if isinstance(data, bytes):
        return [data.decode("utf-8", errors="ignore")]
    if isinstance(data, int):
        return [str(data)]
    return [data]
def match_BytesExtra(BytesExtra, pattern=r"FileStorage(.*?)'"):
    """
    Search the flattened values of a decoded BytesExtra for a pattern.
    :param BytesExtra: decoded BytesExtra structure
    :param pattern: regular expression to search for
    :return: False when BytesExtra is empty; the whole match with single
             quotes removed when found; "" when nothing matches
    """
    if not BytesExtra:
        return False

    # Flatten all leaf values and wrap each one in single quotes so the
    # default pattern can stop at the closing quote.
    flat_values = read_dict_all_values(BytesExtra)
    quoted = "'" + "'".join(flat_values) + "'"
    # print(quoted)

    found = re.search(pattern, quoted)
    if not found:
        return ""
    return found.group(0).replace("'", "")
def _play_pcm(pcm_data, rate):
    """Play 16-bit mono PCM data through pyaudio, always releasing the device."""
    try:
        import pyaudio
    except ImportError:
        raise ImportError("请先安装pyaudio库[ pip install pyaudio ]")

    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=pyaudio.paInt16, channels=1, rate=rate, output=True)
        try:
            stream.write(pcm_data)
            stream.stop_stream()
        finally:
            stream.close()
    finally:
        # Terminate even if opening or writing the stream failed.
        p.terminate()


def silk2audio(buf_data, is_play=False, is_wave=False, save_path=None, rate=24000):
    """
    Decode a silk audio buffer to PCM, optionally playing it or wrapping it as WAV.
    :param buf_data: silk-encoded audio bytes
    :param is_play: play the decoded audio through pyaudio
    :param is_wave: return WAV data (mono, 16-bit) instead of raw PCM
    :param save_path: when is_wave is set and this is a str, also write the WAV data there
    :param rate: sample rate used for decoding, playback and the WAV header
    :return: WAV bytes when is_wave is set, otherwise the raw PCM bytes
    :raises ImportError: if is_play is set and pyaudio is not installed
    """
    # Context managers close both buffers even if decoding raises.
    with BytesIO(buf_data) as silk_file, BytesIO() as pcm_file:
        pysilk.decode(silk_file, pcm_file, rate)  # silk -> pcm
        pcm_data = pcm_file.getvalue()

    if is_play:
        _play_pcm(pcm_data, rate)

    if is_wave:
        with BytesIO() as wave_file:
            with wave.open(wave_file, 'wb') as wf:
                wf.setparams((1, 2, rate, 0, 'NONE', 'NONE'))
                wf.writeframes(pcm_data)
            rdata = wave_file.getvalue()
        if save_path and isinstance(save_path, str):
            with open(save_path, "wb") as f:
                f.write(rdata)
        return rdata

    return pcm_data