290 lines
8.2 KiB
Python
290 lines
8.2 KiB
Python
# -*- coding: utf-8 -*-#
|
||
# -------------------------------------------------------------------------------
|
||
# Name: utils.py
|
||
# Description:
|
||
# Author: xaoyaoo
|
||
# Date: 2023/12/03
|
||
# -------------------------------------------------------------------------------
|
||
import hashlib
|
||
import os
|
||
import re
|
||
import sqlite3
|
||
import time
|
||
|
||
|
||
def time_int2str(time_int):
|
||
"""
|
||
时间戳转换为时间字符串
|
||
:param time_int: 时间戳
|
||
:return: 时间字符串
|
||
"""
|
||
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time_int))
|
||
|
||
|
||
def read_dict_all_values(data):
|
||
"""
|
||
读取字典中所有的值(单层)
|
||
:param dict_data: 字典
|
||
:return: 所有值的list
|
||
"""
|
||
result = []
|
||
if isinstance(data, list):
|
||
for item in data:
|
||
result.extend(read_dict_all_values(item))
|
||
elif isinstance(data, dict):
|
||
for key, value in data.items():
|
||
result.extend(read_dict_all_values(value))
|
||
else:
|
||
if isinstance(data, bytes):
|
||
tmp = data.decode("utf-8")
|
||
else:
|
||
tmp = str(data) if isinstance(data, int) else data
|
||
result.append(tmp)
|
||
|
||
for i in range(len(result)):
|
||
if isinstance(result[i], bytes):
|
||
result[i] = result[i].decode("utf-8")
|
||
return result
|
||
|
||
|
||
def match_BytesExtra(BytesExtra, pattern=r"FileStorage(.*?)'"):
|
||
"""
|
||
匹配 BytesExtra
|
||
:param BytesExtra: BytesExtra
|
||
:param pattern: 匹配模式
|
||
:return:
|
||
"""
|
||
if not BytesExtra:
|
||
return False
|
||
BytesExtra = read_dict_all_values(BytesExtra)
|
||
BytesExtra = "'" + "'".join(BytesExtra) + "'"
|
||
# print(BytesExtra)
|
||
|
||
match = re.search(pattern, BytesExtra)
|
||
if match:
|
||
video_path = match.group(0).replace("'", "")
|
||
return video_path
|
||
else:
|
||
return ""
|
||
|
||
|
||
def get_type_name(type_id: tuple):
|
||
"""
|
||
获取消息类型名称
|
||
:param type_id: 消息类型ID 元组 eg: (1, 0)
|
||
:return:
|
||
"""
|
||
type_name_dict = {
|
||
(1, 0): "文本",
|
||
(3, 0): "图片",
|
||
(34, 0): "语音",
|
||
(43, 0): "视频",
|
||
(47, 0): "动画表情",
|
||
|
||
(37, 0): "添加好友", # 感谢 https://github.com/zhyc9de
|
||
(42, 0): "推荐公众号", # 感谢 https://github.com/zhyc9de
|
||
(48, 0): "地图信息", # 感谢 https://github.com/zhyc9de
|
||
(49, 40): "分享收藏夹", # 感谢 https://github.com/zhyc9de
|
||
(49, 53): "接龙", # 感谢 https://github.com/zhyc9de
|
||
|
||
(49, 0): "文件",
|
||
(49, 1): "类似文字消息而不一样的消息",
|
||
(49, 5): "卡片式链接",
|
||
(49, 6): "文件",
|
||
(49, 8): "用户上传的GIF表情",
|
||
(49, 19): "合并转发的聊天记录",
|
||
(49, 33): "分享的小程序",
|
||
(49, 36): "分享的小程序",
|
||
(49, 57): "带有引用的文本消息",
|
||
(49, 63): "视频号直播或直播回放等",
|
||
(49, 87): "群公告",
|
||
(49, 88): "视频号直播或直播回放等",
|
||
(49, 2000): "转账消息",
|
||
(49, 2003): "赠送红包封面",
|
||
|
||
(50, 0): "语音通话",
|
||
(10000, 0): "系统通知",
|
||
(10000, 4): "拍一拍",
|
||
(10000, 8000): "系统通知"
|
||
}
|
||
|
||
if type_id in type_name_dict:
|
||
return type_name_dict[type_id]
|
||
else:
|
||
return "未知"
|
||
|
||
|
||
def get_name_typeid(type_name: str):
|
||
"""
|
||
获取消息类型名称
|
||
:param type_id: 消息类型ID 元组 eg: (1, 0)
|
||
:return:
|
||
"""
|
||
type_name_dict = {
|
||
(1, 0): "文本",
|
||
(3, 0): "图片",
|
||
(34, 0): "语音",
|
||
(43, 0): "视频",
|
||
(47, 0): "动画表情",
|
||
|
||
(37, 0): "添加好友", # 感谢 https://github.com/zhyc9de
|
||
(42, 0): "推荐公众号", # 感谢 https://github.com/zhyc9de
|
||
(48, 0): "地图信息", # 感谢 https://github.com/zhyc9de
|
||
(49, 40): "分享收藏夹", # 感谢 https://github.com/zhyc9de
|
||
(49, 53): "接龙", # 感谢 https://github.com/zhyc9de
|
||
|
||
(49, 0): "文件",
|
||
(49, 1): "类似文字消息而不一样的消息",
|
||
(49, 5): "卡片式链接",
|
||
(49, 6): "文件",
|
||
(49, 8): "用户上传的GIF表情",
|
||
(49, 19): "合并转发的聊天记录",
|
||
(49, 33): "分享的小程序",
|
||
(49, 36): "分享的小程序",
|
||
(49, 57): "带有引用的文本消息",
|
||
(49, 63): "视频号直播或直播回放等",
|
||
(49, 87): "群公告",
|
||
(49, 88): "视频号直播或直播回放等",
|
||
(49, 2000): "转账消息",
|
||
(49, 2003): "赠送红包封面",
|
||
|
||
(50, 0): "语音通话",
|
||
(10000, 0): "系统通知",
|
||
(10000, 4): "拍一拍",
|
||
(10000, 8000): "系统通知"
|
||
}
|
||
type_tup = []
|
||
for k, v in type_name_dict.items():
|
||
if v == type_name:
|
||
type_tup.append(k)
|
||
return type_tup
|
||
|
||
|
||
def get_md5(data):
|
||
"""
|
||
获取数据的 MD5 值
|
||
:param data: 数据(bytes)
|
||
:return:
|
||
"""
|
||
md5 = hashlib.md5()
|
||
md5.update(data)
|
||
return md5.hexdigest()
|
||
|
||
|
||
import threading
|
||
|
||
|
||
def get_thread_id():
|
||
current_thread = threading.current_thread()
|
||
thread_id = current_thread.ident
|
||
return thread_id
|
||
|
||
|
||
class DBPool:
|
||
__db_pool = {}
|
||
__thread_pool = {}
|
||
|
||
def __new__(cls, *args, **kwargs):
|
||
if not hasattr(cls, '_instance'):
|
||
cls._instance = super(DBPool, cls).__new__(cls)
|
||
return cls._instance
|
||
|
||
@classmethod
|
||
def create_connection(cls, db_path):
|
||
if db_path == "DBPOOL_INIT":
|
||
return
|
||
if not os.path.exists(db_path):
|
||
raise FileNotFoundError(f"数据库文件不存在:{db_path}")
|
||
|
||
if db_path not in cls.__db_pool:
|
||
cls.__db_pool[db_path] = sqlite3.connect(db_path, check_same_thread=False)
|
||
cls.connection = cls.__db_pool[db_path]
|
||
|
||
def __init__(self, db_path):
|
||
if db_path == "DBPOOL_INIT":
|
||
return
|
||
self.db_path = db_path
|
||
if db_path not in self.__db_pool:
|
||
self.create_connection(db_path)
|
||
self.connection = self.__db_pool.get(db_path)
|
||
# 检查数据库是否关闭
|
||
if not self.connection:
|
||
self.create_connection(db_path)
|
||
self.connection = self.__db_pool.get(db_path)
|
||
|
||
def __enter__(self):
|
||
return self.connection
|
||
|
||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||
self.connection = None
|
||
|
||
def close_all(self):
|
||
for db_path, connection in self.__db_pool.items():
|
||
connection.close()
|
||
self.__db_pool.clear()
|
||
|
||
|
||
def attach_databases(connection, databases):
|
||
"""
|
||
将多个数据库附加到给定的SQLite连接。
|
||
参数:
|
||
-连接:SQLite连接
|
||
-数据库:包含数据库别名和文件路径的词典
|
||
"""
|
||
cursor = connection.cursor()
|
||
for alias, file_path in databases.items():
|
||
attach_command = f"ATTACH DATABASE '{file_path}' AS {alias};"
|
||
cursor.execute(attach_command)
|
||
connection.commit()
|
||
|
||
|
||
def detach_databases(connection, aliases):
|
||
"""
|
||
从给定的 SQLite 连接中分离多个数据库。
|
||
|
||
参数:
|
||
- connection: SQLite连接
|
||
- aliases:要分离的数据库别名列表
|
||
"""
|
||
cursor = connection.cursor()
|
||
for alias in aliases:
|
||
detach_command = f"DETACH DATABASE {alias};"
|
||
cursor.execute(detach_command)
|
||
connection.commit()
|
||
|
||
|
||
def execute_sql(connection, sql, params=None):
|
||
"""
|
||
执行给定的SQL语句,返回结果。
|
||
参数:
|
||
- connection: SQLite连接
|
||
- sql:要执行的SQL语句
|
||
- params:SQL语句中的参数
|
||
"""
|
||
try:
|
||
# connection.text_factory = bytes
|
||
cursor = connection.cursor()
|
||
if params:
|
||
cursor.execute(sql, params)
|
||
else:
|
||
cursor.execute(sql)
|
||
return cursor.fetchall()
|
||
except Exception as e1:
|
||
try:
|
||
connection.text_factory = bytes
|
||
cursor = connection.cursor()
|
||
if params:
|
||
cursor.execute(sql, params)
|
||
else:
|
||
cursor.execute(sql)
|
||
rdata = cursor.fetchall()
|
||
connection.text_factory = str
|
||
return rdata
|
||
except Exception as e2:
|
||
print(f"**********\nSQL: {sql}\nparams: {params}\n{e1}\n{e2}\n**********")
|
||
return None
|
||
|
||
|
||
if __name__ == '__main__':
|
||
pass
|