# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name:         common_utils.py
# Description:
# Author:       xaoyaoo
# Date:         2024/04/15
# -------------------------------------------------------------------------------
import functools
import hashlib
import os
import re
import time
import wave
from collections import defaultdict  # noqa: F401  (kept: file-level import, may be re-exported)
from io import BytesIO
from typing import Union

# lxml is more robust here: WeChat XML is sometimes non-standard, which makes
# xml.etree.ElementTree fail.
import lxml.etree as ET
import pysilk
import requests

from ._loger import db_loger


def db_error(func):
    """Decorator that logs any exception raised by ``func`` and returns None.

    :param func: callable to wrap
    :return: wrapper that returns ``func``'s result, or ``None`` if it raised
    """

    @functools.wraps(func)  # keep the wrapped function's name/docstring
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            db_loger.error(f"db_error: {e}", exc_info=True)
            return None

    return wrapper


# Name returned for a (type, sub_type) pair that is not in the table.
_UNKNOWN_TYPE_NAME = "未知"
# ID returned for a name that is not in the table.
_UNKNOWN_TYPE_ID = (0, 0)

# (type, sub_type) -> display name. Built once instead of on every call.
_TYPE_ID_TO_NAME = {
    (1, 0): "文本",
    (3, 0): "图片",
    (34, 0): "语音",
    (37, 0): "添加好友",
    (42, 0): "推荐公众号",
    (43, 0): "视频",
    (47, 0): "动画表情",
    (48, 0): "位置",
    (49, 0): "文件",
    (49, 1): "粘贴的文本",
    (49, 3): "(分享)音乐",
    (49, 4): "(分享)卡片式链接",
    (49, 5): "(分享)卡片式链接",
    (49, 6): "文件",
    (49, 7): "游戏相关",
    (49, 8): "用户上传的GIF表情",
    (49, 15): "未知-49,15",
    (49, 17): "位置共享",
    (49, 19): "合并转发的聊天记录",
    (49, 24): "(分享)笔记",
    (49, 33): "(分享)小程序",
    (49, 36): "(分享)小程序",
    (49, 40): "(分享)收藏夹",
    (49, 44): "(分享)小说(猜)",
    (49, 50): "(分享)视频号名片",
    (49, 51): "(分享)视频号视频",
    (49, 53): "接龙",
    (49, 57): "引用回复",
    (49, 63): "视频号直播或直播回放",
    (49, 74): "文件(猜)",
    (49, 87): "群公告",
    (49, 88): "视频号直播或直播回放等",
    (49, 2000): "转账",
    (49, 2003): "赠送红包封面",
    (50, 0): "语音通话",
    (65, 0): "企业微信打招呼(猜)",
    (66, 0): "企业微信添加好友(猜)",
    (10000, 0): "系统通知",
    (10000, 1): "消息撤回1",
    (10000, 4): "拍一拍",
    (10000, 5): "消息撤回5",
    (10000, 6): "消息撤回6",
    (10000, 33): "消息撤回33",
    (10000, 36): "消息撤回36",
    (10000, 57): "消息撤回57",
    (10000, 8000): "邀请加群",
    (11000, 0): "未知-11000,0",
}

# Reverse map. Several IDs share one name; iterating in reverse makes the
# FIRST ID in table order win, matching a front-to-back linear search.
_TYPE_NAME_TO_ID = {
    name: type_id for type_id, name in reversed(list(_TYPE_ID_TO_NAME.items()))
}


def type_converter(type_id_or_name: Union[str, tuple]):
    """Convert between a message type ID and its display name.

    name (str) => ID (tuple); ID (tuple) => name (str).

    :param type_id_or_name: message type ID tuple, e.g. ``(1, 0)``, or a name
    :return: the name for a tuple (``"未知"`` if not in the table), or the ID
        for a name (``(0, 0)`` if not in the table; the first ID in table
        order when several IDs share the name)
    :raises ValueError: if the argument is neither a tuple nor a str
    """
    if isinstance(type_id_or_name, tuple):
        return _TYPE_ID_TO_NAME.get(type_id_or_name, _UNKNOWN_TYPE_NAME)
    if isinstance(type_id_or_name, str):
        return _TYPE_NAME_TO_ID.get(type_id_or_name, _UNKNOWN_TYPE_ID)
    raise ValueError("Invalid input type")


def typeid2name(type_id: tuple):
    """Return the display name of a message type.

    :param type_id: message type ID tuple, e.g. ``(1, 0)``
    :return: the type name
    """
    return type_converter(type_id)


def name2typeid(type_name: str):
    """Return the ID tuple of a message type.

    :param type_name: message type name
    :return: the type ID tuple
    """
    return type_converter(type_name)


def get_md5(data):
    """Return the hexadecimal MD5 digest of ``data``.

    :param data: bytes-like object to hash
    :return: hex digest string
    """
    return hashlib.md5(data).hexdigest()


def timestamp2str(timestamp):
    """Format a Unix timestamp as a local ``%Y-%m-%d %H:%M:%S`` string.

    A value whose integer part has 10 digits is treated as seconds, 13 digits
    as milliseconds. Anything else is returned unformatted (a digit-only
    string is returned as the ``int`` it was parsed into).

    :param timestamp: int, float, or digit-only str
    :return: formatted time string, or the (possibly int-converted) input
    """
    if isinstance(timestamp, str) and timestamp.isdigit():
        timestamp = int(timestamp)
    elif not isinstance(timestamp, (int, float)):
        return timestamp

    # Count digits of the integer part only: str() of a float includes the
    # fractional part ("1713139200.0" is 12 characters), which made the
    # length test miss every float timestamp.
    try:
        digit_count = len(str(int(timestamp)))
    except (ValueError, OverflowError):  # NaN / infinity
        return timestamp

    if digit_count == 13:
        seconds = timestamp / 1000
    elif digit_count == 10:
        seconds = timestamp
    else:
        return timestamp
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(seconds))


# Magic numbers of common image formats, mapped to the extension reported for
# them. Checked in this order.
_IMAGE_HEADERS = {
    b"\xFF\xD8\xFF": ".jpg",
    b"\x89\x50\x4E\x47": ".png",
    b"\x47\x49\x46\x38": ".gif",
    b"\x42\x4D": ".BMP",
    b"\x49\x49": ".TIFF",
    b"\x4D\x4D": ".TIFF",
    b"\x00\x00\x01\x00": ".ICO",
    b"\x52\x49\x46\x46": ".WebP",
    b"\x00\x00\x00\x18\x66\x74\x79\x70\x68\x65\x69\x63": ".HEIC",
}

_DAT2IMG_FAILED = (False, False, False, False)


def dat2img(input_data):
    """Decode a single-byte-XOR-obfuscated ``.dat`` image.

    For each known image header, the XOR key is derived from the first byte;
    the header matches only if every header byte decodes with that same key.

    :param input_data: path of the file (str) or the raw file data (bytes-like)
    :return: ``(ok, extension, md5_hex, decoded_data)``; all four are ``False``
        when no known header matches. ``decoded_data`` is a ``numpy.ndarray``
        of uint8 when numpy is importable, otherwise a ``bytearray``.
    """
    if isinstance(input_data, str):
        with open(input_data, "rb") as f:
            input_bytes = f.read()
    else:
        input_bytes = input_data

    if len(input_bytes) == 0:  # nothing to derive a key from
        return _DAT2IMG_FAILED

    try:
        import numpy as np
    except ImportError:
        np = None

    if np is not None:
        raw = np.frombuffer(input_bytes, dtype=np.uint8)
        for header, fomt in _IMAGE_HEADERS.items():
            if raw.size < len(header):
                continue  # too short to contain this header
            key = int(raw[0]) ^ header[0]
            decoded_head = np.bitwise_xor(
                raw[:len(header)], np.frombuffer(header, dtype=np.uint8)
            )
            if np.all(decoded_head == key):
                # Vectorised XOR of the whole payload with the one-byte key.
                out_bytes = np.bitwise_xor(raw, np.uint8(key))
                return True, fomt, get_md5(out_bytes), out_bytes
        return _DAT2IMG_FAILED

    # Pure-Python fallback when numpy is not installed.
    for header, fomt in _IMAGE_HEADERS.items():
        if len(input_bytes) < len(header):
            continue
        key = input_bytes[0] ^ header[0]
        # Every header byte must agree on the key, not just one of them.
        if all(b ^ h == key for b, h in zip(input_bytes, header)):
            xor_table = bytes(value ^ key for value in range(256))
            out_bytes = bytearray(bytes(input_bytes).translate(xor_table))
            return True, fomt, get_md5(out_bytes), out_bytes
    return _DAT2IMG_FAILED


def xml2dict(xml_string):
    """Parse an XML string into nested dicts.

    :param xml_string: XML text to parse
    :return: ``None`` if the input is not a str; the input string unchanged if
        parsing raises; otherwise the parsed structure (attributes and child
        tags become keys, repeated child tags become lists, and an element
        with no attributes or children becomes its text)
    """

    def parse_xml(element):
        """Recursively convert one XML element.

        :param element: element to convert (may be ``None``)
        :return: dict of attributes/children, or the element text
        """
        result = {}
        # Some elements come back without attributes; nothing to convert.
        if element is None or element.attrib is None:
            return result

        # Attributes of the current element.
        for key, value in element.attrib.items():
            result[key] = value

        # Children of the current element.
        for child in element:
            child_result = parse_xml(child)
            if child.tag in result:
                # The tag was already seen: collect the values in a list.
                if not isinstance(result[child.tag], list):
                    result[child.tag] = [result[child.tag]]
                result[child.tag].append(child_result)
            else:
                result[child.tag] = child_result

        # Nothing collected (no attributes, no children): use the text.
        if not result and element.text:
            result = element.text
        return result

    if xml_string is None or not isinstance(xml_string, str):
        return None
    try:
        # WeChat chat records occasionally contain malformed XML, so let the
        # parser recover from errors instead of failing.
        parser = ET.XMLParser(recover=True)
        root = ET.fromstring(xml_string, parser)
    except Exception:
        return xml_string
    return parse_xml(root)


def download_file(url, save_path=None, proxies=None, timeout=30):
    """Download ``url`` and optionally save the content to disk.

    :param url: address of the file
    :param save_path: where to write the content (str); skipped if falsy
    :param proxies: proxies mapping passed to ``requests``
    :param timeout: ``requests`` timeout in seconds, so a stalled server
        cannot block the caller forever; pass ``None`` to wait indefinitely
    :return: the downloaded content as bytes, or ``None`` when the response
        status is not 200
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 10; Redmi K40 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Mobile Safari/537.36"
    }
    r = requests.get(url, headers=headers, proxies=proxies, timeout=timeout)
    if r.status_code != 200:
        return None
    data = r.content
    if save_path and isinstance(save_path, str):
        # Create the target folder. A bare file name has no directory part,
        # and os.makedirs("") would raise.
        folder = os.path.dirname(save_path)
        if folder:
            os.makedirs(folder, exist_ok=True)
        with open(save_path, "wb") as f:
            f.write(data)
    return data


def bytes2str(d):
    """Decode every bytes value in ``d`` to str (UTF-8), in place.

    Nested dicts are handled recursively; inside lists, dict items are
    handled recursively and bytes items are decoded.

    :param d: dict to convert; modified in place
    :return: None
    """
    for k, v in d.items():
        if isinstance(v, dict):
            bytes2str(v)
        elif isinstance(v, list):
            for index, item in enumerate(v):
                if isinstance(item, dict):
                    bytes2str(item)
                elif isinstance(item, bytes):
                    # Write back into the list; rebinding the loop variable
                    # alone would leave the list unchanged.
                    v[index] = item.decode('utf-8')
        elif isinstance(v, bytes):
            d[k] = v.decode('utf-8')


def read_dict_all_values(data):
    """Collect every leaf value of a nested dict/list structure.

    bytes leaves are decoded as UTF-8 and int leaves are converted to str;
    any other leaf is returned unchanged.

    :param data: dict, list, or a single leaf value
    :return: flat list of all leaf values, in traversal order
    """
    if isinstance(data, list):
        result = []
        for item in data:
            result.extend(read_dict_all_values(item))
        return result
    if isinstance(data, dict):
        result = []
        for value in data.values():
            result.extend(read_dict_all_values(value))
        return result
    if isinstance(data, bytes):
        return [data.decode("utf-8")]
    if isinstance(data, int):
        return [str(data)]
    return [data]


def match_BytesExtra(BytesExtra, pattern=r"FileStorage(.*?)'"):
    """Search the leaf values of ``BytesExtra`` for ``pattern``.

    All leaf values are joined with ``'`` (and wrapped in ``'``) before
    searching, so the default pattern's trailing ``'`` marks the end of one
    value.

    :param BytesExtra: nested dict/list structure to search
    :param pattern: regular expression to search for
    :return: ``False`` if ``BytesExtra`` is empty; the matched text with all
        ``'`` removed if found; otherwise ``""``
    """
    if not BytesExtra:
        return False
    values = read_dict_all_values(BytesExtra)
    # Leaves may be non-str (e.g. float or None); str.join would raise on them.
    joined = "'" + "'".join(map(str, values)) + "'"
    match = re.search(pattern, joined)
    if match:
        return match.group(0).replace("'", "")
    return ""


def _play_pcm(pcm_data, rate):
    """Play mono 16-bit PCM data through pyaudio."""
    try:
        import pyaudio
    except ImportError as e:
        raise ImportError("请先安装pyaudio库[ pip install pyaudio ]") from e
    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=pyaudio.paInt16, channels=1, rate=rate, output=True)
        try:
            stream.write(pcm_data)
            stream.stop_stream()
        finally:
            stream.close()
    finally:
        # Release the audio backend even if opening or writing failed.
        p.terminate()


def silk2audio(buf_data, is_play=False, is_wave=False, save_path=None, rate=24000):
    """Decode SILK audio data to PCM, optionally playing it or wrapping it as WAV.

    :param buf_data: SILK-encoded audio bytes
    :param is_play: play the decoded audio (requires pyaudio)
    :param is_wave: return WAV data (1 channel, 2 bytes per sample) instead
        of raw PCM
    :param save_path: when ``is_wave`` is true and this is a str, also write
        the WAV data to this path
    :param rate: sample rate passed to the decoder and written to the WAV header
    :return: WAV bytes if ``is_wave`` else raw PCM bytes
    :raises ImportError: if ``is_play`` is true and pyaudio is not installed
    """
    with BytesIO(buf_data) as silk_file, BytesIO() as pcm_file:
        pysilk.decode(silk_file, pcm_file, rate)  # silk -> pcm
        pcm_data = pcm_file.getvalue()

    if is_play:
        _play_pcm(pcm_data, rate)

    if is_wave:
        with BytesIO() as wave_file:
            with wave.open(wave_file, 'wb') as wf:
                wf.setparams((1, 2, rate, 0, 'NONE', 'NONE'))
                wf.writeframes(pcm_data)
            # Read after the wave writer is closed so the header is final.
            rdata = wave_file.getvalue()
        if save_path and isinstance(save_path, str):
            with open(save_path, "wb") as f:
                f.write(rdata)
                print('saved wav file')
        return rdata

    return pcm_data