解密并合并 OpenIM*
This commit is contained in:
parent
e28043b41a
commit
0a6226c6a0
@ -241,7 +241,7 @@ def read_info(version_list: dict = None, is_logging: bool = False, save_path: st
|
||||
|
||||
|
||||
def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = None, wxid: Union[List[str], str] = None,
|
||||
is_logging: bool = False):
|
||||
is_logging: bool = False, is_return_list: bool = False) -> Union[str, dict, list]:
|
||||
r"""
|
||||
获取微信数据库路径
|
||||
:param require_list: 需要获取的数据库类型
|
||||
@ -288,6 +288,19 @@ def get_wechat_db(require_list: Union[List[str], str] = "all", msg_dir: str = No
|
||||
if is_logging: print(error)
|
||||
return error
|
||||
|
||||
if is_return_list: # 如果返回列表,返回值:{wxid:[db_path1,db_path2]}
|
||||
db_list = {}
|
||||
# 获取数据库路径
|
||||
for user, user_dir in user_dirs.items(): # 遍历用户目录
|
||||
db_list[user] = []
|
||||
for root, dirs, files in os.walk(user_dir):
|
||||
for file_name in files:
|
||||
for n, p in pattern.items():
|
||||
if p.match(file_name):
|
||||
src_path = os.path.join(root, file_name)
|
||||
db_list[user].append(src_path)
|
||||
return db_list
|
||||
|
||||
# 获取数据库路径
|
||||
for user, user_dir in user_dirs.items(): # 遍历用户目录
|
||||
user_dirs[user] = {n: [] for n in pattern.keys()}
|
||||
@ -319,7 +332,7 @@ def get_core_db(wx_path: str, db_type: list = None) -> [str]:
|
||||
"""
|
||||
if not os.path.exists(wx_path):
|
||||
return False, f"[-] 目录不存在: {wx_path}"
|
||||
db_type_all = ["MSG", "MediaMSG", "MicroMsg"]
|
||||
db_type_all = ["MSG", "MediaMSG", "MicroMsg", "OpenIMContact", "OpenIMMedia", "OpenIMMsg"]
|
||||
|
||||
if not db_type:
|
||||
db_type = db_type_all
|
||||
@ -328,22 +341,13 @@ def get_core_db(wx_path: str, db_type: list = None) -> [str]:
|
||||
|
||||
msg_dir = os.path.dirname(wx_path)
|
||||
my_wxid = os.path.basename(wx_path)
|
||||
WxDbPath = get_wechat_db('all', msg_dir, wxid=my_wxid, is_logging=False) # 获取微信数据库路径
|
||||
WxDbPath = get_wechat_db(db_type, msg_dir, wxid=my_wxid, is_logging=False, is_return_list=True) # 获取微信数据库路径
|
||||
if isinstance(WxDbPath, str): # 如果返回的是字符串,则表示出错
|
||||
return False, WxDbPath
|
||||
wxdbpaths = [path for user_dir in WxDbPath.values() for paths in user_dir.values() for path in paths]
|
||||
wxdbpaths = WxDbPath.get(wx_path, [])
|
||||
if len(wxdbpaths) == 0:
|
||||
return False, "未获取到数据库路径"
|
||||
rdbpaths = []
|
||||
for dbp in wxdbpaths:
|
||||
if "MicroMsg" in db_type and "MicroMsg" in dbp:
|
||||
rdbpaths.append(dbp)
|
||||
if "MediaMSG" in db_type and "MediaMSG" in dbp:
|
||||
rdbpaths.append(dbp)
|
||||
if "MSG" in db_type and "Multi\MSG" in dbp:
|
||||
rdbpaths.append(dbp)
|
||||
|
||||
return True, rdbpaths
|
||||
return True, wxdbpaths
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -304,7 +304,7 @@ def decrypt_merge(wx_path, key, outpath="", CreateTime: int = 0, endCreateTime:
|
||||
my_wxid = os.path.basename(wx_path)
|
||||
|
||||
# 解密
|
||||
code, wxdbpaths = get_core_db(wx_path, ["MSG", "MediaMSG", "MicroMsg"])
|
||||
code, wxdbpaths = get_core_db(wx_path, ["MSG", "MediaMSG", "MicroMsg", "OpenIMContact", "OpenIMMedia", "OpenIMMsg"])
|
||||
|
||||
# 判断out_path是否为空目录
|
||||
if os.path.exists(decrypted_path) and os.listdir(decrypted_path):
|
||||
@ -327,7 +327,8 @@ def decrypt_merge(wx_path, key, outpath="", CreateTime: int = 0, endCreateTime:
|
||||
if code1:
|
||||
out_dbs.append(ret1[1])
|
||||
|
||||
parpare_merge_db_path = [i for i in out_dbs if "de_MicroMsg" in i or "de_MediaMSG" in i or "de_MSG" in i]
|
||||
parpare_merge_db_path = [i for i in out_dbs if
|
||||
"de_MicroMsg" in i or "de_MediaMSG" in i or "de_MSG" in i or "de_OpenIMMsg" in i or "de_OpenIMMedia" in i or "de_OpenIMContact" in i]
|
||||
|
||||
merge_save_path = merge_db(parpare_merge_db_path, merge_save_path, CreateTime=CreateTime,
|
||||
endCreateTime=endCreateTime)
|
||||
|
Loading…
Reference in New Issue
Block a user