fix decompress_CompressContent func parameter
This commit is contained in:
parent
11860acbac
commit
03e1d93782
@ -13,9 +13,12 @@ import wave
|
||||
import pyaudio
|
||||
import requests
|
||||
import hashlib
|
||||
import lz4.block
|
||||
import blackboxprotobuf
|
||||
|
||||
from PIL import Image
|
||||
import xml.etree.ElementTree as ET
|
||||
#import xml.etree.ElementTree as ET
|
||||
import lxml.etree as ET #这个模块更健壮些,微信XML格式有时有非标格式,会导致xml.etree.ElementTree处理失败
|
||||
|
||||
|
||||
def get_md5(data):
|
||||
@ -40,6 +43,8 @@ def parse_xml_string(xml_string):
|
||||
result = {}
|
||||
|
||||
# 解析当前元素的属性
|
||||
if element is None or element.attrib is None: #有时可能会遇到没有属性,要处理下
|
||||
return result
|
||||
for key, value in element.attrib.items():
|
||||
result[key] = value
|
||||
|
||||
@ -64,7 +69,8 @@ def parse_xml_string(xml_string):
|
||||
if xml_string is None or not isinstance(xml_string, str):
|
||||
return None
|
||||
try:
|
||||
root = ET.fromstring(xml_string)
|
||||
parser = ET.XMLParser(recover=True) # 有时微信的聊天记录里面,会冒出来xml格式不对的情况,这里把parser设置成忽略错误
|
||||
root = ET.fromstring(xml_string,parser)
|
||||
except Exception as e:
|
||||
return xml_string
|
||||
return parse_xml(root)
|
||||
@ -147,54 +153,9 @@ def decompress_CompressContent(data):
|
||||
"""
|
||||
if data is None or not isinstance(data, bytes):
|
||||
return None
|
||||
i = 0
|
||||
uncompressed_data = []
|
||||
|
||||
while i < len(data):
|
||||
# 读取第一个字节
|
||||
byte1 = data[i]
|
||||
# 从高四位得到无匹配的明文长度Lh
|
||||
Lh = byte1 >> 4
|
||||
Li = byte1 & 0x0F # 从低四位得到匹配的数据长度Li
|
||||
if Lh == 0x0f:
|
||||
# 继续读取下一个字节L1
|
||||
i = i + 1
|
||||
L1 = data[i]
|
||||
Lh = L1 + 0x0f
|
||||
|
||||
while data[i] == 0xFF:
|
||||
# 继续读取下一个字节,并累加
|
||||
i = i + 1
|
||||
Lh += data[i]
|
||||
i += 1
|
||||
uncompressed_data.extend(data[i:i + Lh])
|
||||
i = i + Lh
|
||||
|
||||
# 读取匹配的偏移量Offset
|
||||
bias = data[i:i + 2]
|
||||
offset = int.from_bytes(bias, byteorder='little')
|
||||
i = i + 2
|
||||
|
||||
# 读取匹配的数据长度Li
|
||||
if Li != 0x0F:
|
||||
# 实际的匹配压缩长度即为Li = Li + 4
|
||||
Li += 4
|
||||
else:
|
||||
# 从偏移量后面的可选匹配长度区域读取一个字节M1
|
||||
M1 = data[i]
|
||||
Li += M1
|
||||
while M1 == 0xFF:
|
||||
# 继续读取下一个字节M2
|
||||
i += 1
|
||||
M1 = data[i]
|
||||
Li += M1
|
||||
Li += 4
|
||||
# 复制匹配的数据到解压缩数据缓冲区
|
||||
uncompressed_data.extend(uncompressed_data[-offset:-offset + Li])
|
||||
# break
|
||||
|
||||
# 转换为字符串
|
||||
uncompressed_data = bytes(uncompressed_data) # .decode('utf-8')
|
||||
dst = lz4.block.decompress(data, uncompressed_size=len(data) << 8)
|
||||
dst.decode().replace('\x00', '') # 已经解码完成后,还含有0x00的部分,要删掉,要不后面ET识别的时候会报错
|
||||
uncompressed_data = dst.encode()
|
||||
return uncompressed_data
|
||||
|
||||
|
||||
@ -285,13 +246,15 @@ def wordcloud_generator(text, out_path="", is_show=False, img_path="", font="C:\
|
||||
|
||||
|
||||
def read_BytesExtra(data):
|
||||
if data[0:2] == '0x':
|
||||
data = data[2:]
|
||||
data = bytes.fromhex(data)
|
||||
print(data)
|
||||
print('*' * 50)
|
||||
print(data.decode('utf-8', errors='ignore'))
|
||||
|
||||
if bytes_extra is None:
|
||||
return None
|
||||
deserialize_data = None
|
||||
try:
|
||||
deserialize_data, message_type = blackboxprotobuf.decode_message(bytes_extra)
|
||||
except Exception as e:
|
||||
print(f"can not decode bytes_extra:{e}")
|
||||
return None
|
||||
return deserialize_data
|
||||
|
||||
if __name__ == '__main__':
|
||||
data = ''
|
||||
|
@ -24,10 +24,15 @@ def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = No
|
||||
try:
|
||||
# 打开注册表路径
|
||||
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
|
||||
r"HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders")
|
||||
r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders")
|
||||
documents_path = winreg.QueryValueEx(key, "Personal")[0] # 读取文档实际目录路径
|
||||
winreg.CloseKey(key) # 关闭注册表
|
||||
w_dir = documents_path
|
||||
documents_paths = os.path.split(documents_path)
|
||||
if "%" in documents_paths[0]:
|
||||
w_dir = os.environ.get(documents_paths[0].replace("%", ""))
|
||||
w_dir = os.path.join(w_dir, os.path.join(*documents_paths[1:]))
|
||||
else:
|
||||
w_dir = documents_path
|
||||
except Exception as e:
|
||||
profile = os.path.expanduser("~")
|
||||
w_dir = os.path.join(profile, "Documents")
|
||||
|
@ -81,14 +81,20 @@ def get_info_filePath(wxid="all"):
|
||||
# 获取文档实际目录
|
||||
try:
|
||||
# 打开注册表路径
|
||||
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,r"HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders")
|
||||
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders")
|
||||
documents_path = winreg.QueryValueEx(key, "Personal")[0]# 读取文档实际目录路径
|
||||
winreg.CloseKey(key) # 关闭注册表
|
||||
w_dir = documents_path
|
||||
documents_paths = os.path.split(documents_path)
|
||||
if "%" in documents_paths[0]:
|
||||
w_dir = os.environ.get(documents_paths[0].replace("%",""))
|
||||
w_dir = os.path.join(w_dir,os.path.join(*documents_paths[1:]))
|
||||
else:
|
||||
w_dir = documents_path
|
||||
except Exception as e:
|
||||
profile = os.path.expanduser("~")
|
||||
w_dir = os.path.join(profile, "Documents")
|
||||
msg_dir = os.path.join(w_dir, "WeChat Files")
|
||||
|
||||
if wxid == "all" and os.path.exists(msg_dir):
|
||||
return msg_dir
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user