From 226baa5d130d77e03c0954e3dd16b685a5859d64 Mon Sep 17 00:00:00 2001 From: xaoyaoo Date: Sat, 20 Jan 2024 22:03:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AF=BC=E5=87=BA=E8=A7=A3?= =?UTF-8?q?=E5=AF=86=E6=95=B0=E6=8D=AE=E5=BA=93=E3=80=81csv=E6=96=B9?= =?UTF-8?q?=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pywxdump/__init__.py | 2 +- pywxdump/api/api.py | 79 ++++++++++++++++++--------------- pywxdump/wx_info/__init__.py | 2 +- pywxdump/wx_info/get_wx_info.py | 35 +++++++++++++++ pywxdump/wx_info/merge_db.py | 18 +++----- 5 files changed, 86 insertions(+), 50 deletions(-) diff --git a/pywxdump/__init__.py b/pywxdump/__init__.py index d592f18..262a74e 100644 --- a/pywxdump/__init__.py +++ b/pywxdump/__init__.py @@ -5,7 +5,7 @@ # Author: xaoyaoo # Date: 2023/10/14 # ------------------------------------------------------------------------------- -from .wx_info import BiasAddr, read_info, get_wechat_db, encrypt, batch_decrypt, decrypt +from .wx_info import BiasAddr, read_info, get_wechat_db, encrypt, batch_decrypt, decrypt, get_core_db from .wx_info import merge_copy_db, merge_msg_db, merge_media_msg_db, merge_db, decrypt_merge from .analyzer.db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, \ parse_xml_string, read_BytesExtra diff --git a/pywxdump/api/api.py b/pywxdump/api/api.py index 913bd36..7c8392e 100644 --- a/pywxdump/api/api.py +++ b/pywxdump/api/api.py @@ -12,7 +12,7 @@ import time import shutil from flask import Flask, request, render_template, g, Blueprint, send_file, make_response, session -from pywxdump import analyzer, read_img_dat, read_audio, get_wechat_db +from pywxdump import analyzer, read_img_dat, read_audio, get_wechat_db, get_core_db from pywxdump.api.rjson import ReJson, RqJson from pywxdump.api.utils import read_session, save_session from pywxdump import read_info, VERSION_LIST, batch_decrypt, BiasAddr, merge_db, decrypt_merge @@ -282,34 +282,22 @@ def export(): :return: """ export_type = request.json.get("export_type") - start_time = request.json.get("start_time") - end_time = request.json.get("end_time") + start_time = request.json.get("start_time", 0) + end_time = request.json.get("end_time", 0) chat_type = request.json.get("chat_type") username = request.json.get("username") - - # 可选参数 wx_path = request.json.get("wx_path", read_session(g.sf, "wx_path")) key = request.json.get("key", read_session(g.sf, "key")) - if not export_type: - return ReJson(1002) - chat_type_tups = [] - for t in chat_type: - tup = analyzer.get_name_typeid(t) - if tup: - chat_type_tups += tup - if not chat_type_tups: + if not export_type or not isinstance(export_type, str): return ReJson(1002) # 导出路径 - outpath = os.path.join(g.tmp_path, "export") + outpath = os.path.join(g.tmp_path, "export", export_type) if not os.path.exists(outpath): os.makedirs(outpath) - if export_type == "endb": - outpath = os.path.join(outpath, export_type) - if not os.path.exists(outpath): - os.makedirs(outpath) + if export_type == "endb": # 导出加密数据库 # 获取微信文件夹路径 if not wx_path: return ReJson(1002) @@ -317,36 +305,55 @@ def export(): return ReJson(1001, body=wx_path) # 分割wx_path的文件名和父目录 - msg_dir = os.path.dirname(wx_path) - my_wxid = os.path.basename(wx_path) - WxDbPath = get_wechat_db('all', msg_dir, wxid=my_wxid, is_logging=False) # 获取微信数据库路径 - if isinstance(WxDbPath, str): # 如果返回的是字符串,则表示出错 - return False, WxDbPath - wxdbpaths = [path for user_dir in WxDbPath.values() for paths in user_dir.values() for path in paths] - if len(wxdbpaths) == 0: - return False, "未获取到数据库路径" - wxdbpaths = [i for i in wxdbpaths if "MicroMsg" in i or "MediaMSG" in i or r"Multi\MSG" in i] # 过滤掉无需解密的数据库 + code, wxdbpaths = get_core_db(wx_path) + if not code: + return ReJson(2001, body=wxdbpaths) for wxdb in wxdbpaths: - # 复制wxdb->dboutpath - dboutpath = os.path.join(outpath, os.path.basename(wxdb)) - shutil.copy(wxdb, dboutpath) + # 复制wxdb->outpath, os.path.basename(wxdb) + shutil.copy(wxdb, os.path.join(outpath, os.path.basename(wxdb))) return ReJson(0, body=outpath) elif export_type == "dedb": - pass + if isinstance(start_time, int) and isinstance(end_time, int): + msg_path = read_session(g.sf, "msg_path") + micro_path = read_session(g.sf, "micro_path") + media_path = read_session(g.sf, "media_path") + dbpaths = [msg_path, msg_path, micro_path] + dbpaths = list(set(dbpaths)) + mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time) + return ReJson(0, body=mergepath) + # if msg_path == media_path and msg_path == media_path: + # shutil.copy(msg_path, os.path.join(outpath, "merge.db")) + # return ReJson(0, body=msg_path) + # else: + # dbpaths = [msg_path, msg_path, micro_path] + # dbpaths = list(set(dbpaths)) + # mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time) + # return ReJson(0, body=mergepath) + else: + return ReJson(1002, body={"start_time": start_time, "end_time": end_time}) + elif export_type == "csv": - # 导出聊天记录 - outpath = os.path.join(outpath, "csv") if not os.path.exists(outpath): os.makedirs(outpath) - code, ret = analyzer.export_csv(username, outpath, read_session(g.sf, "msg_path")) + code, ret = analyzer.export_csv(username, os.path.join(outpath, username), read_session(g.sf, "msg_path")) if code: return ReJson(0, ret) + else: + return ReJson(2001, body=ret) elif export_type == "json": pass elif export_type == "html": - pass + + chat_type_tups = [] + for ct in chat_type: + tup = analyzer.get_name_typeid(ct) + if tup: + chat_type_tups += tup + if not chat_type_tups: + return ReJson(1002) + elif export_type == "pdf": pass elif export_type == "docx": @@ -354,7 +361,7 @@ def export(): else: return ReJson(1002) - return ReJson(0, "") + return ReJson(9999, "") # 这部分为专业工具的api diff --git a/pywxdump/wx_info/__init__.py b/pywxdump/wx_info/__init__.py index d2ba147..9f21b37 100644 --- a/pywxdump/wx_info/__init__.py +++ b/pywxdump/wx_info/__init__.py @@ -5,7 +5,7 @@ # Author: xaoyaoo # Date: 2023/08/21 # ------------------------------------------------------------------------------- -from .get_wx_info import read_info, get_wechat_db +from .get_wx_info import read_info, get_wechat_db, get_core_db from .get_bias_addr import BiasAddr from .decryption import batch_decrypt, encrypt, decrypt from .merge_db import merge_msg_db, merge_copy_db, merge_media_msg_db, merge_db, decrypt_merge diff --git a/pywxdump/wx_info/get_wx_info.py b/pywxdump/wx_info/get_wx_info.py index 6f88f4f..5a34d4b 100644 --- a/pywxdump/wx_info/get_wx_info.py +++ b/pywxdump/wx_info/get_wx_info.py @@ -293,6 +293,41 @@ def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = No return user_dirs +def get_core_db(wx_path: str, db_type: list = None) -> [str]: + """ + 获取聊天消息核心数据库路径 + :param wx_path: 微信文件夹路径 eg:C:\*****\WeChat Files\wxid******* + :param db_type: 数据库类型 eg: ["MSG", "MediaMSG", "MicroMsg"],三个中选择一个或多个 + :return: 返回数据库路径 eg:["",""] + """ + if not os.path.exists(wx_path): + return False, f"[-] 目录不存在: {wx_path}" + db_type_all = ["MSG", "MediaMSG", "MicroMsg"] + + db_type = [dt for dt in db_type if dt in db_type_all] + if not db_type: + db_type = db_type_all + + msg_dir = os.path.dirname(wx_path) + my_wxid = os.path.basename(wx_path) + WxDbPath = get_wechat_db('all', msg_dir, wxid=my_wxid, is_logging=False) # 获取微信数据库路径 + if isinstance(WxDbPath, str): # 如果返回的是字符串,则表示出错 + return False, WxDbPath + wxdbpaths = [path for user_dir in WxDbPath.values() for paths in user_dir.values() for path in paths] + if len(wxdbpaths) == 0: + return False, "未获取到数据库路径" + rdbpaths = [] + for dbp in wxdbpaths: + if "MicroMsg" in db_type and "MicroMsg" in dbp: + rdbpaths.append(dbp) + if "MediaMSG" in db_type and "MediaMSG" in dbp: + rdbpaths.append(dbp) + if "MSG" in db_type and "Multi\MSG" in dbp: + rdbpaths.append(dbp) + + return True, rdbpaths + + if __name__ == '__main__': from pywxdump import VERSION_LIST diff --git a/pywxdump/wx_info/merge_db.py b/pywxdump/wx_info/merge_db.py index df4a68a..58c15db 100644 --- a/pywxdump/wx_info/merge_db.py +++ b/pywxdump/wx_info/merge_db.py @@ -279,7 +279,7 @@ def merge_db(db_paths, save_path="merge.db", CreateTime: int = 0, endCreateTime: return save_path -def decrypt_merge(wx_path, key, outpath="", CreateTime: int = 0) -> (bool, str): +def decrypt_merge(wx_path, key, outpath="", CreateTime: int = 0, endCreateTime: int = 0) -> (bool, str): """ 解密合并数据库 msg.db, microMsg.db, media.db :param wx_path: 微信路径 eg: C:\*******\WeChat Files\wxid_********* @@ -287,28 +287,21 @@ def decrypt_merge(wx_path, key, outpath="", CreateTime: int = 0) -> (bool, str): :return: (true,解密后的数据库路径) or (false,错误信息) """ from .decryption import batch_decrypt - from .get_wx_info import get_wechat_db + from .get_wx_info import get_core_db outpath = outpath if outpath else "decrypt_merge_tmp" merge_save_path = os.path.join(outpath, "merge_all.db") decrypted_path = os.path.join(outpath, "decrypted") if not wx_path or not key: - return True, "参数错误" + return False, "参数错误" # 分割wx_path的文件名和父目录 msg_dir = os.path.dirname(wx_path) my_wxid = os.path.basename(wx_path) # 解密 - WxDbPath = get_wechat_db('all', msg_dir, wxid=my_wxid, is_logging=False) # 获取微信数据库路径 - if isinstance(WxDbPath, str): # 如果返回的是字符串,则表示出错 - return False, WxDbPath - wxdbpaths = [path for user_dir in WxDbPath.values() for paths in user_dir.values() for path in paths] - if len(wxdbpaths) == 0: - return False, "未获取到数据库路径" - - wxdbpaths = [i for i in wxdbpaths if "MicroMsg" in i or "MediaMSG" in i or r"Multi\MSG" in i] # 过滤掉无需解密的数据库 + code, wxdbpaths = get_core_db(wx_path) # 判断out_path是否为空目录 if os.path.exists(decrypted_path) and os.listdir(decrypted_path): @@ -333,6 +326,7 @@ def decrypt_merge(wx_path, key, outpath="", CreateTime: int = 0) -> (bool, str): parpare_merge_db_path = [i for i in out_dbs if "de_MicroMsg" in i or "de_MediaMSG" in i or "de_MSG" in i] - merge_save_path = merge_db(parpare_merge_db_path, merge_save_path, CreateTime=CreateTime) + merge_save_path = merge_db(parpare_merge_db_path, merge_save_path, CreateTime=CreateTime, + endCreateTime=endCreateTime) return True, merge_save_path