This commit is contained in:
xaoyaoo 2024-08-03 00:40:11 +08:00
parent eddff9ae95
commit c3fb774585
12 changed files with 23 additions and 946 deletions

View File

@ -85,7 +85,7 @@ QQ GROUP[276392799](https://s.xaoyo.top/gOLUDl) or [276392799](https://s.xaoy
* Project address: https://github.com/xaoyaoo/PyWxDump
* Currently tested only under Windows, there may be issues under mac and Linux.
* If you find any missing or incorrect information, bugs, or suggestions for improvement in the [version_list.json](https://github.com/xaoyaoo/PyWxDump/tree/master/pywxdump/version_list.json), please submit an issue on GitHub.
* If you find any missing or incorrect information, bugs, or suggestions for improvement in the [WX_OFFS.json](https://github.com/xaoyaoo/PyWxDump/tree/master/pywxdump/WX_OFFS.json), please submit an issue on GitHub.
* For common issues, please refer to [FAQ](https://github.com/xaoyaoo/PyWxDump/tree/master/doc/FAQ.md), and for the update log, please refer to [CHANGELOG](https://github.com/xaoyaoo/PyWxDump/tree/master/doc/CHANGELOG.md)
* Web UI repository location [wxdump_web](https://github.com/xaoyaoo/wxdump_web )
* If you are interested in the implementation principle of wxdump, please pay attention to the Official Accounts: `逍遥之芯`, reply: `原理` to get the principle analysis.

View File

@ -8,7 +8,7 @@
- fix 部分图片无法读取
- update README
- fix 多重引用消息显示错误
- Update version_list.json (#110)
- Update WX_OFFS.json (#110)
- Merge branch 'master' of github.com:xaoyaoo/PyWxDump
## v3.0.41
@ -845,7 +845,7 @@
## v2.1.11
- 修改version_list
- 修改WX_OFFS
- 修复3.9.2.*版本无法正常运行
- 添加自动发布到pypi的github action
@ -860,7 +860,7 @@
## v2.1.7
- update version_list
- update WX_OFFS
- 添加自动发布到pypi的github action
- add auto get bias addr ,not need input key or wx folder path.

View File

@ -84,7 +84,7 @@ QQ交流群[276392799](https://s.xaoyo.top/gOLUDl) or [276392799](https://s.x
* 项目地址https://github.com/xaoyaoo/PyWxDump
* 目前只在windows下测试过mac、linux下可能会存在问题。
* 如发现[version_list.json](https://github.com/xaoyaoo/PyWxDump/tree/master/pywxdump/version_list.json)缺失或错误、bug有改进意见、想要新增功能, 请提交[issues](https://github.com/xaoyaoo/PyWxDump/issues).
* 如发现[WX_OFFS.json](https://github.com/xaoyaoo/PyWxDump/tree/master/pywxdump/WX_OFFS.json)缺失或错误、bug有改进意见、想要新增功能, 请提交[issues](https://github.com/xaoyaoo/PyWxDump/issues).
* 常见问题请参考[FAQ](https://github.com/xaoyaoo/PyWxDump/tree/master/doc/FAQ.md),更新日志请参考[CHANGELOG](https://github.com/xaoyaoo/PyWxDump/tree/master/doc/CHANGELOG.md)
* Web UI的仓库位置 [wxdump_web](https://github.com/xaoyaoo/wxdump_web)
* 如果对wxdump实现原理感兴趣请关注公众号`逍遥之芯`,回复:`原理` 获取原理解析。

View File

@ -85,7 +85,7 @@ QQ GROUP[276392799](https://s.xaoyo.top/gOLUDl) or [276392799](https://s.xaoy
* Project address: https://github.com/xaoyaoo/PyWxDump
* Currently tested only under Windows, there may be issues under mac and Linux.
* If you find any missing or incorrect information, bugs, or suggestions for improvement in the [version_list.json](https://github.com/xaoyaoo/PyWxDump/tree/master/pywxdump/version_list.json), please submit an issue on GitHub.
* If you find any missing or incorrect information, bugs, or suggestions for improvement in the [WX_OFFS.json](https://github.com/xaoyaoo/PyWxDump/tree/master/pywxdump/WX_OFFS.json), please submit an issue on GitHub.
* For common issues, please refer to [FAQ](https://github.com/xaoyaoo/PyWxDump/tree/master/doc/FAQ.md), and for the update log, please refer to [CHANGELOG](https://github.com/xaoyaoo/PyWxDump/tree/master/doc/CHANGELOG.md)
* Web UI repository location [wxdump_web](https://github.com/xaoyaoo/wxdump_web )
* If you are interested in the implementation principle of wxdump, please pay attention to the Official Accounts: `逍遥之芯`, reply: `原理` to get the principle analysis.

View File

@ -93,14 +93,14 @@ wxdump -h # 查看具体帮助
```bash
wxdump bias -h # 查看具体帮助
wxdump bias --mobile <手机号> --name <微信昵称> --account <微信账号> [--key <密钥>] [--db_path <已登录账号的微信文件夹路径>] [--version_list_path <微信版本偏移文件路径>]
wxdump bias --mobile <手机号> --name <微信昵称> --account <微信账号> [--key <密钥>] [--db_path <已登录账号的微信文件夹路径>] [--WX_OFFS_path <微信版本偏移文件路径>]
```
##### 获取微信信息
```bash
wxdump info -h # 查看具体帮助
wxdump info [--version_list_path <微信版本偏移文件路径>]
wxdump info [--WX_OFFS_path <微信版本偏移文件路径>]
```
##### 获取微信文件夹路径
@ -160,13 +160,13 @@ args = {
"account": "微信账号", # 微信账号
"key": "密钥", # 密钥(可选)
"db_path": "已登录账号的微信文件夹路径", # 微信文件夹路径(可选)
"version_list_path": "微信版本偏移文件路径" # 微信版本偏移文件路径(可选)
"WX_OFFS_path": "微信版本偏移文件路径" # 微信版本偏移文件路径(可选)
}
bias_addr = BiasAddr(args["account"], args["mobile"], args["name"], args["key"], args["db_path"])
result = bias_addr.run(True, args["version_list_path"])
result = bias_addr.run(True, args["WX_OFFS_path"])
# ************************************************************************************************ #
# 获取微信信息
wx_info = read_info(VERSION_LIST, True)
wx_info = read_info(WX_OFFS, True)
# 获取微信文件夹路径
args = {

View File

@ -1,455 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: getwxinfo.py
# Description:
# Author: xaoyaoo
# Date: 2023/08/21
# -------------------------------------------------------------------------------
import ctypes
import json
import os
import re
import winreg
from typing import List, Union
from .utils import verify_key, get_exe_version, get_exe_bit, info_error
from .ctypes_utils import get_process_list, get_info_with_key, get_memory_maps, get_process_exe_path, \
get_file_version_info
from .memory_search import search_memory
import ctypes.wintypes as wintypes
# 定义常量
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_READ = 0x0010
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
OpenProcess = kernel32.OpenProcess
OpenProcess.restype = wintypes.HANDLE
OpenProcess.argtypes = [wintypes.DWORD, wintypes.BOOL, wintypes.DWORD]
CloseHandle = kernel32.CloseHandle
CloseHandle.restype = wintypes.BOOL
CloseHandle.argtypes = [wintypes.HANDLE]
ReadProcessMemory = kernel32.ReadProcessMemory
void_p = ctypes.c_void_p
# 读取内存中的字符串(key部分)
@info_error
def get_info_with_key(h_process, address, address_len=8):
array = ctypes.create_string_buffer(address_len)
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址key地址
key = ctypes.create_string_buffer(32)
if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
key_string = bytes(key).hex()
return key_string
# 读取内存中的字符串(非key部分)
@info_error
def get_info_string(h_process, address, n_size=64):
array = ctypes.create_string_buffer(n_size)
if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"
array = bytes(array).split(b"\x00")[0] if b"\x00" in array else bytes(array)
text = array.decode('utf-8', errors='ignore')
return text.strip() if text.strip() != "" else "None"
# 读取内存中的字符串(昵称部分name)
@info_error
def get_info_name(h_process, address, address_len=8, n_size=64):
array = ctypes.create_string_buffer(n_size)
if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"
address1 = int.from_bytes(array[:address_len], byteorder='little') # 逆序转换为int地址key地址
info_name = get_info_string(h_process, address1, n_size)
if info_name != "None":
return info_name
array = bytes(array).split(b"\x00")[0] if b"\x00" in array else bytes(array)
text = array.decode('utf-8', errors='ignore')
return text.strip() if text.strip() != "" else "None"
# 读取内存中的wxid
@info_error
def get_info_wxid(h_process):
find_num = 100
addrs = search_memory(h_process, br'\\Msg\\FTSContact', max_num=find_num)
wxids = []
for addr in addrs:
array = ctypes.create_string_buffer(80)
if ReadProcessMemory(h_process, void_p(addr - 30), array, 80, 0) == 0: return "None"
array = bytes(array) # .split(b"\\")[0]
array = array.split(b"\\Msg")[0]
array = array.split(b"\\")[-1]
wxids.append(array.decode('utf-8', errors='ignore'))
wxid = max(wxids, key=wxids.count) if wxids else "None"
CloseHandle(h_process)
return wxid
# 读取内存中的filePath基于wxid
@info_error
def get_info_filePath_base_wxid(h_process, wxid=""):
find_num = 10
addrs = search_memory(h_process, wxid.encode() + br'\\Msg\\FTSContact', max_num=find_num)
filePath = []
for addr in addrs:
win_addr_len = 260
array = ctypes.create_string_buffer(win_addr_len)
if ReadProcessMemory(h_process, void_p(addr - win_addr_len + 50), array, win_addr_len, 0) == 0: return "None"
array = bytes(array).split(b"\\Msg")[0]
array = array.split(b"\00")[-1]
filePath.append(array.decode('utf-8', errors='ignore'))
filePath = max(filePath, key=filePath.count) if filePath else "None"
CloseHandle(h_process)
return filePath
@info_error
def get_info_filePath(wxid="all"):
"""
# 读取filePath (微信文件路径) (快)
:param wxid: 微信id
:return: 返回filePath
"""
if not wxid:
return "None"
w_dir = "MyDocument:"
is_w_dir = False
try:
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ)
value, _ = winreg.QueryValueEx(key, "FileSavePath")
winreg.CloseKey(key)
w_dir = value
is_w_dir = True
except Exception as e:
w_dir = "MyDocument:"
if not is_w_dir:
try:
user_profile = os.environ.get("USERPROFILE")
path_3ebffe94 = os.path.join(user_profile, "AppData", "Roaming", "Tencent", "WeChat", "All Users", "config",
"3ebffe94.ini")
with open(path_3ebffe94, "r", encoding="utf-8") as f:
w_dir = f.read()
is_w_dir = True
except Exception as e:
w_dir = "MyDocument:"
if w_dir == "MyDocument:":
try:
# 打开注册表路径
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders")
documents_path = winreg.QueryValueEx(key, "Personal")[0] # 读取文档实际目录路径
winreg.CloseKey(key) # 关闭注册表
documents_paths = os.path.split(documents_path)
if "%" in documents_paths[0]:
w_dir = os.environ.get(documents_paths[0].replace("%", ""))
w_dir = os.path.join(w_dir, os.path.join(*documents_paths[1:]))
# print(1, w_dir)
else:
w_dir = documents_path
except Exception as e:
profile = os.environ.get("USERPROFILE")
w_dir = os.path.join(profile, "Documents")
msg_dir = os.path.join(w_dir, "WeChat Files")
if wxid == "all" and os.path.exists(msg_dir):
return msg_dir
filePath = os.path.join(msg_dir, wxid)
return filePath if os.path.exists(filePath) else "None"
@info_error
def get_key(pid, db_path, addr_len):
"""
获取key
:param pid: 进程id
:param db_path: 微信数据库路径
:param addr_len: 地址长度
:return: 返回key
"""
def read_key_bytes(h_process, address, address_len=8):
array = ctypes.create_string_buffer(address_len)
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址key地址
key = ctypes.create_string_buffer(32)
if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
key_bytes = bytes(key)
return key_bytes
phone_type1 = "iphone\x00"
phone_type2 = "android\x00"
phone_type3 = "ipad\x00"
MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db")
start_adress = 0
end_adress = 0x7FFFFFFFFFFFFFFF
memory_maps = get_memory_maps(pid)
for module in memory_maps:
if module.FileName and 'WeChatWin.dll' in module.FileName:
start_adress = module.BaseAddress
end_adress = module.BaseAddress + module.RegionSize
break
# print(start_adress, end_adress)
hProcess = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid)
type1_addrs = search_memory(hProcess, phone_type1.encode(), max_num=2, start_address=start_adress,
end_address=end_adress)
type2_addrs = search_memory(hProcess, phone_type2.encode(), max_num=2, start_address=start_adress,
end_address=end_adress)
type3_addrs = search_memory(hProcess, phone_type3.encode(), max_num=2, start_address=start_adress,
end_address=end_adress)
type_addrs = []
if len(type1_addrs) >= 2: type_addrs += type1_addrs
if len(type2_addrs) >= 2: type_addrs += type2_addrs
if len(type3_addrs) >= 2: type_addrs += type3_addrs
if len(type_addrs) == 0: return "None"
type_addrs.sort() # 从小到大排序
for i in type_addrs[::-1]:
for j in range(i, i - 2000, -addr_len):
key_bytes = read_key_bytes(hProcess, j, addr_len)
if key_bytes == "None":
continue
if verify_key(key_bytes, MicroMsg_path):
return key_bytes.hex()
return "None"
def get_details(pid, version_list: dict = None, is_logging: bool = False):
path = get_process_exe_path(pid)
rd = {'pid': pid, 'version': get_file_version_info(path),
"account": "None", "mobile": "None", "name": "None", "mail": "None",
"wxid": "None", "key": "None", "filePath": "None"}
try:
bias_list = version_list.get(rd['version'], None)
Handle = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid)
addrLen = get_exe_bit(path) // 8
if not isinstance(bias_list, list) or len(bias_list) <= 4:
error = f"[-] WeChat Current Version Is Not Supported(maybe not get account,mobile,name,mail)"
if is_logging: print(error)
else:
wechat_base_address = 0
memory_maps = get_memory_maps(pid)
for module in memory_maps:
if module.FileName and 'WeChatWin.dll' in module.FileName:
wechat_base_address = module.BaseAddress
rd['version'] = get_file_version_info(module.FileName) if os.path.exists(module.FileName) else rd[
'version']
bias_list = version_list.get(rd['version'], None)
break
if wechat_base_address == 0:
error = f"[-] WeChat WeChatWin.dll Not Found"
if is_logging: print(error)
name_baseaddr = wechat_base_address + bias_list[0]
account_baseaddr = wechat_base_address + bias_list[1]
mobile_baseaddr = wechat_base_address + bias_list[2]
mail_baseaddr = wechat_base_address + bias_list[3]
key_baseaddr = wechat_base_address + bias_list[4]
rd['account'] = get_info_string(Handle, account_baseaddr, 32) if bias_list[1] != 0 else "None"
rd['mobile'] = get_info_string(Handle, mobile_baseaddr, 64) if bias_list[2] != 0 else "None"
rd['name'] = get_info_name(Handle, name_baseaddr, addrLen, 64) if bias_list[0] != 0 else "None"
rd['mail'] = get_info_string(Handle, mail_baseaddr, 64) if bias_list[3] != 0 else "None"
rd['key'] = get_info_with_key(Handle, key_baseaddr, addrLen) if bias_list[4] != 0 else "None"
rd['wxid'] = get_info_wxid(Handle)
rd['filePath'] = get_info_filePath(rd['wxid']) if rd['wxid'] != "None" else "None"
if rd['wxid'] != "None" and rd['filePath'] == "None": # 通过wxid获取filePath,如果filePath为空则通过wxid获取filePath
rd['filePath'] = get_info_filePath_base_wxid(Handle, wxid=rd['wxid'])
isKey = verify_key(
bytes.fromhex(rd["key"]),
os.path.join(rd['filePath'], "MSG", "MicroMsg.db")) if rd['key'] != "None" and rd[
'filePath'] != "None" else False
if rd['filePath'] != "None" and rd['key'] == "None" and not isKey:
rd['key'] = get_key(rd['pid'], rd['filePath'], addrLen)
CloseHandle(Handle)
except Exception as e:
error = f"[-] WeChat Get Info Error:{e}"
if is_logging: print(error)
return rd
# 读取微信信息(account,mobile,name,mail,wxid,key)
def read_info(version_list: dict = None, is_logging: bool = False, save_path: str = None):
"""
读取微信信息(account,mobile,name,mail,wxid,key)
:param version_list: 版本偏移量
:param is_logging: 是否打印日志
:param save_path: 保存路径
:return: 返回微信信息 [{"pid": pid, "version": version, "account": account,
"mobile": mobile, "name": name, "mail": mail, "wxid": wxid,
"key": key, "filePath": filePath}, ...]
"""
if version_list is None:
version_list = {}
wechat_pids = []
result = []
error = ""
processes = get_process_list()
for pid, name in processes:
if name == "WeChat.exe":
wechat_pids.append(pid)
if len(wechat_pids) <= 0:
error = "[-] WeChat No Run"
if is_logging: print(error)
return error
for pid in wechat_pids:
rd = get_details(pid, version_list, is_logging)
result.append(rd)
if is_logging:
print("=" * 32)
if isinstance(result, str): # 输出报错
print(result)
else: # 输出结果
for i, rlt in enumerate(result):
for k, v in rlt.items():
print(f"[+] {k:>8}: {v}")
print(end="-" * 32 + "\n" if i != len(result) - 1 else "")
print("=" * 32)
if save_path:
try:
infos = json.load(open(save_path, "r", encoding="utf-8")) if os.path.exists(save_path) else []
except:
infos = []
with open(save_path, "w", encoding="utf-8") as f:
infos += result
json.dump(infos, f, ensure_ascii=False, indent=4)
return result
def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = None, wxid: Union[List[str], str] = None,
is_logging: bool = False, is_return_list: bool = False) -> Union[str, dict, list]:
r"""
获取微信数据库路径
:param require_list: 需要获取的数据库类型
:param msg_dir: 微信数据库目录 eg: C:\Users\user\Documents\WeChat Files
:param wxid: 微信id
:param is_logging: 是否打印日志
:return:
"""
if not msg_dir:
msg_dir = get_info_filePath(wxid="all")
if not os.path.exists(msg_dir):
error = f"[-] 目录不存在: {msg_dir}"
if is_logging: print(error)
return error
user_dirs = {} # wx用户目录
files = os.listdir(msg_dir)
if wxid: # 如果指定wxid
if isinstance(wxid, str):
wxid = wxid.split(";")
for file_name in files:
if file_name in wxid:
user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name)
else: # 如果未指定wxid
for file_name in files:
if file_name == "All Users" or file_name == "Applet" or file_name == "WMPF":
continue
user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name)
if isinstance(require_list, str):
require_list = require_list.split(";")
# generate pattern
if "all" in require_list:
pattern = {"all": re.compile(r".*\.db$")}
elif isinstance(require_list, list):
pattern = {}
for require in require_list:
pattern[require] = re.compile(r"%s.*?\.db$" % require)
else:
error = f"[-] 参数错误: {require_list}"
if is_logging: print(error)
return error
if is_return_list: # 如果返回列表,返回值:{wxid:[db_path1,db_path2]}
db_list = {}
# 获取数据库路径
for user, user_dir in user_dirs.items(): # 遍历用户目录
db_list[user] = []
for root, dirs, files in os.walk(user_dir):
for file_name in files:
for n, p in pattern.items():
if p.match(file_name):
src_path = os.path.join(root, file_name)
db_list[user].append(src_path)
return db_list
# 获取数据库路径
for user, user_dir in user_dirs.items(): # 遍历用户目录
user_dirs[user] = {n: [] for n in pattern.keys()}
for root, dirs, files in os.walk(user_dir):
for file_name in files:
for n, p in pattern.items():
if p.match(file_name):
src_path = os.path.join(root, file_name)
user_dirs[user][n].append(src_path)
if is_logging:
for user, user_dir in user_dirs.items():
print(f"[+] user_path: {user}")
for n, paths in user_dir.items():
print(f" {n}:")
for path in paths:
print(f" {path.replace(user, '')}")
print("-" * 32)
print(f"[+] 共 {len(user_dirs)} 个微信账号")
return user_dirs
def get_core_db(wx_path: str, db_type: list = None) -> [str]:
"""
获取聊天消息核心数据库路径
:param wx_path: 微信文件夹路径 egC:\*****\WeChat Files\wxid*******
:param db_type: 数据库类型 eg: ["MSG", "MediaMSG", "MicroMsg"]三个中选择一个或多个
:return: 返回数据库路径 eg:["",""]
"""
if not os.path.exists(wx_path):
return False, f"[-] 目录不存在: {wx_path}"
db_type_all = ["MSG", "MediaMSG", "MicroMsg", "OpenIMContact", "OpenIMMedia", "OpenIMMsg", "Favorite", "PublicMsg"]
if not db_type:
db_type = db_type_all
db_type = [dt for dt in db_type if dt in db_type_all]
msg_dir = os.path.dirname(wx_path)
my_wxid = os.path.basename(wx_path)
WxDbPath = get_wechat_db(db_type, msg_dir, wxid=my_wxid, is_logging=False, is_return_list=True) # 获取微信数据库路径
if isinstance(WxDbPath, str): # 如果返回的是字符串,则表示出错
return False, WxDbPath
wxdbpaths = WxDbPath.get(wx_path, [])
if len(wxdbpaths) == 0:
return False, "未获取到数据库路径"
return True, wxdbpaths
if __name__ == '__main__':
from pywxdump import VERSION_LIST
read_info(VERSION_LIST, is_logging=True)

View File

@ -1,468 +0,0 @@
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: merge_db.py
# Description:
# Author: xaoyaoo
# Date: 2023/12/03
# -------------------------------------------------------------------------------
import logging
import os
import random
import shutil
import sqlite3
import subprocess
import time
from typing import List
def merge_copy_db(db_path, save_path):
logging.warning("merge_copy_db is deprecated, use merge_db instead, will be removed in the future.")
if isinstance(db_path, list) and len(db_path) == 1:
db_path = db_path[0]
if not os.path.exists(db_path):
raise FileNotFoundError("目录不存在")
shutil.move(db_path, save_path)
# 合并相同名称的数据库 MSG0-MSG9.db
def merge_msg_db(db_path: list, save_path: str, CreateTime: int = 0): # CreateTime: 从这个时间开始的消息 10位时间戳
logging.warning("merge_msg_db is deprecated, use merge_db instead, will be removed in the future.")
# 判断save_path是否为文件夹
if os.path.isdir(save_path):
save_path = os.path.join(save_path, "merge_MSG.db")
merged_conn = sqlite3.connect(save_path)
merged_cursor = merged_conn.cursor()
for db_file in db_path:
c_tabels = merged_cursor.execute(
"select tbl_name from sqlite_master where type='table' and tbl_name!='sqlite_sequence'")
tabels_all = c_tabels.fetchall() # 所有表名
tabels_all = [row[0] for row in tabels_all]
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
# 创建表
if len(tabels_all) < 4:
cursor.execute(
"select tbl_name,sql from sqlite_master where type='table' and tbl_name!='sqlite_sequence'")
c_part = cursor.fetchall()
for tbl_name, sql in c_part:
if tbl_name in tabels_all:
continue
try:
merged_cursor.execute(sql)
tabels_all.append(tbl_name)
except Exception as e:
print(f"error: {db_file}\n{tbl_name}\n{sql}\n{e}\n**********")
raise e
merged_conn.commit()
# 写入数据
for tbl_name in tabels_all:
if tbl_name == "MSG":
MsgSvrIDs = merged_cursor.execute(
f"select MsgSvrID from MSG where CreateTime>{CreateTime} and MsgSvrID!=0").fetchall()
cursor.execute(f"PRAGMA table_info({tbl_name})")
columns = cursor.fetchall()
columns = [column[1] for column in columns[1:]]
ex_sql = f"select {','.join(columns)} from {tbl_name} where CreateTime>{CreateTime} and MsgSvrID not in ({','.join([str(MsgSvrID[0]) for MsgSvrID in MsgSvrIDs])})"
cursor.execute(ex_sql)
insert_sql = f"INSERT INTO {tbl_name} ({','.join(columns)}) VALUES ({','.join(['?' for _ in range(len(columns))])})"
try:
merged_cursor.executemany(insert_sql, cursor.fetchall())
except Exception as e:
print(
f"error: {db_file}\n{tbl_name}\n{insert_sql}\n{cursor.fetchall()}\n{len(cursor.fetchall())}\n{e}\n**********")
raise e
merged_conn.commit()
else:
ex_sql = f"select * from {tbl_name}"
cursor.execute(ex_sql)
for r in cursor.fetchall():
cursor.execute(f"PRAGMA table_info({tbl_name})")
columns = cursor.fetchall()
if len(columns) > 1:
columns = [column[1] for column in columns[1:]]
values = r[1:]
else:
columns = [columns[0][1]]
values = [r[0]]
query_1 = "select * from " + tbl_name + " where " + columns[0] + "=?" # 查询语句 用于判断是否存在
c2 = merged_cursor.execute(query_1, values)
if len(c2.fetchall()) > 0: # 已存在
continue
query = "INSERT INTO " + tbl_name + " (" + ",".join(columns) + ") VALUES (" + ",".join(
["?" for _ in range(len(values))]) + ")"
try:
merged_cursor.execute(query, values)
except Exception as e:
print(f"error: {db_file}\n{tbl_name}\n{query}\n{values}\n{len(values)}\n{e}\n**********")
raise e
merged_conn.commit()
conn.close()
sql = '''delete from MSG where localId in (SELECT localId from MSG
where MsgSvrID != 0 and MsgSvrID in (select MsgSvrID from MSG
where MsgSvrID != 0 GROUP BY MsgSvrID HAVING COUNT(*) > 1)
and localId not in (select min(localId) from MSG
where MsgSvrID != 0 GROUP BY MsgSvrID HAVING COUNT(*) > 1))'''
c = merged_cursor.execute(sql)
merged_conn.commit()
merged_conn.close()
return save_path
def merge_media_msg_db(db_path: list, save_path: str):
logging.warning("merge_media_msg_db is deprecated, use merge_db instead, will be removed in the future.")
# 判断save_path是否为文件夹
if os.path.isdir(save_path):
save_path = os.path.join(save_path, "merge_Media.db")
merged_conn = sqlite3.connect(save_path)
merged_cursor = merged_conn.cursor()
for db_file in db_path:
s = "select tbl_name,sql from sqlite_master where type='table' and tbl_name!='sqlite_sequence'"
have_tables = merged_cursor.execute(s).fetchall()
have_tables = [row[0] for row in have_tables]
conn_part = sqlite3.connect(db_file)
cursor = conn_part.cursor()
if len(have_tables) < 1:
cursor.execute(s)
table_part = cursor.fetchall()
tblname, sql = table_part[0]
sql = "CREATE TABLE Media(localId INTEGER PRIMARY KEY AUTOINCREMENT,Key TEXT,Reserved0 INT,Buf BLOB,Reserved1 INT,Reserved2 TEXT)"
try:
merged_cursor.execute(sql)
have_tables.append(tblname)
except Exception as e:
print(f"error: {db_file}\n{tblname}\n{sql}\n{e}\n**********")
raise e
merged_conn.commit()
for tblname in have_tables:
s = "select Reserved0 from " + tblname
merged_cursor.execute(s)
r0 = merged_cursor.fetchall()
ex_sql = f"select `Key`,Reserved0,Buf,Reserved1,Reserved2 from {tblname} where Reserved0 not in ({','.join([str(r[0]) for r in r0])})"
cursor.execute(ex_sql)
data = cursor.fetchall()
insert_sql = f"INSERT INTO {tblname} (Key,Reserved0,Buf,Reserved1,Reserved2) VALUES ({','.join(['?' for _ in range(5)])})"
try:
merged_cursor.executemany(insert_sql, data)
except Exception as e:
print(f"error: {db_file}\n{tblname}\n{insert_sql}\n{data}\n{len(data)}\n{e}\n**********")
raise e
merged_conn.commit()
conn_part.close()
merged_conn.close()
return save_path
def execute_sql(connection, sql, params=None):
"""
执行给定的SQL语句返回结果
参数
- connection SQLite连接
- sql要执行的SQL语句
- paramsSQL语句中的参数
"""
try:
# connection.text_factory = bytes
cursor = connection.cursor()
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
return cursor.fetchall()
except Exception as e:
try:
connection.text_factory = bytes
cursor = connection.cursor()
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
rdata = cursor.fetchall()
connection.text_factory = str
return rdata
except Exception as e:
logging.error(f"**********\nSQL: {sql}\nparams: {params}\n{e}\n**********", exc_info=True)
return None
def merge_db(db_paths, save_path="merge.db", startCreateTime: int = 0, endCreateTime: int = 0):
"""
合并数据库 会忽略主键以及重复的行
:param db_paths:
:param save_path:
:param CreateTime:
:return:
"""
if os.path.isdir(save_path):
save_path = os.path.join(save_path, f"merge_{int(time.time())}.db")
_db_paths = []
if isinstance(db_paths, str):
if os.path.isdir(db_paths):
_db_paths = [os.path.join(db_paths, i) for i in os.listdir(db_paths) if i.endswith(".db")]
elif os.path.isfile(db_paths):
_db_paths = [db_paths]
else:
raise FileNotFoundError("db_paths 不存在")
if isinstance(db_paths, list):
# alias, file_path
databases = {f"MSG{i}": db_path for i, db_path in enumerate(db_paths)}
else:
raise TypeError("db_paths 类型错误")
outdb = sqlite3.connect(save_path)
out_cursor = outdb.cursor()
# 检查是否存在表 sync_log,用于记录同步记录,包括微信数据库路径,表名,记录数,同步时间
sync_log_status = execute_sql(outdb, "SELECT name FROM sqlite_master WHERE type='table' AND name='sync_log'")
if len(sync_log_status) < 1:
# db_path 微信数据库路径tbl_name 表名src_count 源数据库记录数current_count 当前合并后的数据库对应表记录数
sync_record_create_sql = ("CREATE TABLE sync_log ("
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
"db_path TEXT NOT NULL,"
"tbl_name TEXT NOT NULL,"
"src_count INT,"
"current_count INT,"
"createTime INT DEFAULT (strftime('%s', 'now')), "
"updateTime INT DEFAULT (strftime('%s', 'now'))"
");")
out_cursor.execute(sync_record_create_sql)
# 创建索引
out_cursor.execute("CREATE INDEX idx_sync_log_db_path ON sync_log (db_path);")
out_cursor.execute("CREATE INDEX idx_sync_log_tbl_name ON sync_log (tbl_name);")
# 创建联合索引,防止重复
out_cursor.execute("CREATE UNIQUE INDEX idx_sync_log_db_tbl ON sync_log (db_path, tbl_name);")
outdb.commit()
# 将MSG_db_paths中的数据合并到out_db_path中
for alias, path in databases.items():
# 附加数据库
sql_attach = f"ATTACH DATABASE '{path}' AS {alias}"
out_cursor.execute(sql_attach)
outdb.commit()
sql_query_tbl_name = f"SELECT name FROM {alias}.sqlite_master WHERE type='table' ORDER BY name;"
tables = execute_sql(outdb, sql_query_tbl_name)
for table in tables:
table = table[0]
if table == "sqlite_sequence":
continue
# 获取表中的字段名
sql_query_columns = f"PRAGMA table_info({table})"
columns = execute_sql(outdb, sql_query_columns)
col_type = {
(i[1] if isinstance(i[1], str) else i[1].decode(),
i[2] if isinstance(i[2], str) else i[2].decode())
for i in columns}
columns = [i[0] for i in col_type]
if not columns or len(columns) < 1:
continue
# 创建表table
sql_create_tbl = f"CREATE TABLE IF NOT EXISTS {table} AS SELECT * FROM {alias}.{table} WHERE 0 = 1;"
out_cursor.execute(sql_create_tbl)
# 创建包含 NULL 值比较的 UNIQUE 索引
index_name = f"{table}_unique_index"
coalesce_columns = ','.join(f"COALESCE({column}, '')" for column in columns)
sql = f"CREATE UNIQUE INDEX IF NOT EXISTS {index_name} ON {table} ({coalesce_columns})"
out_cursor.execute(sql)
# 插入sync_log
sql_query_sync_log = f"SELECT * FROM sync_log WHERE db_path=? AND tbl_name=?"
sync_log = execute_sql(outdb, sql_query_sync_log, (path, table))
if not sync_log or len(sync_log) < 1:
sql_insert_sync_log = "INSERT INTO sync_log (db_path, tbl_name, src_count, current_count) VALUES (?, ?, ?, ?)"
out_cursor.execute(sql_insert_sync_log, (path, table, 0, 0))
outdb.commit()
# 比较源数据库和合并后的数据库记录数
log_src_count = execute_sql(outdb, sql_query_sync_log, (path, table))[0][3]
src_count = execute_sql(outdb, f"SELECT COUNT(*) FROM {alias}.{table}")[0][0]
if src_count <= log_src_count:
continue
sql_base = f"SELECT {','.join([i for i in columns])} FROM {alias}.{table} "
# 构建WHERE子句
where_clauses, params = [], []
if "CreateTime" in columns:
if startCreateTime > 0:
where_clauses.append("CreateTime > ?")
params.append(startCreateTime)
if endCreateTime > 0:
where_clauses.append("CreateTime < ?")
params.append(endCreateTime)
# 如果有WHERE子句将其添加到SQL语句中并添加ORDER BY子句
sql = f"{sql_base} WHERE {' AND '.join(where_clauses)} ORDER BY CreateTime" if where_clauses else sql_base
src_data = execute_sql(outdb, sql, tuple(params))
if not src_data or len(src_data) < 1:
continue
# 插入数据
sql = f"INSERT OR IGNORE INTO {table} ({','.join([i for i in columns])}) VALUES ({','.join(['?'] * len(columns))})"
try:
out_cursor.executemany(sql, src_data)
except Exception as e:
logging.error(f"error: {path}\n{table}\n{sql}\n{src_data}\n{len(src_data)}\n{e}\n", exc_info=True)
# 分离数据库
sql_detach = f"DETACH DATABASE {alias}"
out_cursor.execute(sql_detach)
outdb.commit()
outdb.close()
return save_path
def decrypt_merge(wx_path, key, outpath="", CreateTime: int = 0, endCreateTime: int = 0, db_type: List[str] = []) -> (
bool, str):
"""
解密合并数据库 msg.db, microMsg.db, media.db,注意会删除原数据库
:param wx_path: 微信路径 eg: C:\\*******\\WeChat Files\\wxid_*********
:param key: 解密密钥
:return: (true,解密后的数据库路径) or (false,错误信息)
"""
from .decryption import batch_decrypt
from .get_wx_info import get_core_db
outpath = outpath if outpath else "decrypt_merge_tmp"
merge_save_path = os.path.join(outpath, "merge_all.db")
decrypted_path = os.path.join(outpath, "decrypted")
if not wx_path or not key:
return False, "参数错误"
# 分割wx_path的文件名和父目录
msg_dir = os.path.dirname(wx_path)
my_wxid = os.path.basename(wx_path)
db_type_set: set[str] = {"MSG", "MediaMSG", "MicroMsg", "OpenIMContact", "OpenIMMedia", "OpenIMMsg", "Favorite",
"PublicMsg"}
if len(db_type) == 0:
db_type = list(db_type_set)
else:
for i in db_type:
if i not in db_type_set:
return False, f"db_type参数错误, 可用选项 {db_type_set}"
# 解密
code, wxdbpaths = get_core_db(wx_path, db_type)
if not code:
return False, wxdbpaths
# 判断out_path是否为空目录
if os.path.exists(decrypted_path) and os.listdir(decrypted_path):
for root, dirs, files in os.walk(decrypted_path, topdown=False):
for name in files:
os.remove(os.path.join(root, name))
for name in dirs:
os.rmdir(os.path.join(root, name))
if not os.path.exists(decrypted_path):
os.makedirs(decrypted_path)
# 调用 decrypt 函数,并传入参数 # 解密
code, ret = batch_decrypt(key, wxdbpaths, decrypted_path, False)
if not code:
return False, ret
out_dbs = []
for code1, ret1 in ret:
if code1:
out_dbs.append(ret1[1])
parpare_merge_db_path = []
for i in out_dbs:
for j in db_type:
if j in i:
parpare_merge_db_path.append(i)
break
de_db_type = [f"de_{i}" for i in db_type]
parpare_merge_db_path = [i for i in out_dbs if any(keyword in i for keyword in de_db_type)]
merge_save_path = merge_db(parpare_merge_db_path, merge_save_path, startCreateTime=CreateTime,
endCreateTime=endCreateTime)
return True, merge_save_path
def merge_real_time_db(key, merge_path: str, db_paths: [str] or str):
"""
合并实时数据库消息,暂时只支持64位系统
:param key: 解密密钥
:param db_paths: 数据库路径
:param merge_path: 合并后的数据库路径
:return:
"""
try:
import platform
except:
raise ImportError("未找到模块 platform")
# 判断系统位数是否为64位如果不是则抛出异常
if platform.architecture()[0] != '64bit':
raise Exception("System is not 64-bit.")
if isinstance(db_paths, str):
db_paths = [db_paths]
endbs = []
for db_path in db_paths:
if not os.path.exists(db_path):
# raise FileNotFoundError("数据库不存在")
continue
if "MSG" not in db_path and "MicroMsg" not in db_path and "MediaMSG" not in db_path:
# raise FileNotFoundError("数据库不是消息数据库") # MicroMsg实时数据库
continue
endbs.append(db_path)
endbs = '" "'.join(list(set(endbs)))
merge_path_base = os.path.dirname(merge_path) # 合并后的数据库路径
# 获取当前文件夹路径
current_path = os.path.dirname(__file__)
real_time_exe_path = os.path.join(current_path, "tools", "realTime.exe")
# 调用cmd命令
cmd = f'{real_time_exe_path} "{key}" "{merge_path}" "{endbs}"'
# os.system(cmd)
p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=merge_path_base,
creationflags=subprocess.CREATE_NO_WINDOW)
p.communicate()
return True, merge_path
def all_merge_real_time_db(key, wx_path, merge_path):
"""
合并所有实时数据库
这是全量合并会有可能产生重复数据需要自行去重
:param key: 解密密钥
:param wx_path: 微信路径
:param merge_path: 合并后的数据库路径 eg: C:\\*******\\WeChat Files\\wxid_*********\\merge.db
:return:
"""
if not merge_path or not key or not wx_path or not wx_path:
return False, "msg_path or media_path or wx_path or key is required"
try:
from pywxdump import get_core_db
except ImportError:
return False, "未找到模块 pywxdump"
db_paths = get_core_db(wx_path, ["MediaMSG", "MSG", "MicroMsg"])
if not db_paths[0]:
return False, db_paths[1]
db_paths = db_paths[1]
merge_real_time_db(key=key, merge_path=merge_path, db_paths=db_paths)
return True, merge_path

View File

@ -41,19 +41,19 @@ setup(
url="https://github.com/xaoyaoo/PyWxDump",
license='MIT',
packages=['pywxdump', 'pywxdump.ui', 'pywxdump.wx_info', 'pywxdump.analyzer', 'pywxdump.api',
'pywxdump.dbpreprocess', 'pywxdump.dbpreprocess.export'],
packages=['pywxdump', 'pywxdump.ui', 'pywxdump.wx_core', 'pywxdump.analyzer', 'pywxdump.api',
'pywxdump.db', 'pywxdump.db.export'],
package_dir={'pywxdump': 'pywxdump',
'pywxdump.wx_info': 'pywxdump/wx_info',
'pywxdump.wx_core': 'pywxdump/wx_core',
'pywxdump.analyzer': 'pywxdump/analyzer',
'pywxdump.ui': 'pywxdump/ui',
'pywxdump.api': 'pywxdump/api',
'pywxdump.dbpreprocess': 'pywxdump/dbpreprocess',
'pywxdump.dbpreprocess.export': 'pywxdump/dbpreprocess/export'
'pywxdump.db': 'pywxdump/db',
'pywxdump.db.export': 'pywxdump/db/export'
},
package_data={
'pywxdump': ['version_list.json', 'ui/templates/*', 'ui/web/*', 'ui/web/assets/*', 'wx_info/tools/*',
'pywxdump': ['WX_OFFS.json', 'ui/templates/*', 'ui/web/*', 'ui/web/assets/*', 'wx_core/tools/*',
"ui/export/*", "ui/export/assets/*", "ui/export/assets/css/*", "ui/export/assets/js/*",
]
},

View File

@ -41,7 +41,7 @@ block_cipher = None
a = Analysis(['tmp.py'],
pathex=[],
binaries=[],
datas=[(r'{root_path}\\version_list.json', 'pywxdump'),
datas=[(r'{root_path}\\WX_OFFS.json', 'pywxdump'),
(r'{root_path}/ui/templates/chat.html', 'pywxdump/ui/templates'),
(r'{root_path}/ui/templates/index.html', 'pywxdump/ui/templates'),
{datas_741258}

View File

@ -6,7 +6,7 @@
# Date: 2023/10/15
# -------------------------------------------------------------------------------
import pywxdump
from pywxdump import VERSION_LIST_PATH, VERSION_LIST
from pywxdump import WX_OFFS_PATH, WX_OFFS
from pywxdump import BiasAddr
from pywxdump.wx_info import read_info
@ -15,7 +15,7 @@ name = '张三'
account = 'xxxxxx'
key = None # "xxxxxx"
db_path = None # "xxxxxx"
vlp = None # VERSION_LIST_PATH
vlp = None # WX_OFFS_PATH
# 调用 run 函数,并传入参数
rdata = BiasAddr(account, mobile, name, key, db_path).run(True, vlp)

View File

@ -6,7 +6,7 @@
# Date: 2023/11/15
# -------------------------------------------------------------------------------
from pywxdump import VERSION_LIST_PATH, VERSION_LIST
from pywxdump import WX_OFFS_PATH, WX_OFFS
from pywxdump import batch_decrypt
key = "xxxxxx" # 解密密钥

View File

@ -7,8 +7,8 @@
# -------------------------------------------------------------------------------
from pywxdump.wx_info import read_info
from pywxdump import VERSION_LIST_PATH, VERSION_LIST
from pywxdump import WX_OFFS_PATH, WX_OFFS
def test_read_info():
result = read_info(VERSION_LIST, is_logging=True) # 读取微信信息
result = read_info(WX_OFFS, is_logging=True) # 读取微信信息
assert result is not None