增加新的使用方法,加快访问速度,合并相似的功能

This commit is contained in:
xaoyaoo 2024-04-19 12:23:56 +08:00
parent 57b773d391
commit 58609ee7aa
10 changed files with 195 additions and 61 deletions

View File

@ -26,4 +26,4 @@ except:
PYWXDUMP_ROOT_PATH = os.path.dirname(__file__)
db_init = DBPool("DBPOOL_INIT")
__version__ = "2.4.71"
__version__ = "3.0.0"

View File

@ -31,7 +31,7 @@ def ReJson(code: int, body: [dict, list] = None, msg: str = None, error: str = N
}
rjson = situation.get(code, {'code': 9999, 'body': None, 'msg': "code错误", "extra": {}})
if code != 0:
logging.warning((code, rjson['body'], msg if msg else None))
logging.warning(f"\n{code} \n{rjson['body']}\n{msg if msg else None}")
if body:
rjson['body'] = body
if msg:

View File

@ -9,29 +9,68 @@ import base64
import json
import logging
import os
import re
import traceback
from .rjson import ReJson
from functools import wraps
def read_session(session_file, arg):
with open(session_file, 'r') as f:
session = json.load(f)
return session.get(arg, "")
def save_session(session_file, arg, value):
def read_session(session_file, wxid, arg):
try:
with open(session_file, 'r') as f:
session = json.load(f)
except:
except FileNotFoundError:
logging.error(f"Session file not found: {session_file}")
return None
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON file: {e}")
return None
return session.get(wxid, {}).get(arg, None)
def get_session_wxids(session_file):
try:
with open(session_file, 'r') as f:
session = json.load(f)
except FileNotFoundError:
logging.error(f"Session file not found: {session_file}")
return None
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON file: {e}")
return None
return list(session.keys())
def save_session(session_file, wxid, arg, value):
try:
with open(session_file, 'r') as f:
session = json.load(f)
except FileNotFoundError:
session = {}
session[arg] = value
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON file: {e}")
return False
if wxid not in session:
session[wxid] = {}
session[wxid][arg] = value
try:
with open(session_file, 'w') as f:
json.dump(session, f, indent=4)
except Exception as e:
logging.error(f"Error writing to file: {e}")
return False
return True
def validate_title(title):
"""
校验文件名是否合法
"""
rstr = r"[\/\\\:\*\?\"\<\>\|\.]" # '/ \ : * ? " < > |'
new_title = re.sub(rstr, "_", title) # 替换为下划线
return new_title
def error9999(func):
@wraps(func)
def wrapper(*args, **kwargs):
@ -40,9 +79,12 @@ def error9999(func):
except Exception as e:
traceback_data = traceback.format_exc()
rdata = f"{traceback_data}"
# logging.error(rdata)
return ReJson(9999, body=rdata)
return wrapper
def gen_base64(path):
# 获取文件名后缀
extension = os.path.splitext(path)[1]

View File

@ -12,11 +12,13 @@ import logging
class DatabaseBase:
_singleton_instances = {} # 使用字典存储不同db_path对应的单例实例
_connection_pool = {} # 使用字典存储不同db_path对应的连接池
_class_name = "DatabaseBase"
def __new__(cls, db_path):
if db_path not in cls._singleton_instances:
cls._singleton_instances[db_path] = super().__new__(cls)
return cls._singleton_instances[db_path]
if cls._class_name not in cls._singleton_instances:
cls._singleton_instances[cls._class_name] = super().__new__(cls)
return cls._singleton_instances[cls._class_name]
def __init__(self, db_path):
self._db_path = db_path
@ -26,6 +28,8 @@ class DatabaseBase:
def _connect_to_database(cls, db_path):
if not os.path.exists(db_path):
raise FileNotFoundError(f"文件不存在: {db_path}")
if db_path in cls._connection_pool and cls._connection_pool[db_path] is not None:
return cls._connection_pool[db_path]
connection = sqlite3.connect(db_path, check_same_thread=False)
logging.info(f"{connection} 连接句柄创建 {db_path}")
return connection
@ -70,7 +74,7 @@ class DatabaseBase:
def __del__(self):
self.close_connection()
del self._singleton_instances[self._db_path]
# del self._singleton_instances[self._db_path]
if __name__ == '__main__':

View File

@ -17,6 +17,7 @@ import blackboxprotobuf
class ParsingMSG(DatabaseBase):
_class_name = "MSG"
def __init__(self, db_path):
super().__init__(db_path)
@ -45,24 +46,26 @@ class ParsingMSG(DatabaseBase):
except Exception as e:
return None
def chat_count(self, wxid: str = ""):
def msg_count(self, wxid: str = ""):
"""
获取聊天记录数量,根据wxid获取单个联系人的聊天记录数量不传wxid则获取所有联系人的聊天记录数量
:param MSG_db_path: MSG.db 文件路径
:return: 聊天记录数量列表
"""
if wxid:
sql = f"SELECT StrTalker,COUNT(*) FROM MSG WHERE StrTalker='{wxid}';"
sql = f"SELECT StrTalker, COUNT(*) FROM MSG WHERE StrTalker='{wxid}';"
else:
sql = f"SELECT StrTalker, COUNT(*) FROM MSG GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
result = self.execute_sql(sql)
df = pd.DataFrame(result, columns=["wxid", "chat_count"])
df = pd.DataFrame(result, columns=["wxid", "msg_count"])
# # 排序
df = df.sort_values(by="msg_count", ascending=False)
# chat_counts {wxid: chat_count}
chat_counts = df.set_index("wxid").to_dict()["chat_count"]
chat_counts = df.set_index("wxid").to_dict()["msg_count"]
return chat_counts
def chat_count_total(self):
def msg_count_total(self):
"""
获取聊天记录总数
:return: 聊天记录总数
@ -255,18 +258,11 @@ class ParsingMSG(DatabaseBase):
"FROM MSG ORDER BY CreateTime ASC LIMIT ?,?")
result1 = self.execute_sql(sql, (start_index, page_size))
# df = pd.DataFrame(result1, columns=[
# 'localId', 'IsSender', 'StrContent', 'StrTalker', 'Sequence', 'Type', 'SubType', 'CreateTime', 'MsgSvrID',
# 'DisplayContent', 'CompressContent', 'BytesExtra', 'id'
# ])
# df['msg_detail'] = df.apply(lambda row: self.msg_detail(row), axis=1)
# return df['msg_detail'].tolist()
data = []
wxid_list = []
for row in result1:
data.append(self.msg_detail(row))
return data
# return rdata
tmpdata = self.msg_detail(row)
wxid_list.append(tmpdata["talker"])
data.append(tmpdata)
wxid_list = list(set(wxid_list))
return data, wxid_list

View File

@ -10,13 +10,13 @@ from .utils import silk2audio
class ParsingMediaMSG(DatabaseBase):
_class_name = "MediaMSG"
def __init__(self, db_path):
super().__init__(db_path)
def get_audio(self, MsgSvrID, is_play=False, is_wave=False, save_path=None, rate=24000):
sql = "select Buf from Media where Reserved0={}".format(MsgSvrID)
DBdata = self.execute_sql(sql)
sql = "select Buf from Media where Reserved0=? "
DBdata = self.execute_sql(sql, (MsgSvrID,))
if len(DBdata) == 0:
return False
data = DBdata[0][0] # [1:] + b'\xFF\xFF'

View File

@ -6,51 +6,93 @@
# Date: 2024/04/15
# -------------------------------------------------------------------------------
from .dbbase import DatabaseBase
from .utils import timestamp2str
class ParsingMicroMsg(DatabaseBase):
_class_name = "MicroMsg"
def __init__(self, db_path):
super().__init__(db_path)
def wxid2userinfo(self, wx_id):
def wxid2userinfo(self, wxid):
"""
获取单个联系人信息
:param wx_id: 微信id
:param wxid: 微信id
:return: 联系人信息
"""
if isinstance(wxid, str):
wxid = [wxid]
elif isinstance(wxid, list):
wxid = wxid
else:
return {}
wxid = "','".join(wxid)
wxid = f"'{wxid}'"
# 获取username是wx_id的用户
sql = ("SELECT A.UserName, A.NickName, A.Remark,A.Alias,A.Reserved6,B.bigHeadImgUrl "
"FROM Contact A,ContactHeadImgUrl B "
f"WHERE A.UserName = '{wx_id}' AND A.UserName = B.usrName "
f"WHERE A.UserName = B.usrName AND A.UserName in ({wxid}) "
"ORDER BY NickName ASC;")
result = self.execute_sql(sql)
if not result:
return None
result = result[0]
return {"wxid": result[0], "nickname": result[1], "remark": result[2], "account": result[3],
"describe": result[4], "headImgUrl": result[5]}
return {}
users = {}
for row in result:
# 获取wxid,昵称,备注,描述,头像
username, nickname, remark, Alias, describe, headImgUrl = row
users[username] = {"wxid": username, "nickname": nickname, "remark": remark, "account": Alias,
"describe": describe, "headImgUrl": headImgUrl}
return users
def user_list(self):
def user_list(self, word=None):
"""
获取联系人列表
:param MicroMsg_db_path: MicroMsg.db 文件路径
:return: 联系人列表
"""
users = []
sql = ("SELECT A.UserName, A.NickName, A.Remark,A.Alias,A.Reserved6,B.bigHeadImgUrl "
"FROM Contact A,ContactHeadImgUrl B "
"where UserName==usrName "
"ORDER BY NickName ASC;")
sql = (
"SELECT A.UserName, A.NickName, A.Remark,A.Alias,A.Reserved6,B.bigHeadImgUrl "
"FROM Contact A left join ContactHeadImgUrl B on A.UserName==B.usrName "
"ORDER BY A.NickName DESC;")
if word:
sql = sql.replace("ORDER BY A.NickName DESC;",
f"where "
f"A.UserName LIKE '%{word}%' "
f"OR A.NickName LIKE '%{word}%' "
f"OR A.Remark LIKE '%{word}%' "
f"OR A.Alias LIKE '%{word}%' "
# f"OR A.Reserved6 LIKE '%{word}%' "
"ORDER BY A.NickName DESC;")
result = self.execute_sql(sql)
for row in result:
# 获取用户名、昵称、备注和聊天记录数量
# 获取wxid,昵称,备注,描述,头像
username, nickname, remark, Alias, describe, headImgUrl = row
users.append(
{"wxid": username, "nickname": nickname, "remark": remark, "account": Alias,
"describe": describe, "headImgUrl": headImgUrl})
return users
def recent_chat_wxid(self):
"""
获取最近聊天的联系人
:return: 最近聊天的联系人
"""
users = []
sql = (
"SELECT C.Username, C.LastReadedCreateTime,C.LastReadedSvrId "
"FROM ChatInfo C "
"ORDER BY C.LastReadedCreateTime DESC;")
result = self.execute_sql(sql)
for row in result:
# 获取用户名、昵称、备注和聊天记录数量
username, LastReadedCreateTime, LastReadedSvrId = row
LastReadedCreateTime = timestamp2str(LastReadedCreateTime / 1000) if LastReadedCreateTime else None
users.append(
{"wxid": username, "LastReadedCreateTime": LastReadedCreateTime, "LastReadedSvrId": LastReadedSvrId})
return users
def chatroom_list(self):
"""
获取群聊列表
@ -73,3 +115,5 @@ class ParsingMicroMsg(DatabaseBase):
{"ChatRoomName": ChatRoomName, "UserNameList": UserNameList, "DisplayNameList": DisplayNameList,
"Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor})
return rooms

View File

@ -9,10 +9,43 @@ from .dbbase import DatabaseBase
class ParsingOpenIMContact(DatabaseBase):
_class_name = "OpenIMContact"
def __init__(self, db_path):
super().__init__(db_path)
def user_list(self):
def wxid2userinfo(self, wxid):
"""
获取单个联系人信息
:param wxid: 微信id
:return: 联系人信息
"""
if isinstance(wxid, str):
wxid = [wxid]
elif isinstance(wxid, list):
wxid = wxid
else:
return {}
wxid = "','".join(wxid)
wxid = f"'{wxid}'"
# 获取username是wx_id的用户
sql = ("SELECT A.UserName, A.NickName, A.Remark,A.BigHeadImgUrl "
"FROM OpenIMContact A "
f"WHERE A.UserName in ({wxid}) "
"ORDER BY NickName ASC;")
result = self.execute_sql(sql)
if not result:
return {}
users = {}
for row in result:
# 获取用户名、昵称、备注和聊天记录数量
username, nickname, remark, headImgUrl = row
users[username] = {"wxid": username, "nickname": nickname, "remark": remark, "account": "", "describe": "",
"headImgUrl": headImgUrl}
return users
def user_list(self, word=None):
"""
获取联系人列表
:param MicroMsg_db_path: MicroMsg.db 文件路径
@ -21,6 +54,13 @@ class ParsingOpenIMContact(DatabaseBase):
users = []
sql = ("SELECT A.UserName, A.NickName, A.Remark,A.BigHeadImgUrl FROM OpenIMContact A "
"ORDER BY NickName ASC;")
if word:
sql = sql.replace("ORDER BY NickName ASC;",
f"where "
f"UserName LIKE '%{word}%' "
f"OR NickName LIKE '%{word}%' "
f"OR Remark LIKE '%{word}%' "
"ORDER BY NickName ASC;")
result = self.execute_sql(sql)
for row in result:
# 获取用户名、昵称、备注和聊天记录数量

View File

@ -6,6 +6,7 @@
# Date: 2024/04/15
# -------------------------------------------------------------------------------
import hashlib
import os
import re
import time
import wave
@ -238,6 +239,9 @@ def download_file(url, save_path=None):
return None
data = r.content
if save_path and isinstance(save_path, str):
# 创建文件夹
if not os.path.exists(os.path.dirname(save_path)):
os.makedirs(os.path.dirname(save_path))
with open(save_path, "wb") as f:
f.write(data)
return data
@ -334,6 +338,8 @@ def silk2audio(buf_data, is_play=False, is_wave=False, save_path=None, rate=2400
play_audio(pcm_data, rate)
print(is_play, is_wave, save_path)
if is_wave: # 转换为wav文件
wave_file = BytesIO() # 创建wav文件
with wave.open(wave_file, 'wb') as wf:
@ -344,6 +350,7 @@ def silk2audio(buf_data, is_play=False, is_wave=False, save_path=None, rate=2400
if save_path and isinstance(save_path, str):
with open(save_path, "wb") as f:
f.write(rdata)
print('saved wav file')
return rdata
return pcm_data

View File

@ -75,13 +75,14 @@ def start_falsk(merge_path="", msg_path="", micro_path="", media_path="", wx_pat
g.tmp_path = tmp_path # 临时文件夹,用于存放图片等
g.sf = session_file # 用于存放各种基础信息
if msg_path: save_session(session_file, "msg_path", msg_path)
if micro_path: save_session(session_file, "micro_path", micro_path)
if media_path: save_session(session_file, "media_path", media_path)
if wx_path: save_session(session_file, "wx_path", wx_path)
if key: save_session(session_file, "key", key)
if my_wxid: save_session(session_file, "my_wxid", my_wxid)
save_session(session_file, "test", my_wxid)
if msg_path: save_session(session_file, "test", "msg_path", msg_path)
if micro_path: save_session(session_file, "test", "micro_path", micro_path)
if media_path: save_session(session_file, "test", "media_path", media_path)
if wx_path: save_session(session_file, "test", "wx_path", wx_path)
if key: save_session(session_file, "test", "key", key)
if my_wxid: save_session(session_file, "test", "my_wxid", my_wxid)
if not os.path.exists(session_file):
save_session(session_file, "test", "last", my_wxid)
app.register_blueprint(api)
if isopenBrowser: