PyWxDump/pywxdump/analyzer/utils.py

290 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: utils.py
# Description:
# Author: xaoyaoo
# Date: 2023/12/03
# -------------------------------------------------------------------------------
import hashlib
import os
import re
import sqlite3
import time
def time_int2str(time_int):
"""
时间戳转换为时间字符串
:param time_int: 时间戳
:return: 时间字符串
"""
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time_int))
def read_dict_all_values(data):
"""
读取字典中所有的值(单层)
:param dict_data: 字典
:return: 所有值的list
"""
result = []
if isinstance(data, list):
for item in data:
result.extend(read_dict_all_values(item))
elif isinstance(data, dict):
for key, value in data.items():
result.extend(read_dict_all_values(value))
else:
if isinstance(data, bytes):
tmp = data.decode("utf-8")
else:
tmp = str(data) if isinstance(data, int) else data
result.append(tmp)
for i in range(len(result)):
if isinstance(result[i], bytes):
result[i] = result[i].decode("utf-8")
return result
def match_BytesExtra(BytesExtra, pattern=r"FileStorage(.*?)'"):
"""
匹配 BytesExtra
:param BytesExtra: BytesExtra
:param pattern: 匹配模式
:return:
"""
if not BytesExtra:
return False
BytesExtra = read_dict_all_values(BytesExtra)
BytesExtra = "'" + "'".join(BytesExtra) + "'"
# print(BytesExtra)
match = re.search(pattern, BytesExtra)
if match:
video_path = match.group(0).replace("'", "")
return video_path
else:
return ""
def get_type_name(type_id: tuple):
"""
获取消息类型名称
:param type_id: 消息类型ID 元组 eg: (1, 0)
:return:
"""
type_name_dict = {
(1, 0): "文本",
(3, 0): "图片",
(34, 0): "语音",
(43, 0): "视频",
(47, 0): "动画表情",
(37, 0): "添加好友", # 感谢 https://github.com/zhyc9de
(42, 0): "推荐公众号", # 感谢 https://github.com/zhyc9de
(48, 0): "地图信息", # 感谢 https://github.com/zhyc9de
(49, 40): "分享收藏夹", # 感谢 https://github.com/zhyc9de
(49, 53): "接龙", # 感谢 https://github.com/zhyc9de
(49, 0): "文件",
(49, 1): "类似文字消息而不一样的消息",
(49, 5): "卡片式链接",
(49, 6): "文件",
(49, 8): "用户上传的GIF表情",
(49, 19): "合并转发的聊天记录",
(49, 33): "分享的小程序",
(49, 36): "分享的小程序",
(49, 57): "带有引用的文本消息",
(49, 63): "视频号直播或直播回放等",
(49, 87): "群公告",
(49, 88): "视频号直播或直播回放等",
(49, 2000): "转账消息",
(49, 2003): "赠送红包封面",
(50, 0): "语音通话",
(10000, 0): "系统通知",
(10000, 4): "拍一拍",
(10000, 8000): "系统通知"
}
if type_id in type_name_dict:
return type_name_dict[type_id]
else:
return "未知"
def get_name_typeid(type_name: str):
"""
获取消息类型名称
:param type_id: 消息类型ID 元组 eg: (1, 0)
:return:
"""
type_name_dict = {
(1, 0): "文本",
(3, 0): "图片",
(34, 0): "语音",
(43, 0): "视频",
(47, 0): "动画表情",
(37, 0): "添加好友", # 感谢 https://github.com/zhyc9de
(42, 0): "推荐公众号", # 感谢 https://github.com/zhyc9de
(48, 0): "地图信息", # 感谢 https://github.com/zhyc9de
(49, 40): "分享收藏夹", # 感谢 https://github.com/zhyc9de
(49, 53): "接龙", # 感谢 https://github.com/zhyc9de
(49, 0): "文件",
(49, 1): "类似文字消息而不一样的消息",
(49, 5): "卡片式链接",
(49, 6): "文件",
(49, 8): "用户上传的GIF表情",
(49, 19): "合并转发的聊天记录",
(49, 33): "分享的小程序",
(49, 36): "分享的小程序",
(49, 57): "带有引用的文本消息",
(49, 63): "视频号直播或直播回放等",
(49, 87): "群公告",
(49, 88): "视频号直播或直播回放等",
(49, 2000): "转账消息",
(49, 2003): "赠送红包封面",
(50, 0): "语音通话",
(10000, 0): "系统通知",
(10000, 4): "拍一拍",
(10000, 8000): "系统通知"
}
type_tup = []
for k, v in type_name_dict.items():
if v == type_name:
type_tup.append(k)
return type_tup
def get_md5(data):
"""
获取数据的 MD5 值
:param data: 数据bytes
:return:
"""
md5 = hashlib.md5()
md5.update(data)
return md5.hexdigest()
import threading
def get_thread_id():
current_thread = threading.current_thread()
thread_id = current_thread.ident
return thread_id
class DBPool:
__db_pool = {}
__thread_pool = {}
def __new__(cls, *args, **kwargs):
if not hasattr(cls, '_instance'):
cls._instance = super(DBPool, cls).__new__(cls)
return cls._instance
@classmethod
def create_connection(cls, db_path):
if db_path == "DBPOOL_INIT":
return
if not os.path.exists(db_path):
raise FileNotFoundError(f"数据库文件不存在:{db_path}")
if db_path not in cls.__db_pool:
cls.__db_pool[db_path] = sqlite3.connect(db_path, check_same_thread=False)
cls.connection = cls.__db_pool[db_path]
def __init__(self, db_path):
if db_path == "DBPOOL_INIT":
return
self.db_path = db_path
if db_path not in self.__db_pool:
self.create_connection(db_path)
self.connection = self.__db_pool.get(db_path)
# 检查数据库是否关闭
if not self.connection:
self.create_connection(db_path)
self.connection = self.__db_pool.get(db_path)
def __enter__(self):
return self.connection
def __exit__(self, exc_type, exc_val, exc_tb):
self.connection = None
def close_all(self):
for db_path, connection in self.__db_pool.items():
connection.close()
self.__db_pool.clear()
def attach_databases(connection, databases):
"""
将多个数据库附加到给定的SQLite连接。
参数:
-连接SQLite连接
-数据库:包含数据库别名和文件路径的词典
"""
cursor = connection.cursor()
for alias, file_path in databases.items():
attach_command = f"ATTACH DATABASE '{file_path}' AS {alias};"
cursor.execute(attach_command)
connection.commit()
def detach_databases(connection, aliases):
"""
从给定的 SQLite 连接中分离多个数据库。
参数:
- connection SQLite连接
- aliases要分离的数据库别名列表
"""
cursor = connection.cursor()
for alias in aliases:
detach_command = f"DETACH DATABASE {alias};"
cursor.execute(detach_command)
connection.commit()
def execute_sql(connection, sql, params=None):
"""
执行给定的SQL语句返回结果。
参数:
- connection SQLite连接
- sql要执行的SQL语句
- paramsSQL语句中的参数
"""
try:
# connection.text_factory = bytes
cursor = connection.cursor()
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
return cursor.fetchall()
except Exception as e1:
try:
connection.text_factory = bytes
cursor = connection.cursor()
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
rdata = cursor.fetchall()
connection.text_factory = str
return rdata
except Exception as e2:
print(f"**********\nSQL: {sql}\nparams: {params}\n{e1}\n{e2}\n**********")
return None
if __name__ == '__main__':
pass