增加导出加密数据库的方式

This commit is contained in:
xaoyaoo 2024-01-20 21:00:17 +08:00
parent 7dd740d45f
commit 10767f2abe
4 changed files with 113 additions and 65 deletions

View File

@ -6,7 +6,7 @@
# Date: 2023/10/14
# -------------------------------------------------------------------------------
from .wx_info import BiasAddr, read_info, get_wechat_db, encrypt, batch_decrypt, decrypt
from .wx_info import merge_copy_db, merge_msg_db, merge_media_msg_db, merge_db
from .wx_info import merge_copy_db, merge_msg_db, merge_media_msg_db, merge_db, decrypt_merge
from .analyzer.db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, \
parse_xml_string, read_BytesExtra
from .analyzer import export_csv

View File

@ -9,12 +9,13 @@ import base64
import logging
import os
import time
import shutil
from flask import Flask, request, render_template, g, Blueprint, send_file, make_response, session
from pywxdump import analyzer, read_img_dat, read_audio, get_wechat_db
from pywxdump.api.rjson import ReJson, RqJson
from pywxdump.api.utils import read_session, save_session
from pywxdump import read_info, VERSION_LIST, batch_decrypt, BiasAddr, merge_db
from pywxdump import read_info, VERSION_LIST, batch_decrypt, BiasAddr, merge_db, decrypt_merge
import pywxdump
# app = Flask(__name__, static_folder='../ui/web/dist', static_url_path='/')
@ -49,66 +50,29 @@ def init():
save_micro_path) and save_my_wxid == my_wxid:
return ReJson(0, {"msg_path": save_msg_path, "micro_path": save_micro_path, "is_init": True})
# 解密
WxDbPath = get_wechat_db('all', None, wxid=my_wxid, is_logging=False) # 获取微信数据库路径
if isinstance(WxDbPath, str): # 如果返回的是字符串,则表示出错
print(WxDbPath)
return ReJson(4007)
wxdbpaths = [path for user_dir in WxDbPath.values() for paths in user_dir.values() for path in paths]
if len(wxdbpaths) == 0:
print("[-] 未获取到数据库路径")
return ReJson(4007)
wxdbpaths = [i for i in wxdbpaths if "MicroMsg" in i or "MediaMSG" in i or r"Multi\MSG" in i] # 过滤掉无需解密的数据库
decrypted_path = os.path.join(g.tmp_path, "decrypted")
# 判断out_path是否为空目录
if os.path.exists(decrypted_path) and os.listdir(decrypted_path):
isdel = "y"
if isdel.lower() == 'y' or isdel.lower() == 'yes':
for root, dirs, files in os.walk(decrypted_path, topdown=False):
for name in files:
os.remove(os.path.join(root, name))
for name in dirs:
os.rmdir(os.path.join(root, name))
out_path = os.path.join(decrypted_path, my_wxid) if my_wxid else decrypted_path
if not os.path.exists(out_path):
os.makedirs(out_path)
# 调用 decrypt 函数,并传入参数 # 解密
code, ret = batch_decrypt(key, wxdbpaths, out_path, False)
if not code:
return ReJson(4007, msg=ret)
out_dbs = []
for code1, ret1 in ret:
if code1:
out_dbs.append(ret1[1])
parpare_merge_db_path = [i for i in out_dbs if "de_MicroMsg" in i or "de_MediaMSG" in i or "de_MSG" in i]
# 合并所有的数据库
logging.info("开始合并数据库")
merge_save_path = merge_db(parpare_merge_db_path, os.path.join(out_path, "merge_all.db"))
out_path = os.path.join(g.tmp_path, "decrypted", my_wxid) if my_wxid else os.path.join(g.tmp_path,
"decrypted")
code, merge_save_path = decrypt_merge(wx_path=wx_path, key=key, outpath=out_path)
time.sleep(1)
save_session(g.sf, "msg_path", merge_save_path)
save_session(g.sf, "micro_path", merge_save_path)
save_session(g.sf, "media_path", merge_save_path)
save_session(g.sf, "wx_path", wx_path)
save_session(g.sf, "key", key)
save_session(g.sf, "my_wxid", my_wxid)
rdata = {
"msg_path": merge_save_path,
"micro_path": merge_save_path,
"media_path": merge_save_path,
"wx_path": wx_path,
"key": key,
"my_wxid": my_wxid,
"is_init": True,
}
return ReJson(0, rdata)
if code:
save_session(g.sf, "msg_path", merge_save_path)
save_session(g.sf, "micro_path", merge_save_path)
save_session(g.sf, "media_path", merge_save_path)
save_session(g.sf, "wx_path", wx_path)
save_session(g.sf, "key", key)
save_session(g.sf, "my_wxid", my_wxid)
rdata = {
"msg_path": merge_save_path,
"micro_path": merge_save_path,
"media_path": merge_save_path,
"wx_path": wx_path,
"key": key,
"my_wxid": my_wxid,
"is_init": True,
}
return ReJson(0, rdata)
else:
return ReJson(2001, body=merge_save_path)
else:
if not msg_path or not micro_path or not media_path or not wx_path or not my_wxid:
@ -327,7 +291,7 @@ def export():
wx_path = request.json.get("wx_path", read_session(g.sf, "wx_path"))
key = request.json.get("key", read_session(g.sf, "key"))
if not export_type or not start_time or not end_time or not chat_type or not username:
if not export_type:
return ReJson(1002)
chat_type_tups = []
for t in chat_type:
@ -343,7 +307,32 @@ def export():
os.makedirs(outpath)
if export_type == "endb":
pass
outpath = os.path.join(outpath, export_type)
if not os.path.exists(outpath):
os.makedirs(outpath)
# 获取微信文件夹路径
if not wx_path:
return ReJson(1002)
if not os.path.exists(wx_path):
return ReJson(1001, body=wx_path)
# 分割wx_path的文件名和父目录
msg_dir = os.path.dirname(wx_path)
my_wxid = os.path.basename(wx_path)
WxDbPath = get_wechat_db('all', msg_dir, wxid=my_wxid, is_logging=False) # 获取微信数据库路径
if isinstance(WxDbPath, str): # 如果返回的是字符串,则表示出错
return False, WxDbPath
wxdbpaths = [path for user_dir in WxDbPath.values() for paths in user_dir.values() for path in paths]
if len(wxdbpaths) == 0:
return False, "未获取到数据库路径"
wxdbpaths = [i for i in wxdbpaths if "MicroMsg" in i or "MediaMSG" in i or r"Multi\MSG" in i] # 过滤掉无需解密的数据库
for wxdb in wxdbpaths:
# 复制wxdb->dboutpath
dboutpath = os.path.join(outpath, os.path.basename(wxdb))
shutil.copy(wxdb, dboutpath)
return ReJson(0, body=outpath)
elif export_type == "dedb":
pass
elif export_type == "csv":

View File

@ -8,4 +8,4 @@
from .get_wx_info import read_info, get_wechat_db
from .get_bias_addr import BiasAddr
from .decryption import batch_decrypt, encrypt, decrypt
from .merge_db import merge_msg_db, merge_copy_db, merge_media_msg_db, merge_db
from .merge_db import merge_msg_db, merge_copy_db, merge_media_msg_db, merge_db, decrypt_merge

View File

@ -199,7 +199,7 @@ def execute_sql(connection, sql, params=None):
return None
def merge_db(db_paths, save_path="merge.db", CreateTime: int = 0):
def merge_db(db_paths, save_path="merge.db", CreateTime: int = 0, endCreateTime: int = 0):
"""
合并数据库 会忽略主键以及重复的行
:param db_paths:
@ -277,3 +277,62 @@ def merge_db(db_paths, save_path="merge.db", CreateTime: int = 0):
outdb.commit()
outdb.close()
return save_path
def decrypt_merge(wx_path, key, outpath="", CreateTime: int = 0) -> (bool, str):
"""
解密合并数据库 msg.db, microMsg.db, media.db
:param wx_path: 微信路径 eg: C:\*******\WeChat Files\wxid_*********
:param key: 解密密钥
:return: (true,解密后的数据库路径) or (false,错误信息)
"""
from .decryption import batch_decrypt
from .get_wx_info import get_wechat_db
outpath = outpath if outpath else "decrypt_merge_tmp"
merge_save_path = os.path.join(outpath, "merge_all.db")
decrypted_path = os.path.join(outpath, "decrypted")
if not wx_path or not key:
return True, "参数错误"
# 分割wx_path的文件名和父目录
msg_dir = os.path.dirname(wx_path)
my_wxid = os.path.basename(wx_path)
# 解密
WxDbPath = get_wechat_db('all', msg_dir, wxid=my_wxid, is_logging=False) # 获取微信数据库路径
if isinstance(WxDbPath, str): # 如果返回的是字符串,则表示出错
return False, WxDbPath
wxdbpaths = [path for user_dir in WxDbPath.values() for paths in user_dir.values() for path in paths]
if len(wxdbpaths) == 0:
return False, "未获取到数据库路径"
wxdbpaths = [i for i in wxdbpaths if "MicroMsg" in i or "MediaMSG" in i or r"Multi\MSG" in i] # 过滤掉无需解密的数据库
# 判断out_path是否为空目录
if os.path.exists(decrypted_path) and os.listdir(decrypted_path):
for root, dirs, files in os.walk(decrypted_path, topdown=False):
for name in files:
os.remove(os.path.join(root, name))
for name in dirs:
os.rmdir(os.path.join(root, name))
if not os.path.exists(decrypted_path):
os.makedirs(decrypted_path)
# 调用 decrypt 函数,并传入参数 # 解密
code, ret = batch_decrypt(key, wxdbpaths, decrypted_path, False)
if not code:
return False, ret
out_dbs = []
for code1, ret1 in ret:
if code1:
out_dbs.append(ret1[1])
parpare_merge_db_path = [i for i in out_dbs if "de_MicroMsg" in i or "de_MediaMSG" in i or "de_MSG" in i]
merge_save_path = merge_db(parpare_merge_db_path, merge_save_path, CreateTime=CreateTime)
return True, merge_save_path