PyWxDump/pywxdump/api/local_server.py

291 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: local_server.py
# Description:
# Author: xaoyaoo
# Date: 2024/08/01
# -------------------------------------------------------------------------------
import base64
import json
import logging
import os
import re
import time
import shutil
import pythoncom
import pywxdump
from flask import Flask, request, render_template, g, Blueprint, send_file, make_response, session
from pywxdump import get_core_db, all_merge_real_time_db, get_wx_db
from pywxdump.api.rjson import ReJson, RqJson
from pywxdump.api.utils import get_conf, get_conf_wxids, set_conf, error9999, gen_base64, validate_title, \
get_conf_local_wxid, ls_loger, random_str
from pywxdump import get_wx_info, WX_OFFS, batch_decrypt, BiasAddr, merge_db, decrypt_merge, merge_real_time_db
from pywxdump.db import DBHandler, download_file, export_csv, export_json
# Flask blueprint that all routes in this module are registered on; its template and
# static folders point at ../ui/web and ../ui/web/assets/ respectively.
ls_api = Blueprint('ls_api', __name__, template_folder='../ui/web', static_folder='../ui/web/assets/', )
# Sets a `debug` attribute on the blueprint object to False.
ls_api.debug = False
# Initialization-related endpoints follow *******************************************************************************************************
@ls_api.route('/api/ls/init_last_local_wxid', methods=["GET", 'POST'])
@error9999
def init_last_local_wxid():
    """
    List the locally configured wxids.

    Takes the entries returned by ``get_conf_local_wxid(g.caf)`` and excludes the ``g.at``
    entry (presumably the settings section rather than a real wxid - confirm against
    ``get_conf_local_wxid``).

    :return: ReJson with code 0 and ``{"local_wxids": [...]}``; the list is empty when nothing is left.
    """
    stored = get_conf_local_wxid(g.caf) or []
    # Filter rather than list.remove(): remove() raises ValueError when g.at is not present,
    # and filtering also leaves the list returned by the helper unmodified.
    wxids = [wxid for wxid in stored if wxid != g.at]
    return ReJson(0, {"local_wxids": wxids})
@ls_api.route('/api/ls/init_last', methods=["GET", 'POST'])
@error9999
def init_last():
    """
    Report whether a wxid is already initialised.

    Uses ``my_wxid`` from the request JSON when it is a non-empty string (after stripping
    whitespace and surrounding quotes); otherwise falls back to the value stored under
    ``auto_setting`` / ``last``. A resolved wxid is written back as the new ``last`` value.

    :return: ReJson 0 with merge_path, wx_path, key, my_wxid and ``is_init: True`` when both
             merge_path and wx_path are configured for the wxid; otherwise ReJson 0 with
             ``{"is_init": False, "my_wxid": ""}``.
    """
    requested = request.json.get("my_wxid", "")
    if isinstance(requested, str):
        my_wxid = requested.strip().strip("'").strip('"')
    else:
        my_wxid = ""

    # Nothing usable in the request: fall back to the remembered wxid
    my_wxid = my_wxid or get_conf(g.caf, "auto_setting", "last")
    if not my_wxid:
        return ReJson(0, {"is_init": False, "my_wxid": ""})

    set_conf(g.caf, "auto_setting", "last", my_wxid)
    merge_path = get_conf(g.caf, my_wxid, "merge_path")
    wx_path = get_conf(g.caf, my_wxid, "wx_path")
    key = get_conf(g.caf, my_wxid, "key")

    if not (merge_path and wx_path):
        return ReJson(0, {"is_init": False, "my_wxid": ""})

    return ReJson(0, {
        "merge_path": merge_path,
        "wx_path": wx_path,
        "key": key,
        "my_wxid": my_wxid,
        "is_init": True,
    })
@ls_api.route('/api/ls/init_key', methods=["GET", 'POST'])
@error9999
def init_key():
    """
    Initialise a wxid from its WeChat data path and database key.

    Calls ``decrypt_merge`` on ``wx_path`` with ``key``, writing into
    ``<g.work_path>/decrypted/<my_wxid>``, moves the merged database to
    ``<g.work_path>/<my_wxid>/merge_all.db``, removes the temporary output folder and stores
    the resulting settings (db_config, merge_path, wx_path, key, my_wxid, last) in the config.

    Request JSON fields: ``wx_path``, ``key``, ``my_wxid`` (all required).

    :return: ReJson 0 with merge_path, wx_path, key, my_wxid and ``is_init: True`` on success;
             1002 when a field is missing or ``my_wxid`` is not a plain directory name;
             1001 when ``wx_path`` does not exist;
             2001 when the output folder cannot be cleared or ``decrypt_merge`` reports failure.
    """

    def _clean(name):
        # Same normalisation as init_last: strip whitespace/quotes, treat non-strings as missing
        value = request.json.get(name, "")
        return value.strip().strip("'").strip('"') if isinstance(value, str) else ""

    wx_path = _clean("wx_path")
    key = _clean("key")
    my_wxid = _clean("my_wxid")

    if not wx_path:
        return ReJson(1002, body=f"wx_path is required: {wx_path}")
    if not os.path.exists(wx_path):
        return ReJson(1001, body=f"wx_path not exists: {wx_path}")
    if not key:
        return ReJson(1002, body=f"key is required: {key}")
    if not my_wxid:
        return ReJson(1002, body=f"my_wxid is required: {my_wxid}")
    # my_wxid comes from the request and becomes a directory name that is handed to
    # shutil.rmtree below, so it must not be able to point outside g.work_path.
    if my_wxid in (".", "..") or any(ch in my_wxid for ch in ("/", "\\", ":")):
        return ReJson(1002, body=f"my_wxid is invalid: {my_wxid}")

    out_path = os.path.join(g.work_path, "decrypted", my_wxid)
    # Clear any previous output; a PermissionError here means files in the folder are in use
    if os.path.exists(out_path):
        try:
            shutil.rmtree(out_path)
        except PermissionError as e:
            # Log with the stack trace
            ls_loger.error(f"{e}", exc_info=True)
            return ReJson(2001, body=str(e))

    code, merge_save_path = decrypt_merge(wx_path=wx_path, key=key, outpath=str(out_path))
    # NOTE(review): fixed pause kept from the original; presumably lets file handles settle
    # before the merged database is moved - confirm whether it is still needed.
    time.sleep(1)
    if not code:
        # On failure the second element carries the error information
        return ReJson(2001, body=merge_save_path)

    # Move the merged database to <g.work_path>/<my_wxid>/merge_all.db
    wxid_dir = os.path.join(g.work_path, my_wxid)
    os.makedirs(wxid_dir, exist_ok=True)
    merge_save_path_new = os.path.join(wxid_dir, "merge_all.db")
    shutil.move(merge_save_path, str(merge_save_path_new))

    # Remove the temporary output folder; failure here is logged but not fatal
    if os.path.exists(out_path):
        try:
            shutil.rmtree(out_path)
        except PermissionError as e:
            # Log with the stack trace
            ls_loger.error(f"{e}", exc_info=True)

    db_config = {
        "key": random_str(16),
        "type": "sqlite",
        "path": merge_save_path_new
    }
    set_conf(g.caf, my_wxid, "db_config", db_config)
    set_conf(g.caf, my_wxid, "merge_path", merge_save_path_new)
    set_conf(g.caf, my_wxid, "wx_path", wx_path)
    set_conf(g.caf, my_wxid, "key", key)
    set_conf(g.caf, my_wxid, "my_wxid", my_wxid)
    set_conf(g.caf, "auto_setting", "last", my_wxid)
    rdata = {
        "merge_path": merge_save_path_new,
        "wx_path": wx_path,
        "key": key,
        "my_wxid": my_wxid,
        "is_init": True,
    }
    return ReJson(0, rdata)
@ls_api.route('/api/ls/init_nokey', methods=["GET", 'POST'])
@error9999
def init_nokey():
    """
    Initialise a wxid from an already merged database, without supplying a key.

    Stores ``merge_path`` and ``wx_path`` for ``my_wxid`` in the config (together with a fresh
    db_config and whatever key is already stored for that wxid) and records the wxid as the
    last used one.

    Request JSON fields: ``merge_path``, ``wx_path``, ``my_wxid`` (all required).

    :return: ReJson 0 with merge_path, wx_path, an empty key, my_wxid and ``is_init: True``;
             1002 when a field is missing; 1001 when ``wx_path`` does not exist.
    """

    def _clean(name):
        # Same normalisation as init_last: strip whitespace/quotes, treat non-strings as missing
        value = request.json.get(name, "")
        return value.strip().strip("'").strip('"') if isinstance(value, str) else ""

    merge_path = _clean("merge_path")
    wx_path = _clean("wx_path")
    my_wxid = _clean("my_wxid")

    if not wx_path:
        return ReJson(1002, body=f"wx_path is required: {wx_path}")
    if not os.path.exists(wx_path):
        return ReJson(1001, body=f"wx_path not exists: {wx_path}")
    if not merge_path:
        return ReJson(1002, body=f"merge_path is required: {merge_path}")
    if not my_wxid:
        return ReJson(1002, body=f"my_wxid is required: {my_wxid}")

    # No key is supplied by the caller; keep whatever is already stored for this wxid
    key = get_conf(g.caf, my_wxid, "key")
    db_config = {
        "key": random_str(16),
        "type": "sqlite",
        "path": merge_path
    }
    set_conf(g.caf, my_wxid, "db_config", db_config)
    set_conf(g.caf, my_wxid, "merge_path", merge_path)
    set_conf(g.caf, my_wxid, "wx_path", wx_path)
    set_conf(g.caf, my_wxid, "key", key)
    set_conf(g.caf, my_wxid, "my_wxid", my_wxid)
    # Must be the "auto_setting" section: that is where init_last reads "last" from and
    # where init_key writes it. The previous "test" section was never read back.
    set_conf(g.caf, "auto_setting", "last", my_wxid)
    rdata = {
        "merge_path": merge_path,
        "wx_path": wx_path,
        "key": "",
        "my_wxid": my_wxid,
        "is_init": True,
    }
    return ReJson(0, rdata)
# END of initialization-related endpoints ***************************************************************************************************
@ls_api.route('/api/ls/realtimemsg', methods=["GET", "POST"])
@error9999
def get_real_time_msg():
    """
    Fetch real-time messages by merging the live databases via ``all_merge_real_time_db``.

    Works on the wxid stored as ``last`` in the ``g.at`` config section and on the
    merge_path, key and wx_path configured for that wxid.

    :return: ReJson 0 with the merge result on success; 1001 when no wxid is stored;
             1002 when merge_path, key or wx_path is not configured;
             2001 with the reported error when the merge fails.
    """
    my_wxid = get_conf(g.caf, g.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")

    merge_path = get_conf(g.caf, my_wxid, "merge_path")
    key = get_conf(g.caf, my_wxid, "key")
    wx_path = get_conf(g.caf, my_wxid, "wx_path")
    # The message names exactly the three values that are checked here
    if not (merge_path and key and wx_path):
        return ReJson(1002, body="merge_path or wx_path or key is required")

    code, ret = all_merge_real_time_db(key=key, wx_path=wx_path, merge_path=merge_path)
    if code:
        return ReJson(0, ret)
    return ReJson(2001, body=ret)
# START: APIs for the professional tools *********************************************************************************************
@ls_api.route('/api/ls/wxinfo', methods=["GET", 'POST'])
@error9999
def get_wxinfo():
    """
    Return WeChat information obtained from ``get_wx_info(WX_OFFS)``.

    COM is initialised for the duration of the call and always uninitialised afterwards.

    :return: ReJson 0 with the value returned by ``get_wx_info``.
    """
    pythoncom.CoInitialize()
    try:
        wxinfos = get_wx_info(WX_OFFS)
    finally:
        # Balance CoInitialize even when get_wx_info raises
        pythoncom.CoUninitialize()
    return ReJson(0, wxinfos)
@ls_api.route('/api/ls/biasaddr', methods=["GET", 'POST'])
@error9999
def biasaddr():
    """
    Run ``BiasAddr`` for the given account details.

    Request JSON fields: ``mobile``, ``name``, ``account`` (required); ``key`` and
    ``wxdbPath`` (optional, default to an empty string).

    :return: ReJson 0 with the string form of ``BiasAddr(...).run()``; 1002 when mobile,
             name or account is missing.
    """
    mobile = request.json.get("mobile")
    name = request.json.get("name")
    account = request.json.get("account")
    key = request.json.get("key", "")
    wxdbPath = request.json.get("wxdbPath", "")
    if not mobile or not name or not account:
        return ReJson(1002)

    pythoncom.CoInitialize()
    try:
        rdata = BiasAddr(account, mobile, name, key, wxdbPath).run()
    finally:
        # Balance CoInitialize, matching get_wxinfo; previously COM was never uninitialised here
        pythoncom.CoUninitialize()
    return ReJson(0, str(rdata))
@ls_api.route('/api/ls/decrypt', methods=["GET", 'POST'])
@error9999
def decrypt():
    """
    Decrypt databases with ``batch_decrypt``.

    Request JSON fields: ``key`` and ``wxdbPath`` (required); ``outPath`` (optional,
    ``g.tmp_path`` is used when it is missing or empty).

    :return: ReJson 0 with the string form of the ``batch_decrypt`` result; 1002 when
             key or wxdbPath is missing.
    """
    params = request.json
    key = params.get("key")
    wxdb_path = params.get("wxdbPath")
    if not (key and wxdb_path):
        return ReJson(1002)

    # Fall back to the temp directory when no output path was supplied
    out_path = params.get("outPath") or g.tmp_path
    result = batch_decrypt(key, wxdb_path, out_path=out_path)
    return ReJson(0, str(result))
@ls_api.route('/api/ls/merge', methods=["GET", 'POST'])
@error9999
def merge():
    """
    Merge databases with ``merge_db``.

    Request JSON fields: ``dbPath`` (passed to ``get_wx_db`` to obtain the databases) and
    ``outPath`` (passed to ``merge_db`` as the output); both are required.

    :return: ReJson 0 with the string form of the ``merge_db`` result; 1002 when dbPath
             or outPath is missing.
    """
    params = request.json
    source_dir = params.get("dbPath")
    target = params.get("outPath")
    if not source_dir or not target:
        return ReJson(1002)

    databases = get_wx_db(source_dir)
    merged = merge_db(databases, target)
    return ReJson(0, str(merged))
# END: APIs for the professional tools ***********************************************************************************************