PyWxDump/pywxdump/api/remote_server.py

663 lines
22 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
2024-08-17 13:51:44 +08:00
# Name: remote_server.py
# Description:
# Author: xaoyaoo
# Date: 2024/01/02
# -------------------------------------------------------------------------------
import os
import time
import shutil
2024-08-11 18:04:22 +08:00
from collections import Counter
2024-08-18 14:37:36 +08:00
from urllib.parse import quote, unquote
2024-08-17 11:51:57 +08:00
from typing import List, Optional
2024-08-11 18:04:22 +08:00
2024-08-17 11:51:57 +08:00
from pydantic import BaseModel
2024-08-18 14:37:36 +08:00
from fastapi import APIRouter, Response, Body, Query, Request
2024-08-17 11:51:57 +08:00
from starlette.responses import StreamingResponse, FileResponse
2024-08-17 11:51:57 +08:00
import pywxdump
2024-08-17 17:28:57 +08:00
from pywxdump import decrypt_merge, get_core_db
2024-08-17 13:51:44 +08:00
from pywxdump.db import DBHandler
from pywxdump.db.utils import download_file, dat2img
2024-08-17 13:51:44 +08:00
from .export import export_csv, export_json, export_html
2024-08-17 11:51:57 +08:00
from .rjson import ReJson, RqJson
2024-08-18 14:37:36 +08:00
from .utils import error9999, gc, asyncError9999, rs_loger
2024-08-17 11:51:57 +08:00
rs_api = APIRouter()
# 是否初始化
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/is_init', methods=["GET", 'POST'])
@error9999
def is_init():
"""
是否初始化
:return:
"""
2024-08-17 11:51:57 +08:00
local_wxids = gc.get_local_wxids()
if len(local_wxids) > 1:
return ReJson(0, True)
return ReJson(0, False)
# start 以下为聊天联系人相关api *******************************************************************************************
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/mywxid', methods=["GET", 'POST'])
@error9999
def mywxid():
"""
获取我的微信id
:return:
"""
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
return ReJson(0, {"my_wxid": my_wxid})
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/user_session_list', methods=["GET", 'POST'])
@error9999
def user_session_list():
"""
获取联系人列表
:return:
"""
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
db_config = gc.get_conf(my_wxid, "db_config")
2024-08-13 22:59:46 +08:00
db = DBHandler(db_config, my_wxid=my_wxid)
ret = db.get_session_list()
return ReJson(0, list(ret.values()))
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/user_labels_dict', methods=["GET", 'POST'])
@error9999
def user_labels_dict():
"""
获取标签字典
:return:
"""
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
db_config = gc.get_conf(my_wxid, "db_config")
2024-08-13 22:59:46 +08:00
db = DBHandler(db_config, my_wxid=my_wxid)
user_labels_dict = db.get_labels()
return ReJson(0, user_labels_dict)
2024-08-17 11:51:57 +08:00
@rs_api.post('/user_list')
@error9999
2024-08-17 11:51:57 +08:00
def user_list(word: str = "", wxids: List[str] = None, labels: List[str] = None):
"""
获取联系人列表可用于搜索
:return:
"""
if isinstance(wxids, str) and wxids == '' or wxids is None: wxids = []
if isinstance(labels, str) and labels == '' or labels is None: labels = []
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
db_config = gc.get_conf(my_wxid, "db_config")
2024-08-13 22:59:46 +08:00
db = DBHandler(db_config, my_wxid=my_wxid)
users = db.get_user(word, wxids, labels)
return ReJson(0, users)
# end 以上为聊天联系人相关api *********************************************************************************************
# start 以下为聊天记录相关api *********************************************************************************************
2024-08-17 11:51:57 +08:00
@rs_api.post('/msg_count')
@error9999
2024-08-17 17:28:57 +08:00
def msg_count(wxids: Optional[List[str]] = Body(..., embed=True)):
2024-08-17 11:51:57 +08:00
"""
获取联系人的聊天记录数量
:return:
"""
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
db_config = gc.get_db_config()
db = DBHandler(db_config, my_wxid=my_wxid)
count = db.get_msgs_count(wxids)
return ReJson(0, count)
@rs_api.api_route('/msg_list', methods=["GET", 'POST'])
@error9999
2024-08-17 17:28:57 +08:00
def get_msgs(wxid: str = Body(...), start: int = Body(...), limit: int = Body(...)):
2024-08-17 11:51:57 +08:00
"""
获取联系人的聊天记录
:return:
"""
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
db_config = gc.get_conf(my_wxid, "db_config")
db = DBHandler(db_config, my_wxid=my_wxid)
2024-08-18 13:59:32 +08:00
msgs, users = db.get_msgs(wxids=wxid, start_index=start, page_size=limit)
2024-08-17 11:51:57 +08:00
return ReJson(0, {"msg_list": msgs, "user_list": users})
@rs_api.get('/imgsrc')
@asyncError9999
2024-08-18 14:37:36 +08:00
async def get_imgsrc(request: Request):
"""
获取图片,
1. 从网络获取图片主要功能只是下载图片缓存到本地
2. 读取本地图片
:return:
"""
2024-08-18 14:37:36 +08:00
imgsrc = unquote(str(request.query_params).replace("src=", "", 1))
if not imgsrc:
return ReJson(1002)
if imgsrc.startswith("FileStorage"): # 如果是本地图片文件则调用get_img
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
wx_path = gc.get_conf(my_wxid, "wx_path")
img_path = imgsrc.replace("\\\\", "\\")
2024-08-17 11:51:57 +08:00
img_tmp_path = os.path.join(gc.work_path, my_wxid, "img")
original_img_path = os.path.join(wx_path, img_path)
if os.path.exists(original_img_path):
rc, fomt, md5, out_bytes = dat2img(original_img_path)
if not rc:
return ReJson(1001, body=original_img_path)
imgsavepath = os.path.join(str(img_tmp_path), img_path + "_" + "".join([md5, fomt]))
if os.path.exists(imgsavepath):
2024-08-17 11:51:57 +08:00
return FileResponse(imgsavepath)
if not os.path.exists(os.path.dirname(imgsavepath)):
os.makedirs(os.path.dirname(imgsavepath))
with open(imgsavepath, "wb") as f:
f.write(out_bytes)
2024-08-17 11:51:57 +08:00
return Response(content=out_bytes, media_type="image/jpeg")
else:
2024-08-03 20:04:53 +08:00
return ReJson(1001, body=f"{original_img_path} not exists")
elif imgsrc.startswith("http://") or imgsrc.startswith("https://"):
# 将?后面的参数连接到imgsrc
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
img_tmp_path = os.path.join(gc.work_path, my_wxid, "imgsrc")
if not os.path.exists(img_tmp_path):
os.makedirs(img_tmp_path)
file_name = imgsrc.replace("http://", "").replace("https://", "").replace("/", "_").replace("?", "_")
file_name = file_name + ".jpg"
# 如果文件名过长,则将文件明分为目录和文件名
if len(file_name) > 255:
file_name = file_name[:255] + "/" + file_name[255:]
img_path_all = os.path.join(str(img_tmp_path), file_name)
if os.path.exists(img_path_all):
2024-08-17 11:51:57 +08:00
return FileResponse(img_path_all)
else:
2024-08-17 11:51:57 +08:00
# proxies = {
# "http": "http://127.0.0.1:10809",
# "https": "http://127.0.0.1:10809",
# }
proxies = None
download_file(imgsrc, img_path_all, proxies=proxies)
if os.path.exists(img_path_all):
2024-08-17 11:51:57 +08:00
return FileResponse(img_path_all)
else:
return ReJson(4004, body=imgsrc)
else:
return ReJson(1002, body=imgsrc)
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/video', methods=["GET", 'POST'])
2024-08-18 14:37:36 +08:00
def get_video(request: Request):
"""
2024-08-17 11:51:57 +08:00
获取视频
:return:
"""
2024-08-18 14:37:36 +08:00
videoPath = unquote(str(request.query_params).replace("src=", "", 1))
2024-08-17 11:51:57 +08:00
if not videoPath:
return ReJson(1002)
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
wx_path = gc.get_conf(my_wxid, "wx_path")
videoPath = videoPath.replace("\\\\", "\\")
2024-08-17 11:51:57 +08:00
video_tmp_path = os.path.join(gc.work_path, my_wxid, "video")
original_img_path = os.path.join(wx_path, videoPath)
if not os.path.exists(original_img_path):
return ReJson(5002)
# 复制文件到临时文件夹
2024-08-17 11:51:57 +08:00
assert isinstance(video_tmp_path, str)
video_save_path = os.path.join(video_tmp_path, videoPath)
if not os.path.exists(os.path.dirname(video_save_path)):
os.makedirs(os.path.dirname(video_save_path))
if os.path.exists(video_save_path):
2024-08-17 11:51:57 +08:00
return FileResponse(path=video_save_path)
shutil.copy(original_img_path, video_save_path)
2024-08-17 11:51:57 +08:00
return FileResponse(path=video_save_path)
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/audio', methods=["GET", 'POST'])
2024-08-18 14:37:36 +08:00
def get_audio(request: Request):
2024-08-17 11:51:57 +08:00
"""
获取语音
:return:
"""
2024-08-18 14:37:36 +08:00
savePath = unquote(str(request.query_params).replace("src=", "", 1)).replace("audio\\", "", 1)
2024-08-17 11:51:57 +08:00
if not savePath:
return ReJson(1002)
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
db_config = gc.get_conf(my_wxid, "db_config")
2024-08-17 11:51:57 +08:00
savePath = os.path.join(gc.work_path, my_wxid, "audio", savePath) # 这个是从url中获取的
if os.path.exists(savePath):
2024-08-17 11:51:57 +08:00
assert isinstance(savePath, str)
return FileResponse(path=savePath, media_type='audio/mpeg')
MsgSvrID = savePath.split("_")[-1].replace(".wav", "")
if not savePath:
return ReJson(1002)
# 判断savePath路径的文件夹是否存在
if not os.path.exists(os.path.dirname(savePath)):
os.makedirs(os.path.dirname(savePath))
2024-08-13 22:59:46 +08:00
db = DBHandler(db_config, my_wxid=my_wxid)
wave_data = db.get_audio(MsgSvrID, is_play=False, is_wave=True, save_path=savePath, rate=24000)
if not wave_data:
return ReJson(1001, body="wave_data is required")
if os.path.exists(savePath):
2024-08-17 11:51:57 +08:00
assert isinstance(savePath, str)
return FileResponse(path=savePath, media_type='audio/mpeg')
else:
return ReJson(4004, body=savePath)
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/file_info', methods=["GET", 'POST'])
2024-08-17 17:28:57 +08:00
def get_file_info(file_path: str = Body(..., embed=True)):
if not file_path:
return ReJson(1002)
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
wx_path = gc.get_conf(my_wxid, "wx_path")
all_file_path = os.path.join(wx_path, file_path)
if not os.path.exists(all_file_path):
return ReJson(5002)
file_name = os.path.basename(all_file_path)
file_size = os.path.getsize(all_file_path)
return ReJson(0, {"file_name": file_name, "file_size": str(file_size)})
2024-08-17 11:51:57 +08:00
@rs_api.get('/file')
2024-08-18 14:37:36 +08:00
def get_file(request: Request):
2024-08-17 11:51:57 +08:00
"""
获取文件
:return:
"""
2024-08-18 14:37:36 +08:00
file_path = unquote(str(request.query_params).replace("src=", "", 1))
2024-08-17 11:51:57 +08:00
if not file_path:
return ReJson(1002)
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
wx_path = gc.get_conf(my_wxid, "wx_path")
2024-08-17 11:51:57 +08:00
all_file_path = os.path.join(wx_path, file_path)
if not os.path.exists(all_file_path):
return ReJson(5002)
2024-08-17 11:51:57 +08:00
def file_iterator(file_path, chunk_size=8192):
with open(file_path, "rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
headers = {
"Content-Disposition": f'attachment; filename*=UTF-8\'\'{quote(os.path.basename(all_file_path))}',
}
return StreamingResponse(file_iterator(all_file_path), media_type="application/octet-stream", headers=headers)
# end 以上为聊天记录相关api *********************************************************************************************
# start 导出聊天记录 *****************************************************************************************************
2024-08-17 11:51:57 +08:00
class ExportEndbRequest(BaseModel):
wx_path: str = ""
outpath: str = ""
key: str = ""
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/export_endb', methods=["GET", 'POST'])
def get_export_endb(request: ExportEndbRequest):
"""
导出加密数据库
:return:
"""
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
wx_path = request.wx_path
if not wx_path:
2024-08-17 11:51:57 +08:00
wx_path = gc.get_conf(my_wxid, "wx_path")
2024-08-11 22:46:40 +08:00
if not os.path.exists(wx_path if wx_path else ""):
return ReJson(1002, body=f"wx_path is required: {wx_path}")
# 分割wx_path的文件名和父目录
code, wxdbpaths = get_core_db(wx_path)
if not code:
return ReJson(2001, body=wxdbpaths)
2024-08-17 11:51:57 +08:00
outpath = os.path.join(gc.work_path, "export", my_wxid, "endb")
if not os.path.exists(outpath):
os.makedirs(outpath)
for wxdb in wxdbpaths:
# 复制wxdb->outpath, os.path.basename(wxdb)
assert isinstance(outpath, str) # 为了解决pycharm的警告, 无实际意义
2024-08-11 22:46:40 +08:00
wxdb_path = wxdb.get("db_path")
shutil.copy(wxdb_path, os.path.join(outpath, os.path.basename(wxdb_path)))
return ReJson(0, body=outpath)
2024-08-17 11:51:57 +08:00
class ExportDedbRequest(BaseModel):
wx_path: str = ""
outpath: str = ""
key: str = ""
@rs_api.api_route('/export_dedb', methods=["GET", "POST"])
def get_export_dedb(request: ExportDedbRequest):
"""
导出解密数据库
:return:
"""
2024-08-17 11:51:57 +08:00
key = request.key
wx_path = request.wx_path
2024-08-11 22:46:40 +08:00
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-11 22:46:40 +08:00
if not key:
2024-08-17 11:51:57 +08:00
key = gc.get_conf(my_wxid, "key")
2024-08-11 22:46:40 +08:00
if not wx_path:
2024-08-17 11:51:57 +08:00
wx_path = gc.get_conf(my_wxid, "wx_path")
if not key:
return ReJson(1002, body=f"key is required: {key}")
if not wx_path:
return ReJson(1002, body=f"wx_path is required: {wx_path}")
if not os.path.exists(wx_path):
return ReJson(1001, body=f"wx_path not exists: {wx_path}")
2024-08-17 11:51:57 +08:00
outpath = os.path.join(gc.work_path, "export", my_wxid, "dedb")
if not os.path.exists(outpath):
os.makedirs(outpath)
2024-08-11 22:46:40 +08:00
assert isinstance(outpath, str)
code, merge_save_path = decrypt_merge(wx_path=wx_path, key=key, outpath=outpath)
time.sleep(1)
if code:
return ReJson(0, body=merge_save_path)
else:
return ReJson(2001, body=merge_save_path)
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/export_csv', methods=["GET", 'POST'])
2024-08-17 17:28:57 +08:00
def get_export_csv(wxid: str = Body(..., embed=True)):
"""
导出csv
:return:
"""
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
db_config = gc.get_conf(my_wxid, "db_config")
# st_ed_time = request.json.get("datetime", [0, 0])
if not wxid:
return ReJson(1002, body=f"username is required: {wxid}")
# if not isinstance(st_ed_time, list) or len(st_ed_time) != 2:
# return ReJson(1002, body=f"datetime is required: {st_ed_time}")
# start, end = st_ed_time
# if not isinstance(start, int) or not isinstance(end, int) or start >= end:
# return ReJson(1002, body=f"datetime is required: {st_ed_time}")
2024-08-17 11:51:57 +08:00
outpath = os.path.join(gc.work_path, "export", my_wxid, "csv", wxid)
if not os.path.exists(outpath):
os.makedirs(outpath)
2024-08-13 22:59:46 +08:00
code, ret = export_csv(wxid, outpath, db_config, my_wxid=my_wxid)
if code:
return ReJson(0, ret)
else:
return ReJson(2001, body=ret)
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/export_json', methods=["GET", 'POST'])
2024-08-17 17:28:57 +08:00
def get_export_json(wxid: str = Body(..., embed=True)):
"""
导出json
:return:
"""
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
db_config = gc.get_conf(my_wxid, "db_config")
if not wxid:
return ReJson(1002, body=f"username is required: {wxid}")
2024-08-17 11:51:57 +08:00
outpath = os.path.join(gc.work_path, "export", my_wxid, "json", wxid)
if not os.path.exists(outpath):
os.makedirs(outpath)
2024-08-13 22:59:46 +08:00
code, ret = export_json(wxid, outpath, db_config, my_wxid=my_wxid)
if code:
return ReJson(0, ret)
else:
return ReJson(2001, body=ret)
2024-08-17 11:51:57 +08:00
class ExportHtmlRequest(BaseModel):
wxid: str
@rs_api.api_route('/export_html', methods=["GET", 'POST'])
2024-08-17 17:28:57 +08:00
def get_export_html(wxid: str = Body(..., embed=True)):
2024-08-13 23:54:21 +08:00
"""
导出json
:return:
"""
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
2024-08-13 23:54:21 +08:00
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
db_config = gc.get_conf(my_wxid, "db_config")
2024-08-13 23:54:21 +08:00
if not wxid:
return ReJson(1002, body=f"username is required: {wxid}")
2024-08-17 11:51:57 +08:00
html_outpath = os.path.join(gc.work_path, "export", my_wxid, "html")
2024-08-13 23:54:21 +08:00
if not os.path.exists(html_outpath):
os.makedirs(html_outpath)
assert isinstance(html_outpath, str)
outpath = os.path.join(html_outpath, wxid)
if os.path.exists(outpath):
shutil.rmtree(outpath, ignore_errors=True)
# 复制pywxdump/ui/web/*到outpath
web_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "ui", "web")
shutil.copytree(web_path, outpath)
code, ret = export_html(wxid, outpath, db_config, my_wxid=my_wxid)
if code:
return ReJson(0, ret)
else:
return ReJson(2001, body=ret)
# end 导出聊天记录 *******************************************************************************************************
# start 聊天记录分析api **************************************************************************************************
2024-08-17 11:51:57 +08:00
class DateCountRequest(BaseModel):
wxid: str = ""
start_time: int = 0
end_time: int = 0
time_format: str = "%Y-%m-%d"
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/date_count', methods=["GET", 'POST'])
def get_date_count(request: DateCountRequest):
"""
获取日期统计
2024-08-05 23:01:09 +08:00
:return:
"""
2024-08-17 11:51:57 +08:00
wxid = request.wxid
start_time = request.start_time
end_time = request.end_time
time_format = request.time_format
2024-08-05 23:01:09 +08:00
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
db_config = gc.get_conf(my_wxid, "db_config")
2024-08-13 22:59:46 +08:00
db = DBHandler(db_config, my_wxid=my_wxid)
2024-08-17 11:51:57 +08:00
date_count = db.get_date_count(wxid=wxid, start_time=start_time, end_time=end_time, time_format=time_format)
return ReJson(0, date_count)
2024-08-17 11:51:57 +08:00
class TopTalkerCountRequest(BaseModel):
top: int = 10
start_time: int = 0
end_time: int = 0
@rs_api.api_route('/top_talker_count', methods=["GET", 'POST'])
def get_top_talker_count(request: TopTalkerCountRequest):
"""
获取最多聊天的人
:return:
"""
2024-08-17 11:51:57 +08:00
top = request.top
start_time = request.start_time
end_time = request.end_time
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
db_config = gc.get_conf(my_wxid, "db_config")
2024-08-13 22:59:46 +08:00
date_count = DBHandler(db_config, my_wxid=my_wxid).get_top_talker_count(top=top, start_time=start_time,
end_time=end_time)
return ReJson(0, date_count)
2024-08-17 11:51:57 +08:00
class WordCloudRequest(BaseModel):
target: str = "signature"
2024-08-11 18:04:22 +08:00
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/wordcloud', methods=["GET", 'POST'])
@error9999
def get_wordcloud(request: WordCloudRequest):
2024-08-11 18:04:22 +08:00
try:
import jieba
except ImportError:
return ReJson(9999, body="jieba is required")
2024-08-17 11:51:57 +08:00
target = request.target
2024-08-11 18:04:22 +08:00
if not target:
return ReJson(1002, body="target is required")
2024-08-11 18:04:35 +08:00
2024-08-17 11:51:57 +08:00
my_wxid = gc.get_conf(gc.at, "last")
2024-08-11 18:04:22 +08:00
if not my_wxid: return ReJson(1001, body="my_wxid is required")
2024-08-17 11:51:57 +08:00
db_config = gc.get_conf(my_wxid, "db_config")
2024-08-13 22:59:46 +08:00
db = DBHandler(db_config, my_wxid=my_wxid)
2024-08-11 18:04:22 +08:00
if target == "signature":
users = db.get_user()
signature_list = []
for wxid, user in users.items():
ExtraBuf = user.get("ExtraBuf", {})
signature = ExtraBuf.get("个性签名", "") if ExtraBuf else ""
if signature:
signature_list.append(signature)
signature_str = " ".join(signature_list)
words = jieba.lcut(signature_str)
words = [word for word in words if len(word) > 1]
count_dict = dict(Counter(words))
return ReJson(0, count_dict)
elif target == "nickname":
users = db.get_user()
nickname_list = []
for wxid, user in users.items():
nickname = user.get("nickname", "")
if nickname:
nickname_list.append(nickname)
nickname_str = " ".join(nickname_list)
words = jieba.lcut(nickname_str)
words = [word for word in words if len(word) > 1]
count_dict = dict(Counter(words))
return ReJson(0, count_dict)
return ReJson(1002, body="target is required")
2024-08-05 23:26:08 +08:00
# end 聊天记录分析api ****************************************************************************************************
# 关于、帮助、设置 *******************************************************************************************************
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/check_update', methods=["GET", 'POST'])
@error9999
def check_update():
"""
检查更新
:return:
"""
url = "https://api.github.com/repos/xaoyaoo/PyWxDump/tags"
try:
import requests
res = requests.get(url)
if res.status_code == 200:
data = res.json()
NEW_VERSION = data[0].get("name")
if NEW_VERSION[1:] != pywxdump.__version__:
msg = "有新版本"
else:
msg = "已经是最新版本"
return ReJson(0, body={"msg": msg, "latest_version": NEW_VERSION,
"latest_url": "https://github.com/xaoyaoo/PyWxDump/releases/tag/" + NEW_VERSION})
else:
return ReJson(2001, body="status_code is not 200")
except Exception as e:
return ReJson(9999, msg=str(e))
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/version', methods=["GET", "POST"])
@error9999
def version():
"""
版本
:return:
"""
return ReJson(0, pywxdump.__version__)
2024-08-17 11:51:57 +08:00
@rs_api.api_route('/get_readme', methods=["GET", 'POST'])
2024-08-05 19:06:59 +08:00
@error9999
def get_readme():
"""
版本
:return:
"""
url = "https://raw.githubusercontent.com/xaoyaoo/PyWxDump/master/doc/README_CN.md"
import requests
res = requests.get(url)
if res.status_code == 200:
data = res.text
2024-08-05 23:01:09 +08:00
data = data.replace("# <center>PyWxDump</center>", "")
2024-08-05 19:06:59 +08:00
return ReJson(0, body=data)
else:
return ReJson(2001, body="status_code is not 200")
# END 关于、帮助、设置 ***************************************************************************************************