Speed up merge_db merging (加速merge_db合并速度)

xaoyaoo 2024-07-20 19:11:36 +08:00
parent 96e6b74d32
commit 930efaf0e9
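
The change below replaces merge_db's row copy through a second sqlite3 connection with SQLite-side ATTACH DATABASE reads on the output connection, and adds a sync_log table recording per-table row counts so tables whose source has not grown can be skipped on later runs. A minimal sketch of the attach-and-copy pattern (placeholder paths and table name, not code from the repo):

    import sqlite3

    def copy_table(src_path: str, dst_path: str, table: str) -> None:
        # Sketch only: assumes the table already exists in the source database.
        outdb = sqlite3.connect(dst_path)
        cur = outdb.cursor()
        # Read the source through the same connection instead of opening a second one.
        cur.execute(f"ATTACH DATABASE '{src_path}' AS src")
        # Clone the schema without rows, then add a NULL-safe UNIQUE index for deduplication.
        cur.execute(f"CREATE TABLE IF NOT EXISTS {table} AS SELECT * FROM src.{table} WHERE 0 = 1")
        cols = [row[1] for row in cur.execute(f"PRAGMA table_info({table})")]
        coalesced = ",".join(f"COALESCE({c}, '')" for c in cols)
        cur.execute(f"CREATE UNIQUE INDEX IF NOT EXISTS {table}_unique_index ON {table} ({coalesced})")
        # Copy rows; duplicates are silently dropped by INSERT OR IGNORE + the index.
        rows = cur.execute(f"SELECT {','.join(cols)} FROM src.{table}").fetchall()
        cur.executemany(
            f"INSERT OR IGNORE INTO {table} ({','.join(cols)}) VALUES ({','.join('?' * len(cols))})",
            rows)
        outdb.commit()
        cur.execute("DETACH DATABASE src")
        outdb.close()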


@@ -203,7 +203,7 @@ def execute_sql(connection, sql, params=None):
         return None


-def merge_db(db_paths, save_path="merge.db", CreateTime: int = 0, endCreateTime: int = 0):
+def merge_db(db_paths, save_path="merge.db", startCreateTime: int = 0, endCreateTime: int = 0):
     """
     合并数据库 会忽略主键以及重复的行
     :param db_paths:
@@ -214,88 +214,115 @@ def merge_db(db_paths, save_path="merge.db", CreateTime: int = 0, endCreateTime:
     if os.path.isdir(save_path):
         save_path = os.path.join(save_path, f"merge_{int(time.time())}.db")

-    _db_paths = []
-    if isinstance(db_paths, str):
-        if os.path.isdir(db_paths):
-            _db_paths = [os.path.join(db_paths, i) for i in os.listdir(db_paths) if i.endswith(".db")]
-        elif os.path.isfile(db_paths):
-            _db_paths = [db_paths]
-        else:
-            raise FileNotFoundError("db_paths 不存在")
     if isinstance(db_paths, list):
         # alias, file_path
         databases = {f"MSG{i}": db_path for i, db_path in enumerate(db_paths)}
+    elif isinstance(db_paths, str):
+        # 判断是否是文件or文件夹
+        if os.path.isdir(db_paths):
+            db_paths = [os.path.join(db_paths, i) for i in os.listdir(db_paths) if i.endswith(".db")]
+            databases = {f"MSG{i}": db_path for i, db_path in enumerate(db_paths)}
+        elif os.path.isfile(db_paths):
+            databases = {"MSG": db_paths}
+        else:
+            raise FileNotFoundError("db_paths 不存在")
     else:
         raise TypeError("db_paths 类型错误")

     outdb = sqlite3.connect(save_path)
     out_cursor = outdb.cursor()
+
+    # 检查是否存在表 sync_log,用于记录同步记录,包括微信数据库路径,表名,记录数,同步时间
+    sync_log_status = execute_sql(outdb, "SELECT name FROM sqlite_master WHERE type='table' AND name='sync_log'")
+    if len(sync_log_status) < 1:
+        # db_path 微信数据库路径, tbl_name 表名, src_count 源数据库记录数, current_count 当前合并后的数据库对应表记录数
+        sync_record_create_sql = ("CREATE TABLE sync_log ("
+                                  "id INTEGER PRIMARY KEY AUTOINCREMENT,"
+                                  "db_path TEXT NOT NULL,"
+                                  "tbl_name TEXT NOT NULL,"
+                                  "src_count INT,"
+                                  "current_count INT,"
+                                  "createTime INT DEFAULT (strftime('%s', 'now')), "
+                                  "updateTime INT DEFAULT (strftime('%s', 'now'))"
+                                  ");")
+        out_cursor.execute(sync_record_create_sql)
+        # 创建索引
+        out_cursor.execute("CREATE INDEX idx_sync_log_db_path ON sync_log (db_path);")
+        out_cursor.execute("CREATE INDEX idx_sync_log_tbl_name ON sync_log (tbl_name);")
+        # 创建联合索引,防止重复
+        out_cursor.execute("CREATE UNIQUE INDEX idx_sync_log_db_tbl ON sync_log (db_path, tbl_name);")
+        outdb.commit()
+
     # 将MSG_db_paths中的数据合并到out_db_path中
-    for alias in databases:
-        db = sqlite3.connect(databases[alias])
-        # 获取表名
-        sql = f"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;"
-        tables = execute_sql(db, sql)
-        try:
-            for table in tables:
-                table = table[0]
-                if table == "sqlite_sequence":
-                    continue
-                # 获取表中的字段名
-                sql = f"PRAGMA table_info({table})"
-                columns = execute_sql(db, sql)
-                if not columns or len(columns) < 1:
-                    continue
-                col_type = {
-                    (i[1] if isinstance(i[1], str) else i[1].decode(), i[2] if isinstance(i[2], str) else i[2].decode())
-                    for
-                    i in columns}
-                columns = [i[1] if isinstance(i[1], str) else i[1].decode() for i in columns]
-                if not columns or len(columns) < 1:
-                    continue
-                # 检测表是否存在
-                sql = f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'"
-                out_cursor.execute(sql)
-                if len(out_cursor.fetchall()) < 1:
-                    # 创建表
-                    # 拼接创建表的SQL语句
-                    column_definitions = []
-                    for column in col_type:
-                        column_name = column[0] if isinstance(column[0], str) else column[0].decode()
-                        column_type = column[1] if isinstance(column[1], str) else column[1].decode()
-                        column_definition = f"{column_name} {column_type}"
-                        column_definitions.append(column_definition)
-                    sql = f"CREATE TABLE IF NOT EXISTS {table} ({','.join(column_definitions)})"
-                    # sql = f"CREATE TABLE IF NOT EXISTS {table} ({','.join(columns)})"
-                    out_cursor.execute(sql)
-                    # 创建包含 NULL 值比较的 UNIQUE 索引
-                    index_name = f"{table}_unique_index"
-                    coalesce_columns = ','.join(f"COALESCE({column}, '')" for column in columns)  # 将 NULL 值转换为 ''
-                    sql = f"CREATE UNIQUE INDEX IF NOT EXISTS {index_name} ON {table} ({coalesce_columns})"
-                    out_cursor.execute(sql)
-                # 获取表中的数据
-                if "CreateTime" in columns and CreateTime > 0:
-                    sql = f"SELECT {','.join([i[0] for i in col_type])} FROM {table} WHERE CreateTime>? ORDER BY CreateTime"
-                    src_data = execute_sql(db, sql, (CreateTime,))
-                else:
-                    sql = f"SELECT {','.join([i[0] for i in col_type])} FROM {table}"
-                    src_data = execute_sql(db, sql)
-                if not src_data or len(src_data) < 1:
-                    continue
-                # 插入数据
-                sql = f"INSERT OR IGNORE INTO {table} ({','.join([i[0] for i in col_type])}) VALUES ({','.join(['?'] * len(columns))})"
-                try:
-                    out_cursor.executemany(sql, src_data)
-                except Exception as e:
-                    logging.error(f"error: {alias}\n{table}\n{sql}\n{src_data}\n{len(src_data)}\n{e}\n**********")
-                outdb.commit()
-        except Exception as e:
-            logging.error(f"fun(merge_db) error: {alias}\n{e}\n**********")
-        db.close()
+    for alias, path in databases.items():
+        # 附加数据库
+        sql_attach = f"ATTACH DATABASE '{path}' AS {alias}"
+        out_cursor.execute(sql_attach)
+        outdb.commit()
+        sql_query_tbl_name = f"SELECT name FROM {alias}.sqlite_master WHERE type='table' ORDER BY name;"
+        tables = execute_sql(outdb, sql_query_tbl_name)
+        for table in tables:
+            table = table[0]
+            if table == "sqlite_sequence":
+                continue
+            # 获取表中的字段名
+            sql_query_columns = f"PRAGMA table_info({table})"
+            columns = execute_sql(outdb, sql_query_columns)
+            col_type = {
+                (i[1] if isinstance(i[1], str) else i[1].decode(),
+                 i[2] if isinstance(i[2], str) else i[2].decode())
+                for i in columns}
+            columns = [i[0] for i in col_type]
+            if not columns or len(columns) < 1:
+                continue
+            # 创建表table
+            sql_create_tbl = f"CREATE TABLE IF NOT EXISTS {table} AS SELECT * FROM {alias}.{table} WHERE 0 = 1;"
+            out_cursor.execute(sql_create_tbl)
+            # 创建包含 NULL 值比较的 UNIQUE 索引
+            index_name = f"{table}_unique_index"
+            coalesce_columns = ','.join(f"COALESCE({column}, '')" for column in columns)
+            sql = f"CREATE UNIQUE INDEX IF NOT EXISTS {index_name} ON {table} ({coalesce_columns})"
+            out_cursor.execute(sql)
+
+            # 插入sync_log
+            sql_query_sync_log = f"SELECT * FROM sync_log WHERE db_path=? AND tbl_name=?"
+            sync_log = execute_sql(outdb, sql_query_sync_log, (path, table))
+            if not sync_log or len(sync_log) < 1:
+                sql_insert_sync_log = "INSERT INTO sync_log (db_path, tbl_name, src_count, current_count) VALUES (?, ?, ?, ?)"
+                out_cursor.execute(sql_insert_sync_log, (path, table, 0, 0))
+                outdb.commit()
+
+            # 比较源数据库和合并后的数据库记录数
+            log_src_count = execute_sql(outdb, sql_query_sync_log, (path, table))[0][3]
+            src_count = execute_sql(outdb, f"SELECT COUNT(*) FROM {alias}.{table}")[0][0]
+            if src_count <= log_src_count:
+                continue
+
+            sql_base = f"SELECT {','.join([i for i in columns])} FROM {alias}.{table} "
+            # 构建WHERE子句
+            where_clauses, params = [], []
+            if "CreateTime" in columns:
+                if startCreateTime > 0:
+                    where_clauses.append("CreateTime > ?")
+                    params.append(startCreateTime)
+                if endCreateTime > 0:
+                    where_clauses.append("CreateTime < ?")
+                    params.append(endCreateTime)
+            # 如果有WHERE子句,将其添加到SQL语句中,并添加ORDER BY子句
+            sql = f"{sql_base} WHERE {' AND '.join(where_clauses)} ORDER BY CreateTime" if where_clauses else sql_base
+            src_data = execute_sql(outdb, sql, tuple(params))
+            if not src_data or len(src_data) < 1:
+                continue
+            # 插入数据
+            sql = f"INSERT OR IGNORE INTO {table} ({','.join([i for i in columns])}) VALUES ({','.join(['?'] * len(columns))})"
+            try:
+                out_cursor.executemany(sql, src_data)
+            except Exception as e:
+                logging.error(f"error: {path}\n{table}\n{sql}\n{src_data}\n{len(src_data)}\n{e}\n", exc_info=True)
+        # 分离数据库
+        sql_detach = f"DETACH DATABASE {alias}"
+        out_cursor.execute(sql_detach)
+        outdb.commit()
     outdb.close()
     return save_path
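
For reference, hypothetical calls of the updated signature (the paths and unix-second timestamps below are made-up values): db_paths may be a single .db file, a directory of .db files, or a list of paths, and startCreateTime/endCreateTime bound the CreateTime column where a table has one.

    merged = merge_db(r"D:\wxdump_out\decrypted", save_path=r"D:\wxdump_out")   # folder: merges every *.db into merge_<timestamp>.db
    merged = merge_db([r"D:\de_MSG0.db", r"D:\de_MSG1.db"], "merge.db",
                      startCreateTime=1719763200, endCreateTime=1721577600)     # only rows inside the time window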
@@ -362,7 +389,7 @@ def decrypt_merge(wx_path, key, outpath="", CreateTime: int = 0, endCreateTime:
     de_db_type = [f"de_{i}" for i in db_type]
     parpare_merge_db_path = [i for i in out_dbs if any(keyword in i for keyword in de_db_type)]

-    merge_save_path = merge_db(parpare_merge_db_path, merge_save_path, CreateTime=CreateTime,
+    merge_save_path = merge_db(parpare_merge_db_path, merge_save_path, startCreateTime=CreateTime,
                                endCreateTime=endCreateTime)

     return True, merge_save_path
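
decrypt_merge keeps its public CreateTime/endCreateTime keywords and simply forwards CreateTime to merge_db's renamed startCreateTime parameter, so existing callers are unaffected. A hypothetical call (placeholder path and key):

    ok, merged_path = decrypt_merge(wx_path=r"D:\WeChat Files\wxid_demo", key="your-64-char-hex-key",
                                    outpath=r"D:\wxdump_out", CreateTime=1719763200)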