PyWxDump/pywxdump/cli.py

388 lines
16 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: main.py.py
# Description:
# Author: xaoyaoo
# Date: 2023/10/14
# -------------------------------------------------------------------------------
import argparse
2024-08-17 11:51:57 +08:00
import os
import sys
2024-08-17 14:04:03 +08:00
import json
from pywxdump import *
2023-12-18 22:36:11 +08:00
import pywxdump
2023-11-30 17:50:17 +08:00
wxdump_ascii = r"""
"""
PYWXDUMP_VERSION = pywxdump.__version__
models = {}
def create_parser():
class CustomArgumentParser(argparse.ArgumentParser):
def format_help(self):
# 首先显示软件简介
# 定义软件简介文本并进行格式化
line_len = 70
PYWXDUMP_VERSION = pywxdump.__version__
wxdump_line = '\n'.join([f'\033[36m{line:^{line_len}}\033[0m' for line in wxdump_ascii.split('\n') if line])
first_line = f'\033[36m{" PyWxDump v" + PYWXDUMP_VERSION + " ":=^{line_len}}\033[0m'
brief = 'PyWxDump功能获取账号信息、解密数据库、查看聊天记录、导出聊天记录为html等'
other = '更多详情请查看: \033[4m\033[1mhttps://github.com/xaoyaoo/PyWxDump\033[0m'
separator = f'\033[36m{" options ":-^{line_len}}\033[0m'
# 获取帮助信息并添加到软件简介下方
help_text = super().format_help().strip()
return f'\n{wxdump_line}\n\n{first_line}\n{brief}\n{separator}\n{help_text}\n{separator}\n{other}\n{first_line}\n'
# 创建命令行参数解析器
parser = CustomArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('-V', '--version', action='version', version=f"PyWxDump v{PYWXDUMP_VERSION}")
# 添加子命令解析器
subparsers = parser.add_subparsers(dest="mode", help="""运行模式:""", required=True, metavar="mode")
return parser, subparsers
main_parser, sub_parsers = create_parser()
class SubMainMetaclass(type):
def is_implemented_method(cls, name: str, method: str):
if not hasattr(cls, method) or not callable(getattr(cls, method)):
raise NotImplementedError("{} NotImplemented [{}]".format(name, method))
def __init__(cls, name, bases, kwargs):
super(SubMainMetaclass, cls).__init__(name, bases, kwargs)
if name in ["BaseSubMainClass"]:
return
mode = getattr(cls, "mode")
if mode in models:
raise TypeError("mode[{}] is used...".format(mode))
cls.is_implemented_method(name, "init_parses")
cls.is_implemented_method(name, "run")
c = cls()
models[mode] = c
c.init_parses(sub_parsers.add_parser(mode, **getattr(c, "parser_kwargs")))
class BaseSubMainClass(metaclass=SubMainMetaclass):
parser_kwargs = {}
@property
def mode(self) -> str:
raise NotImplementedError()
def init_parses(self, parser):
raise NotImplementedError()
def run(self, args: argparse.Namespace):
raise NotImplementedError()
class MainBiasAddr(BaseSubMainClass):
mode = "bias"
parser_kwargs = {"help": "获取微信基址偏移"}
def init_parses(self, parser):
# 添加 'bias_addr' 子命令解析器
parser.add_argument("--mobile", type=str, help="手机号", metavar="", required=True)
parser.add_argument("--name", type=str, help="微信昵称", metavar="", required=True)
parser.add_argument("--account", type=str, help="微信账号", metavar="", required=True)
parser.add_argument("--key", type=str, metavar="", help="(可选)密钥")
parser.add_argument("--db_path", type=str, metavar="", help="(可选)已登录账号的微信文件夹路径")
parser.add_argument("-vlp", '--WX_OFFS_PATH', type=str, metavar="",
help="(可选)微信版本偏移文件路径,如有,则自动更新",
default=None)
return parser
def run(self, args):
print(f"[*] PyWxDump v{pywxdump.__version__}")
# 从命令行参数获取值
mobile = args.mobile
name = args.name
account = args.account
key = args.key
db_path = args.db_path
vlp = args.WX_OFFS_PATH
# 调用 run 函数,并传入参数
rdata = BiasAddr(account, mobile, name, key, db_path).run(True, vlp)
return rdata
class MainWxInfo(BaseSubMainClass):
mode = "info"
parser_kwargs = {"help": "获取微信信息"}
def init_parses(self, parser):
# 添加 'wx_info' 子命令解析器
parser.add_argument("-vlp", '--WX_OFFS_PATH', metavar="", type=str,
help="(可选)微信版本偏移文件路径", default=WX_OFFS_PATH)
parser.add_argument("-s", '--save_path', metavar="", type=str, help="(可选)保存路径【json文件】")
return parser
def run(self, args):
print(f"[*] PyWxDump v{pywxdump.__version__}")
# 读取微信各版本偏移
path = args.WX_OFFS_PATH
2023-12-20 11:07:17 +08:00
save_path = args.save_path
WX_OFFS = json.load(open(path, "r", encoding="utf-8"))
result = get_wx_info(WX_OFFS, True, save_path) # 读取微信信息
return result
class MainWxDbPath(BaseSubMainClass):
2024-04-20 18:55:53 +08:00
mode = "wx_path"
parser_kwargs = {"help": "获取微信文件夹路径"}
def init_parses(self, parser):
# 添加 'wx_db_path' 子命令解析器
parser.add_argument("-r", "--db_types", type=str,
help="(可选)需要的数据库名称(eg: -r MediaMSG;MicroMsg;FTSMSG;MSG;Sns;Emotion )",
2024-08-04 21:40:02 +08:00
default=None, metavar="")
parser.add_argument("-wf", "--wx_files", type=str, help="(可选)'WeChat Files'路径", default=None,
metavar="")
parser.add_argument("-id", "--wxid", type=str, help="(可选)wxid_,用于确认用户文件夹",
default=None, metavar="")
return parser
def run(self, args):
2024-08-04 21:40:02 +08:00
print(f"[*] PyWxDump v{pywxdump.__version__}")
# 从命令行参数获取值
2024-08-04 21:40:02 +08:00
db_types = args.db_types
msg_dir = args.wx_files
wxid = args.wxid
ret = get_wx_db(msg_dir=msg_dir, db_types=db_types, wxids=wxid)
for i in ret: print(i)
return ret
class MainDecrypt(BaseSubMainClass):
mode = "decrypt"
parser_kwargs = {"help": "解密微信数据库"}
def init_parses(self, parser):
# 添加 'decrypt' 子命令解析器
parser.add_argument("-k", "--key", type=str, help="密钥", required=True, metavar="")
parser.add_argument("-i", "--db_path", type=str, help="数据库路径(目录or文件)", required=True, metavar="")
parser.add_argument("-o", "--out_path", type=str, default=os.path.join(os.getcwd(), "decrypted"),
help="输出路径(必须是目录)[默认为当前路径下decrypted文件夹]", required=False,
metavar="")
return parser
def run(self, args):
print(f"[*] PyWxDump v{pywxdump.__version__}")
# 从命令行参数获取值
key = args.key
db_path = args.db_path
out_path = args.out_path
if not os.path.exists(db_path):
2023-12-03 09:43:33 +08:00
print(f"[-] 数据库路径不存在:{db_path}")
return
if not os.path.exists(out_path):
os.makedirs(out_path)
print(f"[+] 创建输出文件夹:{out_path}")
# 调用 decrypt 函数,并传入参数
result = batch_decrypt(key, db_path, out_path, True)
return result
class MainMerge(BaseSubMainClass):
mode = "merge"
parser_kwargs = {"help": "[测试功能]合并微信数据库(MSG.db or MediaMSG.db)"}
def init_parses(self, parser):
# 添加 'merge' 子命令解析器
parser.add_argument("-i", "--db_path", type=str, help="数据库路径(文件路径,使用英文[,]分割)", required=True,
metavar="")
parser.add_argument("-o", "--out_path", type=str, default=os.path.join(os.getcwd(), "decrypted"),
help="输出路径(目录或文件名)[默认为当前路径下decrypted文件夹下merge_***.db]",
required=False,
metavar="")
return parser
def run(self, args):
print(f"[*] PyWxDump v{pywxdump.__version__}")
# 从命令行参数获取值
db_path = args.db_path
out_path = args.out_path
db_path = db_path.split(",")
2023-12-11 18:59:16 +08:00
db_path = [i.strip() for i in db_path]
dbpaths = []
for i in db_path:
2023-12-11 18:59:16 +08:00
if not os.path.exists(i): # 判断路径是否存在
print(f"[-] 数据库路径不存在:{i}")
return
2023-12-11 18:59:16 +08:00
if os.path.isdir(i): # 如果是文件夹则获取文件夹下所有的db文件
dbpaths += [os.path.join(i, j) for j in os.listdir(i) if j.endswith(".db")]
else: # 如果是文件,则直接添加
dbpaths.append(i)
if (not out_path.endswith(".db")) and (not os.path.exists(out_path)):
os.makedirs(out_path)
print(f"[+] 创建输出文件夹:{out_path}")
print(f"[*] 合并中...(用时较久,耐心等待)")
2024-08-04 21:40:02 +08:00
dbpaths = [{"db_path": i} for i in dbpaths if os.path.exists(i)] # 去除不存在的路径
2023-12-11 18:59:16 +08:00
result = merge_db(dbpaths, out_path)
2023-12-11 18:56:04 +08:00
print(f"[+] 合并完成:{result}")
return result
class MainShowChatRecords(BaseSubMainClass):
mode = "dbshow"
parser_kwargs = {"help": "聊天记录查看"}
def init_parses(self, parser):
# 添加 'dbshow' 子命令解析器
2024-05-20 20:55:01 +08:00
parser.add_argument("-merge", "--merge_path", type=str, help="解密并合并后的 merge_all.db 的路径",
required=False, metavar="")
parser.add_argument("-wid", "--wx_path", type=str,
help="(可选)微信文件夹的路径(用于显示图片)", required=False,
metavar="")
parser.add_argument("-myid", "--my_wxid", type=str, help="(可选)微信账号(本人微信id)", required=False,
2024-05-20 20:55:01 +08:00
default="", metavar="")
parser.add_argument("--online", action='store_true', help="(可选)是否在线查看(局域网查看)", required=False,
default=False)
2024-06-17 10:32:52 +08:00
# parser.add_argument("-k", "--key", type=str, help="(可选)密钥", required=False, metavar="")
return parser
def run(self, args):
print(f"[*] PyWxDump v{pywxdump.__version__}")
2024-01-13 16:43:06 +08:00
# (merge)和(msg_path,micro_path,media_path) 二选一
# if not args.merge_path and not (args.msg_path and args.micro_path and args.media_path):
# print("[-] 请输入数据库路径([merge_path] or [msg_path, micro_path, media_path]")
# return
# 目前仅能支持merge database
if not args.merge_path:
print("[-] 请输入数据库路径([merge_path]")
return
# 从命令行参数获取值
merge_path = args.merge_path
online = args.online
2024-05-15 01:00:31 +08:00
if not os.path.exists(merge_path):
print("[-] 输入数据库路径不存在")
return
2024-08-18 20:30:17 +08:00
start_server(merge_path=merge_path, wx_path=args.wx_path, my_wxid=args.my_wxid, online=online)
2024-05-06 17:13:07 +08:00
class MainExportChatRecords(BaseSubMainClass):
mode = "export"
parser_kwargs = {"help": "[已废弃]聊天记录导出为html"}
def init_parses(self, parser):
# 添加 'export' 子命令解析器
2024-05-06 17:13:07 +08:00
return parser
def run(self, args):
print(f"[*] PyWxDump v{pywxdump.__version__}")
print("[+] export命令已废弃请使用ui命令[wxdump ui]或api命令[wxdump api]启动服务")
class MainAll(BaseSubMainClass):
mode = "all"
parser_kwargs = {"help": "[已废弃]获取微信信息,解密微信数据库,查看聊天记录"}
def init_parses(self, parser):
# 添加 'all' 子命令解析器
return parser
def run(self, args):
print(f"[*] PyWxDump v{pywxdump.__version__}")
print("[+] all命令已废弃请使用ui命令[wxdump ui]或api命令[wxdump api]启动服务")
2023-12-20 11:07:17 +08:00
class MainUi(BaseSubMainClass):
mode = "ui"
parser_kwargs = {"help": "启动UI界面"}
2024-01-14 19:48:30 +08:00
def init_parses(self, parser):
# 添加 'ui' 子命令解析器
parser.add_argument("-p", '--port', metavar="", type=int, help="(可选)端口号", default=5000)
parser.add_argument("--online", help="(可选)是否在线查看(局域网查看)", default=False, action='store_true')
parser.add_argument("--debug", help="(可选)是否开启debug模式", default=False, action='store_true')
2024-08-17 11:51:57 +08:00
parser.add_argument("--noOpenBrowser", dest='isOpenBrowser', default=True, action='store_false',
2024-05-29 18:32:59 +08:00
help="(可选)用于禁用自动打开浏览器")
return parser
2024-01-14 19:48:30 +08:00
def run(self, args):
print(f"[*] PyWxDump v{pywxdump.__version__}")
# 从命令行参数获取值
online = args.online
port = args.port
debug = args.debug
2024-05-29 18:32:59 +08:00
isopenBrowser = args.isOpenBrowser
2024-01-14 19:48:30 +08:00
2024-08-17 11:51:57 +08:00
start_server(port=port, online=online, debug=debug, isopenBrowser=isopenBrowser)
2024-01-14 19:48:30 +08:00
2024-02-07 16:35:11 +08:00
class MainApi(BaseSubMainClass):
mode = "api"
2024-04-20 18:57:47 +08:00
parser_kwargs = {"help": "启动api不打开浏览器"}
def init_parses(self, parser):
# 添加 'api' 子命令解析器
parser.add_argument("-p", '--port', metavar="", type=int, help="(可选)端口号", default=5000)
parser.add_argument("--online", help="(可选)是否在线查看(局域网查看)", default=False, action='store_true')
parser.add_argument("--debug", action='store_true', help="(可选)是否开启debug模式", default=False)
return parser
def run(self, args):
print(f"[*] PyWxDump v{pywxdump.__version__}")
# 从命令行参数获取值
online = args.online
port = args.port
debug = args.debug
2024-08-17 11:51:57 +08:00
start_server(port=port, online=online, debug=debug, isopenBrowser=False)
2024-01-14 19:48:30 +08:00
2023-10-14 23:53:06 +08:00
def console_run():
# 检查是否需要显示帮助信息
if len(sys.argv) == 1:
sys.argv.append(MainUi.mode)
elif len(sys.argv) == 2 and sys.argv[1] not in models.keys():
sys.argv.append('-h')
main_parser.print_help()
return
args = main_parser.parse_args() # 解析命令行参数
2023-10-14 23:53:06 +08:00
if not any(vars(args).values()):
main_parser.print_help()
return
# 根据不同的 'mode' 参数,执行不同的操作
models[args.mode].run(args)
2023-10-14 23:53:06 +08:00
if __name__ == '__main__':
console_run()