PyWxDump/pywxdump/wx_core/merge_db.py

480 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: merge_db.py
# Description:
# Author: xaoyaoo
# Date: 2023/12/03
# -------------------------------------------------------------------------------
import logging
import os
import shutil
import sqlite3
import subprocess
import time
from typing import List
from .decryption import batch_decrypt
from .wx_info import get_core_db
from .utils import wx_core_loger, wx_core_error
@wx_core_error
def execute_sql(connection, sql, params=None):
"""
执行给定的SQL语句返回结果。
参数:
- connection SQLite连接
- sql要执行的SQL语句
- paramsSQL语句中的参数
"""
try:
# connection.text_factory = bytes
cursor = connection.cursor()
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
return cursor.fetchall()
except Exception as e:
try:
connection.text_factory = bytes
cursor = connection.cursor()
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
rdata = cursor.fetchall()
connection.text_factory = str
return rdata
except Exception as e:
wx_core_loger.error(f"**********\nSQL: {sql}\nparams: {params}\n{e}\n**********", exc_info=True)
return None
@wx_core_error
def check_create_sync_log(connection):
"""
检查是否存在表 sync_log,用于记录同步记录,包括微信数据库路径,表名,记录数,同步时间
:param connection: SQLite连接
:return: True or False
"""
out_cursor = connection.cursor()
# 检查是否存在表 sync_log,用于记录同步记录,包括微信数据库路径,表名,记录数,同步时间
sync_log_status = execute_sql(connection, "SELECT name FROM sqlite_master WHERE type='table' AND name='sync_log'")
if len(sync_log_status) < 1:
# db_path 微信数据库路径tbl_name 表名src_count 源数据库记录数current_count 当前合并后的数据库对应表记录数
sync_record_create_sql = ("CREATE TABLE sync_log ("
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
"db_path TEXT NOT NULL,"
"tbl_name TEXT NOT NULL,"
"src_count INT,"
"current_count INT,"
"createTime INT DEFAULT (strftime('%s', 'now')), "
"updateTime INT DEFAULT (strftime('%s', 'now'))"
");")
out_cursor.execute(sync_record_create_sql)
# 创建索引
out_cursor.execute("CREATE INDEX idx_sync_log_db_path ON sync_log (db_path);")
out_cursor.execute("CREATE INDEX idx_sync_log_tbl_name ON sync_log (tbl_name);")
# 创建联合索引,防止重复
out_cursor.execute("CREATE UNIQUE INDEX idx_sync_log_db_tbl ON sync_log (db_path, tbl_name);")
connection.commit()
out_cursor.close()
return True
@wx_core_error
def check_create_file_md5(connection):
"""
检查是否存在表 file_md5,用于记录文件信息,后续用于去重等操作,暂时闲置
"""
pass
@wx_core_error
def merge_db(db_paths: List[dict], save_path: str = "merge.db", is_merge_data: bool = True,
startCreateTime: int = 0, endCreateTime: int = 0):
"""
合并数据库 会忽略主键以及重复的行。
:param db_paths: [{"db_path": "xxx", "de_path": "xxx"},...]
db_path表示初始路径de_path表示解密后的路径初始路径用于保存合并的日志情况解密后的路径用于读取数据
:param save_path: str 输出文件路径
:param is_merge_data: bool 是否合并数据(如果为False则只解密并创建表不插入数据)
:param startCreateTime: 开始时间戳 主要用于MSG数据库的合并
:param endCreateTime: 结束时间戳 主要用于MSG数据库的合并
:return:
"""
if os.path.isdir(save_path):
save_path = os.path.join(save_path, f"merge_{int(time.time())}.db")
if isinstance(db_paths, list):
# alias, file_path
databases = {f"dbi_{i}": (db['db_path'],
db.get('de_path', db['db_path'])
) for i, db in enumerate(db_paths)
}
else:
raise TypeError("db_paths 类型错误")
outdb = sqlite3.connect(save_path)
is_sync_log = check_create_sync_log(outdb)
if not is_sync_log:
wx_core_loger.warning("创建同步记录表失败")
out_cursor = outdb.cursor()
# 将MSG_db_paths中的数据合并到out_db_path中
for alias, db in databases.items():
db_path = db[0]
de_path = db[1]
# 附加数据库
sql_attach = f"ATTACH DATABASE '{de_path}' AS {alias}"
out_cursor.execute(sql_attach)
outdb.commit()
sql_query_tbl_name = f"SELECT tbl_name, sql FROM {alias}.sqlite_master WHERE type='table' ORDER BY tbl_name;"
tables = execute_sql(outdb, sql_query_tbl_name)
for table in tables:
table, init_create_sql = table[0], table[1]
table = table if isinstance(table, str) else table.decode()
init_create_sql = init_create_sql if isinstance(init_create_sql, str) else init_create_sql.decode()
if table == "sqlite_sequence":
continue
if "CREATE TABLE".lower() not in str(init_create_sql).lower():
continue
# 获取表中的字段名
sql_query_columns = f"PRAGMA table_info({table})"
columns = execute_sql(outdb, sql_query_columns)
if table == "ChatInfo" and len(columns) > 12: # bizChat中的ChatInfo表与MicroMsg中的ChatInfo表字段不同
continue
col_type = {
(i[1] if isinstance(i[1], str) else i[1].decode(),
i[2] if isinstance(i[2], str) else i[2].decode())
for i in columns}
columns = [i[0] for i in col_type]
if not columns or len(columns) < 1:
continue
# 创建表table
sql_create_tbl = f"CREATE TABLE IF NOT EXISTS {table} AS SELECT * FROM {alias}.{table} WHERE 0 = 1;"
out_cursor.execute(sql_create_tbl)
# 创建包含 NULL 值比较的 UNIQUE 索引
index_name = f"{table}_unique_index"
coalesce_columns = ','.join(f"COALESCE({column}, '')" for column in columns)
sql = f"CREATE UNIQUE INDEX IF NOT EXISTS {index_name} ON {table} ({coalesce_columns})"
out_cursor.execute(sql)
# 插入sync_log
sql_query_sync_log = f"SELECT src_count FROM sync_log WHERE db_path=? AND tbl_name=?"
sync_log = execute_sql(outdb, sql_query_sync_log, (db_path, table))
if not sync_log or len(sync_log) < 1:
sql_insert_sync_log = "INSERT INTO sync_log (db_path, tbl_name, src_count, current_count) VALUES (?, ?, ?, ?)"
out_cursor.execute(sql_insert_sync_log, (db_path, table, 0, 0))
outdb.commit()
if is_merge_data:
# 比较源数据库和合并后的数据库记录数
log_src_count = execute_sql(outdb, sql_query_sync_log, (db_path, table))[0][0]
src_count = execute_sql(outdb, f"SELECT COUNT(*) FROM {alias}.{table}")[0][0]
if src_count <= log_src_count:
wx_core_loger.info(f"忽略 {db_path} {de_path} {table} {src_count} {log_src_count}")
continue
# 构建数据查询sql
sql_base = f"SELECT {','.join([i for i in columns])} FROM {alias}.{table} "
where_clauses, params = [], []
if "CreateTime" in columns:
if startCreateTime > 0:
where_clauses.append("CreateTime > ?")
params.append(startCreateTime)
if endCreateTime > 0:
where_clauses.append("CreateTime < ?")
params.append(endCreateTime)
# 如果有WHERE子句将其添加到SQL语句中并添加ORDER BY子句
sql = f"{sql_base} WHERE {' AND '.join(where_clauses)} ORDER BY CreateTime" if where_clauses else sql_base
src_data = execute_sql(outdb, sql, tuple(params))
if not src_data or len(src_data) < 1:
continue
# 插入数据
sql = f"INSERT OR IGNORE INTO {table} ({','.join([i for i in columns])}) VALUES ({','.join(['?'] * len(columns))})"
try:
out_cursor.executemany(sql, src_data)
# update sync_log
sql_update_sync_log = ("UPDATE sync_log "
"SET src_count = ? ,"
f"current_count=(SELECT COUNT(*) FROM {table}) "
"WHERE db_path=? AND tbl_name=?")
out_cursor.execute(sql_update_sync_log, (src_count, db_path, table))
except Exception as e:
wx_core_loger.error(
f"error: {db_path}\n{de_path}\n{table}\n{sql}\n{src_data}\n{len(src_data)}\n{e}\n",
exc_info=True)
# 分离数据库
sql_detach = f"DETACH DATABASE {alias}"
out_cursor.execute(sql_detach)
outdb.commit()
out_cursor.close()
outdb.close()
return save_path
# @wx_core_error
# def merge_db1(db_paths: list[dict], save_path: str = "merge.db", is_merge_data: bool = True,
# startCreateTime: int = 0, endCreateTime: int = 0):
# """
# 合并数据库 会忽略主键以及重复的行。
# :param db_paths: [{"db_path": "xxx", "de_path": "xxx"},...]
# db_path表示初始路径de_path表示解密后的路径初始路径用于保存合并的日志情况解密后的路径用于读取数据
# :param save_path: str 输出文件路径
# :param is_merge_data: bool 是否合并数据(如果为False则只解密并创建表不插入数据)
# :param startCreateTime: 开始时间戳 主要用于MSG数据库的合并
# :param endCreateTime: 结束时间戳 主要用于MSG数据库的合并
# :return:
# """
# if os.path.isdir(save_path):
# save_path = os.path.join(save_path, f"merge_{int(time.time())}.db")
#
# if isinstance(db_paths, list):
# # alias, file_path
# databases = {f"MSG{i}": (db['db_path'],
# db.get('de_path', db['db_path'])
# ) for i, db in enumerate(db_paths)
# }
# else:
# raise TypeError("db_paths 类型错误")
#
# from sqlalchemy import create_engine, MetaData, Table, select, insert, Column, UniqueConstraint
# from sqlalchemy.orm import sessionmaker
# from sqlalchemy import inspect, PrimaryKeyConstraint
#
# outdb = create_engine(f"sqlite:///{save_path}", echo=False)
#
# # 创建Session实例
# Session = sessionmaker()
# Session.configure(bind=outdb)
# session = Session()
#
# # 将MSG_db_paths中的数据合并到out_db_path中
# for alias, db in databases.items():
# db_path = db[0]
# de_path = db[1]
#
# db_engine = create_engine(f"sqlite:///{de_path}", echo=False)
#
# # 反射源数据库的表结构
# metadata = MetaData()
# metadata.reflect(bind=db_engine)
#
# # 创建表
# outdb_metadata = MetaData()
# inspector = inspect(db_engine)
# table_names = [i for i in inspector.get_table_names() if i not in ["sqlite_sequence"]]
# for table_name in table_names:
# # 创建表table
# columns_list_dict = inspector.get_columns(table_name)
# col_names = [i['name'] for i in columns_list_dict]
# columns = [Column(i['name'], i['type'], primary_key=False) for i in columns_list_dict]
# table = Table(table_name, outdb_metadata, *columns)
# if len(columns) > 1: # 联合索引
# unique_constraint = UniqueConstraint(*col_names, name=f"{table_name}_unique_index")
# table.append_constraint(unique_constraint)
# else:
# table.append_constraint(PrimaryKeyConstraint(*col_names))
# table.create(outdb, checkfirst=True)
#
# # 将源数据库中的数据插入目标数据库
# outdb_metadata = MetaData()
# for table_name in metadata.tables:
# source_table = Table(table_name, metadata, autoload_with=db_engine)
# outdb_table = Table(table_name, outdb_metadata, autoload_with=outdb)
#
# # 查询源表中的所有数据
# query = select(source_table)
# with db_engine.connect() as connection:
# result = connection.execute(query).fetchall()
#
# # 插入到目标表中
# for row in result:
# row_data = row._asdict()
#
# # 尝试将所有文本数据转换为 UTF-8
# for key, value in row_data.items():
# if isinstance(value, str):
# row_data[key] = value.encode("utf-8")
#
# insert_stmt = insert(outdb_table).values(row_data)
# try:
# session.execute(insert_stmt)
# except Exception as e:
# pass
# db_engine.dispose()
#
# # 提交事务
# session.commit()
# # 关闭Session
# session.close()
# outdb.dispose()
# return save_path
@wx_core_error
def decrypt_merge(wx_path: str, key: str, outpath: str = "",
merge_save_path: str = None,
is_merge_data=True, is_del_decrypted: bool = True,
startCreateTime: int = 0, endCreateTime: int = 0,
db_type=None) -> (bool, str):
"""
解密合并数据库 msg.db, microMsg.db, media.db,注意:会删除原数据库
:param wx_path: 微信路径 eg: C:\\*******\\WeChat Files\\wxid_*********
:param key: 解密密钥
:param outpath: 输出路径
:param merge_save_path: 合并后的数据库路径
:param is_merge_data: 是否合并数据(如果为False则只解密并创建表不插入数据)
:param is_del_decrypted: 是否删除解密后的数据库(除了合并后的数据库)
:param startCreateTime: 开始时间戳 主要用于MSG数据库的合并
:param endCreateTime: 结束时间戳 主要用于MSG数据库的合并
:param db_type: 数据库类型,从核心数据库中选择
:return: (true,解密后的数据库路径) or (false,错误信息)
"""
if db_type is None:
db_type = []
outpath = outpath if outpath else "decrypt_merge_tmp"
merge_save_path = os.path.join(outpath,
f"merge_{int(time.time())}.db") if merge_save_path is None else merge_save_path
decrypted_path = os.path.join(outpath, "decrypted")
if not wx_path or not key or not os.path.exists(wx_path):
wx_core_loger.error("参数错误", exc_info=True)
return False, "参数错误"
# 解密
code, wxdbpaths = get_core_db(wx_path, db_type)
if not code:
wx_core_loger.error(f"获取数据库路径失败{wxdbpaths}", exc_info=True)
return False, wxdbpaths
# 判断out_path是否为空目录
if os.path.exists(decrypted_path) and os.listdir(decrypted_path):
for root, dirs, files in os.walk(decrypted_path, topdown=False):
for name in files:
os.remove(os.path.join(root, name))
for name in dirs:
os.rmdir(os.path.join(root, name))
if not os.path.exists(decrypted_path):
os.makedirs(decrypted_path)
wxdbpaths = {i["db_path"]: i for i in wxdbpaths}
# 调用 decrypt 函数,并传入参数 # 解密
code, ret = batch_decrypt(key=key, db_path=list(wxdbpaths.keys()), out_path=decrypted_path, is_print=False)
if not code:
wx_core_loger.error(f"解密失败{ret}", exc_info=True)
return False, ret
out_dbs = []
for code1, ret1 in ret:
if code1:
out_dbs.append(ret1)
parpare_merge_db_path = []
for db_path, out_path, _ in out_dbs:
parpare_merge_db_path.append({"db_path": db_path, "de_path": out_path})
merge_save_path = merge_db(parpare_merge_db_path, merge_save_path, is_merge_data=is_merge_data,
startCreateTime=startCreateTime, endCreateTime=endCreateTime)
if is_del_decrypted:
shutil.rmtree(decrypted_path, True)
if isinstance(merge_save_path, str):
return True, merge_save_path
else:
return False, "未知错误"
@wx_core_error
def merge_real_time_db(key, merge_path: str, db_paths: [dict] or dict):
"""
合并实时数据库消息,暂时只支持64位系统
:param key: 解密密钥
:param db_paths: [dict] or dict eg: {'wxid': 'wxid_***', 'db_type': 'MicroMsg',
'db_path': 'C:\**\wxid_***\Msg\MicroMsg.db', 'wxid_dir': 'C:\***\wxid_***'}
:param merge_path: 合并后的数据库路径
:return:
"""
try:
import platform
except:
raise ImportError("未找到模块 platform")
# 判断系统位数是否为64位如果不是则抛出异常
if platform.architecture()[0] != '64bit':
raise Exception("System is not 64-bit.")
if isinstance(db_paths, dict):
db_paths = [db_paths]
merge_path = os.path.abspath(merge_path) # 合并后的数据库路径,必须为绝对路径
merge_path_base = os.path.dirname(merge_path) # 合并后的数据库路径
if not os.path.exists(merge_path_base):
os.makedirs(merge_path_base)
endbs = []
for db_info in db_paths:
db_path = os.path.abspath(db_info['db_path'])
if not os.path.exists(db_path):
# raise FileNotFoundError("数据库不存在")
continue
if "MSG" not in db_path and "MicroMsg" not in db_path and "MediaMSG" not in db_path:
# raise FileNotFoundError("数据库不是消息数据库") # MicroMsg实时数据库
continue
endbs.append(os.path.abspath(db_path))
endbs = '" "'.join(list(set(endbs)))
# 获取当前文件夹路径
current_path = os.path.dirname(__file__)
real_time_exe_path = os.path.join(current_path, "tools", "realTime.exe")
# 调用cmd命令
cmd = f'{real_time_exe_path} "{key}" "{merge_path}" "{endbs}"'
# os.system(cmd)
p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=merge_path_base,
creationflags=subprocess.CREATE_NO_WINDOW)
# p.communicate()
# 查看返回值
out, err = p.communicate()
if out and out.decode("utf-8").find("SUCCESS") >= 0:
wx_core_loger.info(f"合并实时数据库成功{out}")
return True, merge_path
if err:
wx_core_loger.error(f"合并实时数据库失败\n{out}\n{err}")
return False, err
@wx_core_error
def all_merge_real_time_db(key, wx_path, merge_path: str):
"""
合并所有实时数据库
注:这是全量合并,会有可能产生重复数据,需要自行去重
:param key: 解密密钥
:param wx_path: 微信路径
:param merge_path: 合并后的数据库路径 eg: C:\\*******\\WeChat Files\\wxid_*********\\merge.db
:return:
"""
if not merge_path or not key or not wx_path or not wx_path:
return False, "msg_path or media_path or wx_path or key is required"
try:
from pywxdump import get_core_db
except ImportError:
return False, "未找到模块 pywxdump"
db_paths = get_core_db(wx_path, ["MediaMSG", "MSG", "MicroMsg"])
if not db_paths[0]:
return False, db_paths[1]
db_paths = db_paths[1]
code, ret = merge_real_time_db(key=key, merge_path=merge_path, db_paths=db_paths)
if code:
return True, merge_path
else:
return False, ret