2023-10-14 21:48:35 +08:00
|
|
|
# -*- coding: utf-8 -*-#
|
|
|
|
# -------------------------------------------------------------------------------
|
|
|
|
# Name: get_wx_db.py
|
|
|
|
# Description:
|
|
|
|
# Author: xaoyaoo
|
|
|
|
# Date: 2023/10/14
|
|
|
|
# -------------------------------------------------------------------------------
|
|
|
|
import os
|
|
|
|
import re
|
|
|
|
import winreg
|
2023-10-24 18:33:30 +08:00
|
|
|
from typing import List, Union
|
2023-10-14 21:48:35 +08:00
|
|
|
|
2023-11-15 15:04:45 +08:00
|
|
|
|
|
|
|
def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = None, wxid: Union[List[str], str] = None,
|
|
|
|
is_logging: bool = False):
|
2023-10-14 21:48:35 +08:00
|
|
|
if not msg_dir:
|
|
|
|
try:
|
|
|
|
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ)
|
|
|
|
value, _ = winreg.QueryValueEx(key, "FileSavePath")
|
|
|
|
winreg.CloseKey(key)
|
|
|
|
w_dir = value
|
|
|
|
except Exception as e:
|
|
|
|
w_dir = "MyDocument:"
|
|
|
|
|
|
|
|
if w_dir == "MyDocument:":
|
|
|
|
profile = os.path.expanduser("~")
|
|
|
|
msg_dir = os.path.join(profile, "Documents", "WeChat Files")
|
|
|
|
else:
|
|
|
|
msg_dir = os.path.join(w_dir, "WeChat Files")
|
|
|
|
|
|
|
|
if not os.path.exists(msg_dir):
|
2023-11-15 15:04:45 +08:00
|
|
|
error = "[-] 目录不存在"
|
|
|
|
if is_logging: print(error)
|
|
|
|
return error
|
2023-10-14 21:48:35 +08:00
|
|
|
|
|
|
|
user_dirs = {} # wx用户目录
|
|
|
|
files = os.listdir(msg_dir)
|
2023-11-15 15:04:45 +08:00
|
|
|
if wxid: # 如果指定wxid
|
|
|
|
if isinstance(wxid, str):
|
|
|
|
wxid = wxid.split(";")
|
|
|
|
for file_name in files:
|
|
|
|
if file_name in wxid:
|
|
|
|
user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name)
|
|
|
|
else: # 如果未指定wxid
|
|
|
|
for file_name in files:
|
|
|
|
if file_name == "All Users" or file_name == "Applet" or file_name == "WMPF":
|
|
|
|
continue
|
|
|
|
user_dirs[os.path.join(msg_dir, file_name)] = os.path.join(msg_dir, file_name)
|
2023-10-14 21:48:35 +08:00
|
|
|
|
|
|
|
if isinstance(require_list, str):
|
|
|
|
require_list = require_list.split(";")
|
|
|
|
|
2023-11-15 15:04:45 +08:00
|
|
|
# generate pattern
|
2023-10-14 21:48:35 +08:00
|
|
|
if "all" in require_list:
|
|
|
|
pattern = {"all": re.compile(r".*\.db$")}
|
|
|
|
elif isinstance(require_list, list):
|
|
|
|
pattern = {}
|
|
|
|
for require in require_list:
|
2023-10-24 16:58:16 +08:00
|
|
|
pattern[require] = re.compile(r"%s.*\.db$" % require)
|
2023-10-14 21:48:35 +08:00
|
|
|
else:
|
2023-11-15 15:04:45 +08:00
|
|
|
error = "[-] 参数错误"
|
|
|
|
if is_logging: print(error)
|
|
|
|
return error
|
2023-10-14 21:48:35 +08:00
|
|
|
|
|
|
|
# 获取数据库路径
|
|
|
|
for user, user_dir in user_dirs.items(): # 遍历用户目录
|
|
|
|
user_dirs[user] = {n: [] for n in pattern.keys()}
|
|
|
|
for root, dirs, files in os.walk(user_dir):
|
|
|
|
for file_name in files:
|
|
|
|
for n, p in pattern.items():
|
|
|
|
if p.match(file_name):
|
|
|
|
src_path = os.path.join(root, file_name)
|
|
|
|
user_dirs[user][n].append(src_path)
|
2023-11-15 15:04:45 +08:00
|
|
|
|
|
|
|
if is_logging:
|
|
|
|
for user, user_dir in user_dirs.items():
|
|
|
|
print(f"[+] user_path: {user}")
|
|
|
|
for n, paths in user_dir.items():
|
|
|
|
print(f" {n}:")
|
|
|
|
for path in paths:
|
|
|
|
print(f" {path.replace(user, '')}")
|
|
|
|
print("-" * 32)
|
|
|
|
print(f"[+] 共 {len(user_dirs)} 个微信账号")
|
|
|
|
|
2023-10-14 21:48:35 +08:00
|
|
|
return user_dirs
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
require_list = ["MediaMSG", "MicroMsg", "FTSMSG", "MSG", "Sns", "Emotion"]
|
|
|
|
# require_list = "all"
|
2023-11-15 15:04:45 +08:00
|
|
|
user_dirs = get_wechat_db(require_list, is_logging=True)
|