diff --git a/README.md b/README.md index 46473bc..90b5d5f 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ QQ GROUP:[276392799](https://s.xaoyo.top/gOLUDl) or [276392799](https://s.xaoy * Project address: https://github.com/xaoyaoo/PyWxDump * Currently tested only under Windows, there may be issues under mac and Linux. -* If you find any missing or incorrect information, bugs, or suggestions for improvement in the [version_list.json](https://github.com/xaoyaoo/PyWxDump/tree/master/pywxdump/version_list.json), please submit an issue on GitHub. +* If you find any missing or incorrect information, bugs, or suggestions for improvement in the [WX_OFFS.json](https://github.com/xaoyaoo/PyWxDump/tree/master/pywxdump/WX_OFFS.json), please submit an issue on GitHub. * For common issues, please refer to [FAQ](https://github.com/xaoyaoo/PyWxDump/tree/master/doc/FAQ.md), and for the update log, please refer to [CHANGELOG](https://github.com/xaoyaoo/PyWxDump/tree/master/doc/CHANGELOG.md) * Web UI repository location [wxdump_web](https://github.com/xaoyaoo/wxdump_web ) * If you are interested in the implementation principle of wxdump, please pay attention to the Official Accounts: `逍遥之芯`, reply: `原理` to get the principle analysis. diff --git a/doc/CHANGELOG.md b/doc/CHANGELOG.md index 1463ad1..16e3437 100644 --- a/doc/CHANGELOG.md +++ b/doc/CHANGELOG.md @@ -8,7 +8,7 @@ - fix 部分图片无法读取 - update README - fix 多重引用消息显示错误 -- Update version_list.json (#110) +- Update WX_OFFS.json (#110) - Merge branch 'master' of github.com:xaoyaoo/PyWxDump ## v3.0.41 @@ -845,7 +845,7 @@ ## v2.1.11 -- 修改version_list +- 修改WX_OFFS - 修复3.9.2.*版本无法正常运行 - 添加自动发布到pypi的github action @@ -860,7 +860,7 @@ ## v2.1.7 -- update version_list +- update WX_OFFS - 添加自动发布到pypi的github action - add auto get bias addr ,not need input key or wx folder path. diff --git a/doc/README_CN.md b/doc/README_CN.md index e125eb9..cdf9546 100644 --- a/doc/README_CN.md +++ b/doc/README_CN.md @@ -84,7 +84,7 @@ QQ交流群:[276392799](https://s.xaoyo.top/gOLUDl) or [276392799](https://s.x * 项目地址:https://github.com/xaoyaoo/PyWxDump * 目前只在windows下测试过,mac、linux下可能会存在问题。 -* 如发现[version_list.json](https://github.com/xaoyaoo/PyWxDump/tree/master/pywxdump/version_list.json)缺失或错误、bug,有改进意见、想要新增功能, 请提交[issues](https://github.com/xaoyaoo/PyWxDump/issues). +* 如发现[WX_OFFS.json](https://github.com/xaoyaoo/PyWxDump/tree/master/pywxdump/WX_OFFS.json)缺失或错误、bug,有改进意见、想要新增功能, 请提交[issues](https://github.com/xaoyaoo/PyWxDump/issues). * 常见问题请参考[FAQ](https://github.com/xaoyaoo/PyWxDump/tree/master/doc/FAQ.md),更新日志请参考[CHANGELOG](https://github.com/xaoyaoo/PyWxDump/tree/master/doc/CHANGELOG.md) * Web UI的仓库位置 [wxdump_web](https://github.com/xaoyaoo/wxdump_web) * 如果对wxdump实现原理感兴趣,请关注公众号:`逍遥之芯`,回复:`原理` 获取原理解析。 diff --git a/doc/README_EN.md b/doc/README_EN.md index 46473bc..90b5d5f 100644 --- a/doc/README_EN.md +++ b/doc/README_EN.md @@ -85,7 +85,7 @@ QQ GROUP:[276392799](https://s.xaoyo.top/gOLUDl) or [276392799](https://s.xaoy * Project address: https://github.com/xaoyaoo/PyWxDump * Currently tested only under Windows, there may be issues under mac and Linux. -* If you find any missing or incorrect information, bugs, or suggestions for improvement in the [version_list.json](https://github.com/xaoyaoo/PyWxDump/tree/master/pywxdump/version_list.json), please submit an issue on GitHub. +* If you find any missing or incorrect information, bugs, or suggestions for improvement in the [WX_OFFS.json](https://github.com/xaoyaoo/PyWxDump/tree/master/pywxdump/WX_OFFS.json), please submit an issue on GitHub. * For common issues, please refer to [FAQ](https://github.com/xaoyaoo/PyWxDump/tree/master/doc/FAQ.md), and for the update log, please refer to [CHANGELOG](https://github.com/xaoyaoo/PyWxDump/tree/master/doc/CHANGELOG.md) * Web UI repository location [wxdump_web](https://github.com/xaoyaoo/wxdump_web ) * If you are interested in the implementation principle of wxdump, please pay attention to the Official Accounts: `逍遥之芯`, reply: `原理` to get the principle analysis. diff --git a/doc/UserGuide.md b/doc/UserGuide.md index 234fac3..240e2c9 100644 --- a/doc/UserGuide.md +++ b/doc/UserGuide.md @@ -93,14 +93,14 @@ wxdump -h # 查看具体帮助 ```bash wxdump bias -h # 查看具体帮助 -wxdump bias --mobile <手机号> --name <微信昵称> --account <微信账号> [--key <密钥>] [--db_path <已登录账号的微信文件夹路径>] [--version_list_path <微信版本偏移文件路径>] +wxdump bias --mobile <手机号> --name <微信昵称> --account <微信账号> [--key <密钥>] [--db_path <已登录账号的微信文件夹路径>] [--WX_OFFS_path <微信版本偏移文件路径>] ``` ##### 获取微信信息 ```bash wxdump info -h # 查看具体帮助 -wxdump info [--version_list_path <微信版本偏移文件路径>] +wxdump info [--WX_OFFS_path <微信版本偏移文件路径>] ``` ##### 获取微信文件夹路径 @@ -160,13 +160,13 @@ args = { "account": "微信账号", # 微信账号 "key": "密钥", # 密钥(可选) "db_path": "已登录账号的微信文件夹路径", # 微信文件夹路径(可选) - "version_list_path": "微信版本偏移文件路径" # 微信版本偏移文件路径(可选) + "WX_OFFS_path": "微信版本偏移文件路径" # 微信版本偏移文件路径(可选) } bias_addr = BiasAddr(args["account"], args["mobile"], args["name"], args["key"], args["db_path"]) -result = bias_addr.run(True, args["version_list_path"]) +result = bias_addr.run(True, args["WX_OFFS_path"]) # ************************************************************************************************ # # 获取微信信息 -wx_info = read_info(VERSION_LIST, True) +wx_info = read_info(WX_OFFS, True) # 获取微信文件夹路径 args = { diff --git a/pywxdump/wx_info/get_wx_info.py b/pywxdump/wx_info/get_wx_info.py deleted file mode 100644 index d00a0b3..0000000 --- a/pywxdump/wx_info/get_wx_info.py +++ /dev/null @@ -1,455 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: getwxinfo.py -# Description: -# Author: xaoyaoo -# Date: 2023/08/21 -# ------------------------------------------------------------------------------- -import ctypes -import json -import os -import re -import winreg -from typing import List, Union -from .utils import verify_key, get_exe_version, get_exe_bit, info_error -from .ctypes_utils import get_process_list, get_info_with_key, get_memory_maps, get_process_exe_path, \ - get_file_version_info -from .memory_search import search_memory -import ctypes.wintypes as wintypes - -# 定义常量 -PROCESS_QUERY_INFORMATION = 0x0400 -PROCESS_VM_READ = 0x0010 - -kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) -OpenProcess = kernel32.OpenProcess -OpenProcess.restype = wintypes.HANDLE -OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD] - -CloseHandle = kernel32.CloseHandle -CloseHandle.restype = wintypes.BOOL -CloseHandle.argtypes = [wintypes.HANDLE] - -ReadProcessMemory = kernel32.ReadProcessMemory -void_p = ctypes.c_void_p - - -# 读取内存中的字符串(key部分) -@info_error -def get_info_with_key(h_process, address, address_len=8): - array = ctypes.create_string_buffer(address_len) - if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None" - address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址) - key = ctypes.create_string_buffer(32) - if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None" - key_string = bytes(key).hex() - return key_string - - -# 读取内存中的字符串(非key部分) -@info_error -def get_info_string(h_process, address, n_size=64): - array = ctypes.create_string_buffer(n_size) - if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None" - array = bytes(array).split(b"\x00")[0] if b"\x00" in array else bytes(array) - text = array.decode('utf-8', errors='ignore') - return text.strip() if text.strip() != "" else "None" - - -# 读取内存中的字符串(昵称部分name) -@info_error -def get_info_name(h_process, address, address_len=8, n_size=64): - array = ctypes.create_string_buffer(n_size) - if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None" - address1 = int.from_bytes(array[:address_len], byteorder='little') # 逆序转换为int地址(key地址) - info_name = get_info_string(h_process, address1, n_size) - if info_name != "None": - return info_name - array = bytes(array).split(b"\x00")[0] if b"\x00" in array else bytes(array) - text = array.decode('utf-8', errors='ignore') - return text.strip() if text.strip() != "" else "None" - - -# 读取内存中的wxid -@info_error -def get_info_wxid(h_process): - find_num = 100 - addrs = search_memory(h_process, br'\\Msg\\FTSContact', max_num=find_num) - wxids = [] - for addr in addrs: - array = ctypes.create_string_buffer(80) - if ReadProcessMemory(h_process, void_p(addr - 30), array, 80, 0) == 0: return "None" - array = bytes(array) # .split(b"\\")[0] - array = array.split(b"\\Msg")[0] - array = array.split(b"\\")[-1] - wxids.append(array.decode('utf-8', errors='ignore')) - wxid = max(wxids, key=wxids.count) if wxids else "None" - CloseHandle(h_process) - return wxid - - -# 读取内存中的filePath基于wxid(慢) -@info_error -def get_info_filePath_base_wxid(h_process, wxid=""): - find_num = 10 - addrs = search_memory(h_process, wxid.encode() + br'\\Msg\\FTSContact', max_num=find_num) - filePath = [] - for addr in addrs: - win_addr_len = 260 - array = ctypes.create_string_buffer(win_addr_len) - if ReadProcessMemory(h_process, void_p(addr - win_addr_len + 50), array, win_addr_len, 0) == 0: return "None" - array = bytes(array).split(b"\\Msg")[0] - array = array.split(b"\00")[-1] - filePath.append(array.decode('utf-8', errors='ignore')) - filePath = max(filePath, key=filePath.count) if filePath else "None" - CloseHandle(h_process) - return filePath - - -@info_error -def get_info_filePath(wxid="all"): - """ - # 读取filePath (微信文件路径) (快) - :param wxid: 微信id - :return: 返回filePath - """ - if not wxid: - return "None" - w_dir = "MyDocument:" - is_w_dir = False - - try: - key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ) - value, _ = winreg.QueryValueEx(key, "FileSavePath") - winreg.CloseKey(key) - w_dir = value - is_w_dir = True - except Exception as e: - w_dir = "MyDocument:" - - if not is_w_dir: - try: - user_profile = os.environ.get("USERPROFILE") - path_3ebffe94 = os.path.join(user_profile, "AppData", "Roaming", "Tencent", "WeChat", "All Users", "config", - "3ebffe94.ini") - with open(path_3ebffe94, "r", encoding="utf-8") as f: - w_dir = f.read() - is_w_dir = True - except Exception as e: - w_dir = "MyDocument:" - - if w_dir == "MyDocument:": - try: - # 打开注册表路径 - key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, - r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders") - documents_path = winreg.QueryValueEx(key, "Personal")[0] # 读取文档实际目录路径 - winreg.CloseKey(key) # 关闭注册表 - documents_paths = os.path.split(documents_path) - if "%" in documents_paths[0]: - w_dir = os.environ.get(documents_paths[0].replace("%", "")) - w_dir = os.path.join(w_dir, os.path.join(*documents_paths[1:])) - # print(1, w_dir) - else: - w_dir = documents_path - except Exception as e: - profile = os.environ.get("USERPROFILE") - w_dir = os.path.join(profile, "Documents") - - msg_dir = os.path.join(w_dir, "WeChat Files") - - if wxid == "all" and os.path.exists(msg_dir): - return msg_dir - - filePath = os.path.join(msg_dir, wxid) - return filePath if os.path.exists(filePath) else "None" - - -@info_error -def get_key(pid, db_path, addr_len): - """ - 获取key (慢) - :param pid: 进程id - :param db_path: 微信数据库路径 - :param addr_len: 地址长度 - :return: 返回key - """ - - def read_key_bytes(h_process, address, address_len=8): - array = ctypes.create_string_buffer(address_len) - if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None" - address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址) - key = ctypes.create_string_buffer(32) - if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None" - key_bytes = bytes(key) - return key_bytes - - phone_type1 = "iphone\x00" - phone_type2 = "android\x00" - phone_type3 = "ipad\x00" - - MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db") - - start_adress = 0 - end_adress = 0x7FFFFFFFFFFFFFFF - - memory_maps = get_memory_maps(pid) - for module in memory_maps: - if module.FileName and 'WeChatWin.dll' in module.FileName: - start_adress = module.BaseAddress - end_adress = module.BaseAddress + module.RegionSize - break - # print(start_adress, end_adress) - hProcess = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid) - type1_addrs = search_memory(hProcess, phone_type1.encode(), max_num=2, start_address=start_adress, - end_address=end_adress) - type2_addrs = search_memory(hProcess, phone_type2.encode(), max_num=2, start_address=start_adress, - end_address=end_adress) - type3_addrs = search_memory(hProcess, phone_type3.encode(), max_num=2, start_address=start_adress, - end_address=end_adress) - - type_addrs = [] - if len(type1_addrs) >= 2: type_addrs += type1_addrs - if len(type2_addrs) >= 2: type_addrs += type2_addrs - if len(type3_addrs) >= 2: type_addrs += type3_addrs - if len(type_addrs) == 0: return "None" - - type_addrs.sort() # 从小到大排序 - - for i in type_addrs[::-1]: - for j in range(i, i - 2000, -addr_len): - key_bytes = read_key_bytes(hProcess, j, addr_len) - if key_bytes == "None": - continue - if verify_key(key_bytes, MicroMsg_path): - return key_bytes.hex() - return "None" - - -def get_details(pid, version_list: dict = None, is_logging: bool = False): - path = get_process_exe_path(pid) - rd = {'pid': pid, 'version': get_file_version_info(path), - "account": "None", "mobile": "None", "name": "None", "mail": "None", - "wxid": "None", "key": "None", "filePath": "None"} - try: - bias_list = version_list.get(rd['version'], None) - - Handle = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid) - - addrLen = get_exe_bit(path) // 8 - if not isinstance(bias_list, list) or len(bias_list) <= 4: - error = f"[-] WeChat Current Version Is Not Supported(maybe not get account,mobile,name,mail)" - if is_logging: print(error) - else: - wechat_base_address = 0 - memory_maps = get_memory_maps(pid) - for module in memory_maps: - if module.FileName and 'WeChatWin.dll' in module.FileName: - wechat_base_address = module.BaseAddress - rd['version'] = get_file_version_info(module.FileName) if os.path.exists(module.FileName) else rd[ - 'version'] - bias_list = version_list.get(rd['version'], None) - break - if wechat_base_address == 0: - error = f"[-] WeChat WeChatWin.dll Not Found" - if is_logging: print(error) - name_baseaddr = wechat_base_address + bias_list[0] - account_baseaddr = wechat_base_address + bias_list[1] - mobile_baseaddr = wechat_base_address + bias_list[2] - mail_baseaddr = wechat_base_address + bias_list[3] - key_baseaddr = wechat_base_address + bias_list[4] - - rd['account'] = get_info_string(Handle, account_baseaddr, 32) if bias_list[1] != 0 else "None" - rd['mobile'] = get_info_string(Handle, mobile_baseaddr, 64) if bias_list[2] != 0 else "None" - rd['name'] = get_info_name(Handle, name_baseaddr, addrLen, 64) if bias_list[0] != 0 else "None" - rd['mail'] = get_info_string(Handle, mail_baseaddr, 64) if bias_list[3] != 0 else "None" - rd['key'] = get_info_with_key(Handle, key_baseaddr, addrLen) if bias_list[4] != 0 else "None" - - rd['wxid'] = get_info_wxid(Handle) - - rd['filePath'] = get_info_filePath(rd['wxid']) if rd['wxid'] != "None" else "None" - if rd['wxid'] != "None" and rd['filePath'] == "None": # 通过wxid获取filePath,如果filePath为空则通过wxid获取filePath - rd['filePath'] = get_info_filePath_base_wxid(Handle, wxid=rd['wxid']) - - isKey = verify_key( - bytes.fromhex(rd["key"]), - os.path.join(rd['filePath'], "MSG", "MicroMsg.db")) if rd['key'] != "None" and rd[ - 'filePath'] != "None" else False - - if rd['filePath'] != "None" and rd['key'] == "None" and not isKey: - rd['key'] = get_key(rd['pid'], rd['filePath'], addrLen) - CloseHandle(Handle) - except Exception as e: - error = f"[-] WeChat Get Info Error:{e}" - if is_logging: print(error) - return rd - - -# 读取微信信息(account,mobile,name,mail,wxid,key) -def read_info(version_list: dict = None, is_logging: bool = False, save_path: str = None): - """ - 读取微信信息(account,mobile,name,mail,wxid,key) - :param version_list: 版本偏移量 - :param is_logging: 是否打印日志 - :param save_path: 保存路径 - :return: 返回微信信息 [{"pid": pid, "version": version, "account": account, - "mobile": mobile, "name": name, "mail": mail, "wxid": wxid, - "key": key, "filePath": filePath}, ...] - """ - if version_list is None: - version_list = {} - - wechat_pids = [] - result = [] - error = "" - - processes = get_process_list() - for pid, name in processes: - if name == "WeChat.exe": - wechat_pids.append(pid) - - if len(wechat_pids) <= 0: - error = "[-] WeChat No Run" - if is_logging: print(error) - return error - - for pid in wechat_pids: - rd = get_details(pid, version_list, is_logging) - result.append(rd) - - if is_logging: - print("=" * 32) - if isinstance(result, str): # 输出报错 - print(result) - else: # 输出结果 - for i, rlt in enumerate(result): - for k, v in rlt.items(): - print(f"[+] {k:>8}: {v}") - print(end="-" * 32 + "\n" if i != len(result) - 1 else "") - print("=" * 32) - - if save_path: - try: - infos = json.load(open(save_path, "r", encoding="utf-8")) if os.path.exists(save_path) else [] - except: - infos = [] - with open(save_path, "w", encoding="utf-8") as f: - infos += result - json.dump(infos, f, ensure_ascii=False, indent=4) - return result - - -def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = None, wxid: Union[List[str], str] = None, - is_logging: bool = False, is_return_list: bool = False) -> Union[str, dict, list]: - r""" - 获取微信数据库路径 - :param require_list: 需要获取的数据库类型 - :param msg_dir: 微信数据库目录 eg: C:\Users\user\Documents\WeChat Files - :param wxid: 微信id - :param is_logging: 是否打印日志 - :return: - """ - - if not msg_dir: - msg_dir = get_info_filePath(wxid="all") - - if not os.path.exists(msg_dir): - error = f"[-] 目录不存在: {msg_dir}" - if is_logging: print(error) - return error - - user_dirs = {} # wx用户目录 - files = os.listdir(msg_dir) - if wxid: # 如果指定wxid - if isinstance(wxid, str): - wxid = wxid.split(";") - for file_name in files: - if file_name in wxid: - user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name) - else: # 如果未指定wxid - for file_name in files: - if file_name == "All Users" or file_name == "Applet" or file_name == "WMPF": - continue - user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name) - - if isinstance(require_list, str): - require_list = require_list.split(";") - - # generate pattern - if "all" in require_list: - pattern = {"all": re.compile(r".*\.db$")} - elif isinstance(require_list, list): - pattern = {} - for require in require_list: - pattern[require] = re.compile(r"%s.*?\.db$" % require) - else: - error = f"[-] 参数错误: {require_list}" - if is_logging: print(error) - return error - - if is_return_list: # 如果返回列表,返回值:{wxid:[db_path1,db_path2]} - db_list = {} - # 获取数据库路径 - for user, user_dir in user_dirs.items(): # 遍历用户目录 - db_list[user] = [] - for root, dirs, files in os.walk(user_dir): - for file_name in files: - for n, p in pattern.items(): - if p.match(file_name): - src_path = os.path.join(root, file_name) - db_list[user].append(src_path) - return db_list - - # 获取数据库路径 - for user, user_dir in user_dirs.items(): # 遍历用户目录 - user_dirs[user] = {n: [] for n in pattern.keys()} - for root, dirs, files in os.walk(user_dir): - for file_name in files: - for n, p in pattern.items(): - if p.match(file_name): - src_path = os.path.join(root, file_name) - user_dirs[user][n].append(src_path) - - if is_logging: - for user, user_dir in user_dirs.items(): - print(f"[+] user_path: {user}") - for n, paths in user_dir.items(): - print(f" {n}:") - for path in paths: - print(f" {path.replace(user, '')}") - print("-" * 32) - print(f"[+] 共 {len(user_dirs)} 个微信账号") - return user_dirs - - -def get_core_db(wx_path: str, db_type: list = None) -> [str]: - """ - 获取聊天消息核心数据库路径 - :param wx_path: 微信文件夹路径 eg:C:\*****\WeChat Files\wxid******* - :param db_type: 数据库类型 eg: ["MSG", "MediaMSG", "MicroMsg"],三个中选择一个或多个 - :return: 返回数据库路径 eg:["",""] - """ - if not os.path.exists(wx_path): - return False, f"[-] 目录不存在: {wx_path}" - db_type_all = ["MSG", "MediaMSG", "MicroMsg", "OpenIMContact", "OpenIMMedia", "OpenIMMsg", "Favorite", "PublicMsg"] - - if not db_type: - db_type = db_type_all - - db_type = [dt for dt in db_type if dt in db_type_all] - - msg_dir = os.path.dirname(wx_path) - my_wxid = os.path.basename(wx_path) - WxDbPath = get_wechat_db(db_type, msg_dir, wxid=my_wxid, is_logging=False, is_return_list=True) # 获取微信数据库路径 - if isinstance(WxDbPath, str): # 如果返回的是字符串,则表示出错 - return False, WxDbPath - wxdbpaths = WxDbPath.get(wx_path, []) - if len(wxdbpaths) == 0: - return False, "未获取到数据库路径" - return True, wxdbpaths - - -if __name__ == '__main__': - from pywxdump import VERSION_LIST - - read_info(VERSION_LIST, is_logging=True) diff --git a/pywxdump/wx_info/merge_db.py b/pywxdump/wx_info/merge_db.py deleted file mode 100644 index 7c1f3a2..0000000 --- a/pywxdump/wx_info/merge_db.py +++ /dev/null @@ -1,468 +0,0 @@ -# -*- coding: utf-8 -*-# -# ------------------------------------------------------------------------------- -# Name: merge_db.py -# Description: -# Author: xaoyaoo -# Date: 2023/12/03 -# ------------------------------------------------------------------------------- -import logging -import os -import random -import shutil -import sqlite3 -import subprocess -import time -from typing import List - - -def merge_copy_db(db_path, save_path): - logging.warning("merge_copy_db is deprecated, use merge_db instead, will be removed in the future.") - if isinstance(db_path, list) and len(db_path) == 1: - db_path = db_path[0] - if not os.path.exists(db_path): - raise FileNotFoundError("目录不存在") - shutil.move(db_path, save_path) - - -# 合并相同名称的数据库 MSG0-MSG9.db -def merge_msg_db(db_path: list, save_path: str, CreateTime: int = 0): # CreateTime: 从这个时间开始的消息 10位时间戳 - logging.warning("merge_msg_db is deprecated, use merge_db instead, will be removed in the future.") - # 判断save_path是否为文件夹 - if os.path.isdir(save_path): - save_path = os.path.join(save_path, "merge_MSG.db") - - merged_conn = sqlite3.connect(save_path) - merged_cursor = merged_conn.cursor() - - for db_file in db_path: - c_tabels = merged_cursor.execute( - "select tbl_name from sqlite_master where type='table' and tbl_name!='sqlite_sequence'") - tabels_all = c_tabels.fetchall() # 所有表名 - tabels_all = [row[0] for row in tabels_all] - - conn = sqlite3.connect(db_file) - cursor = conn.cursor() - - # 创建表 - if len(tabels_all) < 4: - cursor.execute( - "select tbl_name,sql from sqlite_master where type='table' and tbl_name!='sqlite_sequence'") - c_part = cursor.fetchall() - - for tbl_name, sql in c_part: - if tbl_name in tabels_all: - continue - try: - merged_cursor.execute(sql) - tabels_all.append(tbl_name) - except Exception as e: - print(f"error: {db_file}\n{tbl_name}\n{sql}\n{e}\n**********") - raise e - merged_conn.commit() - - # 写入数据 - for tbl_name in tabels_all: - if tbl_name == "MSG": - MsgSvrIDs = merged_cursor.execute( - f"select MsgSvrID from MSG where CreateTime>{CreateTime} and MsgSvrID!=0").fetchall() - - cursor.execute(f"PRAGMA table_info({tbl_name})") - columns = cursor.fetchall() - columns = [column[1] for column in columns[1:]] - - ex_sql = f"select {','.join(columns)} from {tbl_name} where CreateTime>{CreateTime} and MsgSvrID not in ({','.join([str(MsgSvrID[0]) for MsgSvrID in MsgSvrIDs])})" - cursor.execute(ex_sql) - - insert_sql = f"INSERT INTO {tbl_name} ({','.join(columns)}) VALUES ({','.join(['?' for _ in range(len(columns))])})" - try: - merged_cursor.executemany(insert_sql, cursor.fetchall()) - except Exception as e: - print( - f"error: {db_file}\n{tbl_name}\n{insert_sql}\n{cursor.fetchall()}\n{len(cursor.fetchall())}\n{e}\n**********") - raise e - merged_conn.commit() - else: - ex_sql = f"select * from {tbl_name}" - cursor.execute(ex_sql) - - for r in cursor.fetchall(): - cursor.execute(f"PRAGMA table_info({tbl_name})") - columns = cursor.fetchall() - if len(columns) > 1: - columns = [column[1] for column in columns[1:]] - values = r[1:] - else: - columns = [columns[0][1]] - values = [r[0]] - - query_1 = "select * from " + tbl_name + " where " + columns[0] + "=?" # 查询语句 用于判断是否存在 - c2 = merged_cursor.execute(query_1, values) - if len(c2.fetchall()) > 0: # 已存在 - continue - query = "INSERT INTO " + tbl_name + " (" + ",".join(columns) + ") VALUES (" + ",".join( - ["?" for _ in range(len(values))]) + ")" - - try: - merged_cursor.execute(query, values) - except Exception as e: - print(f"error: {db_file}\n{tbl_name}\n{query}\n{values}\n{len(values)}\n{e}\n**********") - raise e - merged_conn.commit() - - conn.close() - sql = '''delete from MSG where localId in (SELECT localId from MSG - where MsgSvrID != 0 and MsgSvrID in (select MsgSvrID from MSG - where MsgSvrID != 0 GROUP BY MsgSvrID HAVING COUNT(*) > 1) - and localId not in (select min(localId) from MSG - where MsgSvrID != 0 GROUP BY MsgSvrID HAVING COUNT(*) > 1))''' - c = merged_cursor.execute(sql) - merged_conn.commit() - merged_conn.close() - return save_path - - -def merge_media_msg_db(db_path: list, save_path: str): - logging.warning("merge_media_msg_db is deprecated, use merge_db instead, will be removed in the future.") - # 判断save_path是否为文件夹 - if os.path.isdir(save_path): - save_path = os.path.join(save_path, "merge_Media.db") - merged_conn = sqlite3.connect(save_path) - merged_cursor = merged_conn.cursor() - - for db_file in db_path: - - s = "select tbl_name,sql from sqlite_master where type='table' and tbl_name!='sqlite_sequence'" - have_tables = merged_cursor.execute(s).fetchall() - have_tables = [row[0] for row in have_tables] - - conn_part = sqlite3.connect(db_file) - cursor = conn_part.cursor() - - if len(have_tables) < 1: - cursor.execute(s) - table_part = cursor.fetchall() - tblname, sql = table_part[0] - - sql = "CREATE TABLE Media(localId INTEGER PRIMARY KEY AUTOINCREMENT,Key TEXT,Reserved0 INT,Buf BLOB,Reserved1 INT,Reserved2 TEXT)" - try: - merged_cursor.execute(sql) - have_tables.append(tblname) - except Exception as e: - print(f"error: {db_file}\n{tblname}\n{sql}\n{e}\n**********") - raise e - merged_conn.commit() - - for tblname in have_tables: - s = "select Reserved0 from " + tblname - merged_cursor.execute(s) - r0 = merged_cursor.fetchall() - - ex_sql = f"select `Key`,Reserved0,Buf,Reserved1,Reserved2 from {tblname} where Reserved0 not in ({','.join([str(r[0]) for r in r0])})" - cursor.execute(ex_sql) - data = cursor.fetchall() - - insert_sql = f"INSERT INTO {tblname} (Key,Reserved0,Buf,Reserved1,Reserved2) VALUES ({','.join(['?' for _ in range(5)])})" - try: - merged_cursor.executemany(insert_sql, data) - except Exception as e: - print(f"error: {db_file}\n{tblname}\n{insert_sql}\n{data}\n{len(data)}\n{e}\n**********") - raise e - merged_conn.commit() - conn_part.close() - - merged_conn.close() - return save_path - - -def execute_sql(connection, sql, params=None): - """ - 执行给定的SQL语句,返回结果。 - 参数: - - connection: SQLite连接 - - sql:要执行的SQL语句 - - params:SQL语句中的参数 - """ - try: - # connection.text_factory = bytes - cursor = connection.cursor() - if params: - cursor.execute(sql, params) - else: - cursor.execute(sql) - return cursor.fetchall() - except Exception as e: - try: - connection.text_factory = bytes - cursor = connection.cursor() - if params: - cursor.execute(sql, params) - else: - cursor.execute(sql) - rdata = cursor.fetchall() - connection.text_factory = str - return rdata - except Exception as e: - logging.error(f"**********\nSQL: {sql}\nparams: {params}\n{e}\n**********", exc_info=True) - return None - - -def merge_db(db_paths, save_path="merge.db", startCreateTime: int = 0, endCreateTime: int = 0): - """ - 合并数据库 会忽略主键以及重复的行。 - :param db_paths: - :param save_path: - :param CreateTime: - :return: - """ - if os.path.isdir(save_path): - save_path = os.path.join(save_path, f"merge_{int(time.time())}.db") - - _db_paths = [] - if isinstance(db_paths, str): - if os.path.isdir(db_paths): - _db_paths = [os.path.join(db_paths, i) for i in os.listdir(db_paths) if i.endswith(".db")] - elif os.path.isfile(db_paths): - _db_paths = [db_paths] - else: - raise FileNotFoundError("db_paths 不存在") - - if isinstance(db_paths, list): - # alias, file_path - databases = {f"MSG{i}": db_path for i, db_path in enumerate(db_paths)} - else: - raise TypeError("db_paths 类型错误") - - outdb = sqlite3.connect(save_path) - out_cursor = outdb.cursor() - - # 检查是否存在表 sync_log,用于记录同步记录,包括微信数据库路径,表名,记录数,同步时间 - sync_log_status = execute_sql(outdb, "SELECT name FROM sqlite_master WHERE type='table' AND name='sync_log'") - if len(sync_log_status) < 1: - # db_path 微信数据库路径,tbl_name 表名,src_count 源数据库记录数,current_count 当前合并后的数据库对应表记录数 - sync_record_create_sql = ("CREATE TABLE sync_log (" - "id INTEGER PRIMARY KEY AUTOINCREMENT," - "db_path TEXT NOT NULL," - "tbl_name TEXT NOT NULL," - "src_count INT," - "current_count INT," - "createTime INT DEFAULT (strftime('%s', 'now')), " - "updateTime INT DEFAULT (strftime('%s', 'now'))" - ");") - out_cursor.execute(sync_record_create_sql) - # 创建索引 - out_cursor.execute("CREATE INDEX idx_sync_log_db_path ON sync_log (db_path);") - out_cursor.execute("CREATE INDEX idx_sync_log_tbl_name ON sync_log (tbl_name);") - # 创建联合索引,防止重复 - out_cursor.execute("CREATE UNIQUE INDEX idx_sync_log_db_tbl ON sync_log (db_path, tbl_name);") - outdb.commit() - - # 将MSG_db_paths中的数据合并到out_db_path中 - for alias, path in databases.items(): - # 附加数据库 - sql_attach = f"ATTACH DATABASE '{path}' AS {alias}" - out_cursor.execute(sql_attach) - outdb.commit() - sql_query_tbl_name = f"SELECT name FROM {alias}.sqlite_master WHERE type='table' ORDER BY name;" - tables = execute_sql(outdb, sql_query_tbl_name) - for table in tables: - table = table[0] - if table == "sqlite_sequence": - continue - # 获取表中的字段名 - sql_query_columns = f"PRAGMA table_info({table})" - columns = execute_sql(outdb, sql_query_columns) - col_type = { - (i[1] if isinstance(i[1], str) else i[1].decode(), - i[2] if isinstance(i[2], str) else i[2].decode()) - for i in columns} - columns = [i[0] for i in col_type] - if not columns or len(columns) < 1: - continue - # 创建表table - sql_create_tbl = f"CREATE TABLE IF NOT EXISTS {table} AS SELECT * FROM {alias}.{table} WHERE 0 = 1;" - out_cursor.execute(sql_create_tbl) - # 创建包含 NULL 值比较的 UNIQUE 索引 - index_name = f"{table}_unique_index" - coalesce_columns = ','.join(f"COALESCE({column}, '')" for column in columns) - sql = f"CREATE UNIQUE INDEX IF NOT EXISTS {index_name} ON {table} ({coalesce_columns})" - out_cursor.execute(sql) - - # 插入sync_log - sql_query_sync_log = f"SELECT * FROM sync_log WHERE db_path=? AND tbl_name=?" - sync_log = execute_sql(outdb, sql_query_sync_log, (path, table)) - if not sync_log or len(sync_log) < 1: - sql_insert_sync_log = "INSERT INTO sync_log (db_path, tbl_name, src_count, current_count) VALUES (?, ?, ?, ?)" - out_cursor.execute(sql_insert_sync_log, (path, table, 0, 0)) - outdb.commit() - - # 比较源数据库和合并后的数据库记录数 - log_src_count = execute_sql(outdb, sql_query_sync_log, (path, table))[0][3] - src_count = execute_sql(outdb, f"SELECT COUNT(*) FROM {alias}.{table}")[0][0] - if src_count <= log_src_count: - continue - - sql_base = f"SELECT {','.join([i for i in columns])} FROM {alias}.{table} " - # 构建WHERE子句 - where_clauses, params = [], [] - if "CreateTime" in columns: - if startCreateTime > 0: - where_clauses.append("CreateTime > ?") - params.append(startCreateTime) - if endCreateTime > 0: - where_clauses.append("CreateTime < ?") - params.append(endCreateTime) - # 如果有WHERE子句,将其添加到SQL语句中,并添加ORDER BY子句 - sql = f"{sql_base} WHERE {' AND '.join(where_clauses)} ORDER BY CreateTime" if where_clauses else sql_base - src_data = execute_sql(outdb, sql, tuple(params)) - if not src_data or len(src_data) < 1: - continue - # 插入数据 - sql = f"INSERT OR IGNORE INTO {table} ({','.join([i for i in columns])}) VALUES ({','.join(['?'] * len(columns))})" - try: - out_cursor.executemany(sql, src_data) - except Exception as e: - logging.error(f"error: {path}\n{table}\n{sql}\n{src_data}\n{len(src_data)}\n{e}\n", exc_info=True) - # 分离数据库 - sql_detach = f"DETACH DATABASE {alias}" - out_cursor.execute(sql_detach) - outdb.commit() - outdb.close() - return save_path - - -def decrypt_merge(wx_path, key, outpath="", CreateTime: int = 0, endCreateTime: int = 0, db_type: List[str] = []) -> ( - bool, str): - """ - 解密合并数据库 msg.db, microMsg.db, media.db,注意:会删除原数据库 - :param wx_path: 微信路径 eg: C:\\*******\\WeChat Files\\wxid_********* - :param key: 解密密钥 - :return: (true,解密后的数据库路径) or (false,错误信息) - """ - from .decryption import batch_decrypt - from .get_wx_info import get_core_db - - outpath = outpath if outpath else "decrypt_merge_tmp" - merge_save_path = os.path.join(outpath, "merge_all.db") - decrypted_path = os.path.join(outpath, "decrypted") - - if not wx_path or not key: - return False, "参数错误" - - # 分割wx_path的文件名和父目录 - msg_dir = os.path.dirname(wx_path) - my_wxid = os.path.basename(wx_path) - db_type_set: set[str] = {"MSG", "MediaMSG", "MicroMsg", "OpenIMContact", "OpenIMMedia", "OpenIMMsg", "Favorite", - "PublicMsg"} - if len(db_type) == 0: - db_type = list(db_type_set) - else: - for i in db_type: - if i not in db_type_set: - return False, f"db_type参数错误, 可用选项 {db_type_set}" - # 解密 - code, wxdbpaths = get_core_db(wx_path, db_type) - if not code: - return False, wxdbpaths - # 判断out_path是否为空目录 - if os.path.exists(decrypted_path) and os.listdir(decrypted_path): - for root, dirs, files in os.walk(decrypted_path, topdown=False): - for name in files: - os.remove(os.path.join(root, name)) - for name in dirs: - os.rmdir(os.path.join(root, name)) - - if not os.path.exists(decrypted_path): - os.makedirs(decrypted_path) - - # 调用 decrypt 函数,并传入参数 # 解密 - code, ret = batch_decrypt(key, wxdbpaths, decrypted_path, False) - if not code: - return False, ret - - out_dbs = [] - for code1, ret1 in ret: - if code1: - out_dbs.append(ret1[1]) - parpare_merge_db_path = [] - for i in out_dbs: - for j in db_type: - if j in i: - parpare_merge_db_path.append(i) - break - de_db_type = [f"de_{i}" for i in db_type] - parpare_merge_db_path = [i for i in out_dbs if any(keyword in i for keyword in de_db_type)] - - merge_save_path = merge_db(parpare_merge_db_path, merge_save_path, startCreateTime=CreateTime, - endCreateTime=endCreateTime) - - return True, merge_save_path - - -def merge_real_time_db(key, merge_path: str, db_paths: [str] or str): - """ - 合并实时数据库消息,暂时只支持64位系统 - :param key: 解密密钥 - :param db_paths: 数据库路径 - :param merge_path: 合并后的数据库路径 - :return: - """ - try: - import platform - except: - raise ImportError("未找到模块 platform") - # 判断系统位数是否为64位,如果不是则抛出异常 - if platform.architecture()[0] != '64bit': - raise Exception("System is not 64-bit.") - - if isinstance(db_paths, str): - db_paths = [db_paths] - - endbs = [] - - for db_path in db_paths: - if not os.path.exists(db_path): - # raise FileNotFoundError("数据库不存在") - continue - if "MSG" not in db_path and "MicroMsg" not in db_path and "MediaMSG" not in db_path: - # raise FileNotFoundError("数据库不是消息数据库") # MicroMsg实时数据库 - continue - endbs.append(db_path) - endbs = '" "'.join(list(set(endbs))) - - merge_path_base = os.path.dirname(merge_path) # 合并后的数据库路径 - - # 获取当前文件夹路径 - current_path = os.path.dirname(__file__) - real_time_exe_path = os.path.join(current_path, "tools", "realTime.exe") - - # 调用cmd命令 - cmd = f'{real_time_exe_path} "{key}" "{merge_path}" "{endbs}"' - # os.system(cmd) - p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=merge_path_base, - creationflags=subprocess.CREATE_NO_WINDOW) - p.communicate() - return True, merge_path - - -def all_merge_real_time_db(key, wx_path, merge_path): - """ - 合并所有实时数据库 - 注:这是全量合并,会有可能产生重复数据,需要自行去重 - :param key: 解密密钥 - :param wx_path: 微信路径 - :param merge_path: 合并后的数据库路径 eg: C:\\*******\\WeChat Files\\wxid_*********\\merge.db - :return: - """ - if not merge_path or not key or not wx_path or not wx_path: - return False, "msg_path or media_path or wx_path or key is required" - try: - from pywxdump import get_core_db - except ImportError: - return False, "未找到模块 pywxdump" - - db_paths = get_core_db(wx_path, ["MediaMSG", "MSG", "MicroMsg"]) - if not db_paths[0]: - return False, db_paths[1] - db_paths = db_paths[1] - merge_real_time_db(key=key, merge_path=merge_path, db_paths=db_paths) - return True, merge_path diff --git a/setup.py b/setup.py index 63c7c4c..f1d6e3b 100644 --- a/setup.py +++ b/setup.py @@ -41,19 +41,19 @@ setup( url="https://github.com/xaoyaoo/PyWxDump", license='MIT', - packages=['pywxdump', 'pywxdump.ui', 'pywxdump.wx_info', 'pywxdump.analyzer', 'pywxdump.api', - 'pywxdump.dbpreprocess', 'pywxdump.dbpreprocess.export'], + packages=['pywxdump', 'pywxdump.ui', 'pywxdump.wx_core', 'pywxdump.analyzer', 'pywxdump.api', + 'pywxdump.db', 'pywxdump.db.export'], package_dir={'pywxdump': 'pywxdump', - 'pywxdump.wx_info': 'pywxdump/wx_info', + 'pywxdump.wx_core': 'pywxdump/wx_core', 'pywxdump.analyzer': 'pywxdump/analyzer', 'pywxdump.ui': 'pywxdump/ui', 'pywxdump.api': 'pywxdump/api', - 'pywxdump.dbpreprocess': 'pywxdump/dbpreprocess', - 'pywxdump.dbpreprocess.export': 'pywxdump/dbpreprocess/export' + 'pywxdump.db': 'pywxdump/db', + 'pywxdump.db.export': 'pywxdump/db/export' }, package_data={ - 'pywxdump': ['version_list.json', 'ui/templates/*', 'ui/web/*', 'ui/web/assets/*', 'wx_info/tools/*', + 'pywxdump': ['WX_OFFS.json', 'ui/templates/*', 'ui/web/*', 'ui/web/assets/*', 'wx_core/tools/*', "ui/export/*", "ui/export/assets/*", "ui/export/assets/css/*", "ui/export/assets/js/*", ] }, diff --git a/tests/build_exe.py b/tests/build_exe.py index e867f06..f4fd79b 100644 --- a/tests/build_exe.py +++ b/tests/build_exe.py @@ -41,7 +41,7 @@ block_cipher = None a = Analysis(['tmp.py'], pathex=[], binaries=[], - datas=[(r'{root_path}\\version_list.json', 'pywxdump'), + datas=[(r'{root_path}\\WX_OFFS.json', 'pywxdump'), (r'{root_path}/ui/templates/chat.html', 'pywxdump/ui/templates'), (r'{root_path}/ui/templates/index.html', 'pywxdump/ui/templates'), {datas_741258} diff --git a/tests/test_Bias.py b/tests/test_Bias.py index d8b2c6d..912e0bc 100644 --- a/tests/test_Bias.py +++ b/tests/test_Bias.py @@ -6,7 +6,7 @@ # Date: 2023/10/15 # ------------------------------------------------------------------------------- import pywxdump -from pywxdump import VERSION_LIST_PATH, VERSION_LIST +from pywxdump import WX_OFFS_PATH, WX_OFFS from pywxdump import BiasAddr from pywxdump.wx_info import read_info @@ -15,7 +15,7 @@ name = '张三' account = 'xxxxxx' key = None # "xxxxxx" db_path = None # "xxxxxx" -vlp = None # VERSION_LIST_PATH +vlp = None # WX_OFFS_PATH # 调用 run 函数,并传入参数 rdata = BiasAddr(account, mobile, name, key, db_path).run(True, vlp) diff --git a/tests/test_decrypt.py b/tests/test_decrypt.py index fe6e506..49e2652 100644 --- a/tests/test_decrypt.py +++ b/tests/test_decrypt.py @@ -6,7 +6,7 @@ # Date: 2023/11/15 # ------------------------------------------------------------------------------- -from pywxdump import VERSION_LIST_PATH, VERSION_LIST +from pywxdump import WX_OFFS_PATH, WX_OFFS from pywxdump import batch_decrypt key = "xxxxxx" # 解密密钥 diff --git a/tests/test_read_info.py b/tests/test_read_info.py index 379d52a..4c8b5fd 100644 --- a/tests/test_read_info.py +++ b/tests/test_read_info.py @@ -7,8 +7,8 @@ # ------------------------------------------------------------------------------- from pywxdump.wx_info import read_info -from pywxdump import VERSION_LIST_PATH, VERSION_LIST +from pywxdump import WX_OFFS_PATH, WX_OFFS def test_read_info(): - result = read_info(VERSION_LIST, is_logging=True) # 读取微信信息 + result = read_info(WX_OFFS, is_logging=True) # 读取微信信息 assert result is not None \ No newline at end of file