优化命令行提示,优化合并数据库方法

This commit is contained in:
xaoyaoo 2023-12-10 00:16:05 +08:00
parent de580a35f7
commit 6bf25a1b03
6 changed files with 112 additions and 8 deletions

View File

@ -1,11 +1,12 @@
# 更新日志
## v2.3.4 (2023-12-09)
## v2.3.6 (2023-12-09)
### 优化
- 修复部分bug
- merge命令错误修复
- 优化命令行提示,优化合并数据库方法
## v2.3.3 (2023-12-08)

View File

@ -6,7 +6,7 @@
# Date: 2023/10/14
# -------------------------------------------------------------------------------
from .wx_info import BiasAddr,read_info, get_wechat_db,encrypt,batch_decrypt,decrypt
from .wx_info import merge_copy_db, merge_msg_db, merge_media_msg_db
from .wx_info import merge_copy_db, merge_msg_db, merge_media_msg_db,merge_db
from .analyzer.db_parsing import read_img_dat, read_emoji, decompress_CompressContent, read_audio_buf, read_audio, parse_xml_string,read_BytesExtra
from .ui import app_show_chat, get_user_list, export

View File

@ -131,11 +131,11 @@ class MainMerge():
self.mode = "merge"
# 添加 'decrypt' 子命令解析器
sb_merge = parser.add_parser(self.mode, help="[测试功能]合并微信数据库(MSG.db or MediaMSG.db)")
sb_merge.add_argument("-t", "--dbtype", type=str, help="数据库类型(可选值)[msg,media]", required=True, metavar="")
sb_merge.add_argument("-i", "--db_path", type=str, help="数据库路径(文件路径,使用英文[,]分割)", required=True, metavar="")
sb_merge.add_argument("-o", "--out_path", type=str, default=os.path.join(os.getcwd(), "decrypted"),
help="输出路径(目录或文件名)[默认为当前路径下decrypted文件夹下merge_***.db]", required=False,
metavar="")
sb_merge.add_argument("-t", "--dbtype", type=str, help="数据库类型(可选值)[msg,media]", required=False, metavar="")
return sb_merge
def run(self, args):
@ -150,17 +150,20 @@ class MainMerge():
print(f"[-] 数据库路径不存在:{i}")
return
if not os.path.exists(out_path):
if (not out_path.endswith(".db")) and (not os.path.exists(out_path)):
os.makedirs(out_path)
print(f"[+] 创建输出文件夹:{out_path}")
print(f"[*] 合并中...(用时较久,耐心等待)")
if dbtype == "msg":
result = merge_msg_db(db_path, out_path)
result = merge_db(db_path, out_path)
elif dbtype == "media":
result = merge_media_msg_db(db_path, out_path)
result = merge_db(db_path, out_path)
else:
print(f"[-] 未知数据库类型:{dbtype}")
return
print(f"[+] 合并完成:{result}")
return result

View File

@ -8,4 +8,4 @@
from .get_wx_info import read_info, get_wechat_db
from .get_bias_addr import BiasAddr
from .decryption import batch_decrypt, encrypt, decrypt
from .merge_db import merge_msg_db, merge_copy_db, merge_media_msg_db
from .merge_db import merge_msg_db, merge_copy_db, merge_media_msg_db,merge_db

View File

@ -8,6 +8,7 @@
import os
import shutil
import sqlite3
import time
def merge_copy_db(db_path, save_path):
@ -164,3 +165,102 @@ def merge_media_msg_db(db_path: list, save_path: str):
merged_conn.close()
return save_path
def attach_databases(connection, databases):
"""
将多个数据库附加到给定的SQLite连接
参数
-连接SQLite连接
-数据库包含数据库别名和文件路径的词典
"""
cursor = connection.cursor()
for alias, file_path in databases.items():
attach_command = f"ATTACH DATABASE '{file_path}' AS {alias};"
cursor.execute(attach_command)
connection.commit()
def execute_sql(connection, sql, params=None):
"""
执行给定的SQL语句返回结果
参数
- connection SQLite连接
- sql要执行的SQL语句
- paramsSQL语句中的参数
"""
cursor = connection.cursor()
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
return cursor.fetchall()
def merge_db(db_paths, save_path="merge.db"):
if os.path.isdir(save_path):
save_path = os.path.join(save_path, f"merge_{int(time.time())}.db")
if isinstance(db_paths, list):
# alias, file_path
databases = {f"MSG{i}": db_path for i, db_path in enumerate(db_paths)}
elif isinstance(db_paths, str):
databases = {"MSG": db_paths}
else:
raise TypeError("db_paths 类型错误")
# 连接 MSG_ALL.db 数据库,并执行查询
if len(databases) > 1:
db = sqlite3.connect(":memory:")
attach_databases(db, databases)
else:
db = sqlite3.connect(list(databases.values())[0])
outdb = sqlite3.connect(save_path)
out_cursor = outdb.cursor()
# 将MSG_db_paths中的数据合并到out_db_path中
for alias in databases:
# 获取表名
sql = f"SELECT name FROM {alias}.sqlite_master WHERE type='table' ORDER BY name;"
tables = execute_sql(db, sql)
for table in tables:
table = table[0]
if table == "sqlite_sequence":
continue
# 获取表中的数据
sql = f"SELECT * FROM {alias}.{table}"
data = execute_sql(db, sql)
if len(data) < 1:
continue
# 获取表中的字段名
sql = f"PRAGMA table_info({table})"
columns = execute_sql(db, sql)
columns = [i[1] for i in columns]
if len(columns) < 1:
continue
# 检测表是否存在
sql = f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'"
out_cursor.execute(sql)
if len(out_cursor.fetchall()) < 1:
# 创建表
sql = f"CREATE TABLE IF NOT EXISTS {table} ({','.join(columns)})"
out_cursor.execute(sql)
# 创建包含 NULL 值比较的 UNIQUE 索引
index_name = f"{table}_unique_index"
coalesce_columns = ','.join(f"COALESCE({column}, '')" for column in columns) # 将 NULL 值转换为 ''
sql = f"CREATE UNIQUE INDEX IF NOT EXISTS {index_name} ON {table} ({coalesce_columns})"
out_cursor.execute(sql)
# 插入数据
sql = f"INSERT OR IGNORE INTO {table} VALUES ({','.join(['?'] * len(columns))})"
out_cursor.executemany(sql, data)
outdb.commit()
outdb.close()
# 断开数据库连接
if len(databases) > 1:
for alias in databases:
db.execute(f"DETACH DATABASE {alias}")
db.close()
return save_path

View File

@ -3,7 +3,7 @@ from setuptools import setup, find_packages
with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
version = "2.3.5"
version = "2.3.6"
install_requires = [
"psutil",