PyWxDump/pywxdump/api/api.py
2024-02-24 00:57:03 +08:00

680 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: api.py
# Description: Flask blueprint exposing the PyWxDump HTTP API (init, contacts, messages, media, export, tools)
# Author: xaoyaoo
# Date: 2024/01/02
# -------------------------------------------------------------------------------
import base64
import json
import logging
import os
import time
import shutil
from flask import Flask, request, render_template, g, Blueprint, send_file, make_response, session
from pywxdump import analyzer, read_img_dat, read_audio, get_wechat_db, get_core_db
from pywxdump.analyzer.export_chat import get_contact, get_room_user_list
from pywxdump.api.rjson import ReJson, RqJson
from pywxdump.api.utils import read_session, save_session, error9999
from pywxdump import read_info, VERSION_LIST, batch_decrypt, BiasAddr, merge_db, decrypt_merge, merge_real_time_db
import pywxdump
# app = Flask(__name__, static_folder='../ui/web/dist', static_url_path='/')
# Blueprint on which every route in this module is registered; templates and static
# assets are looked up under the bundled web UI directories given here.
api = Blueprint('api', __name__, template_folder='../ui/web', static_folder='../ui/web/assets/', )
# Sets a `debug` attribute on the blueprint object and keeps it switched off.
api.debug = False
@api.route('/api/init', methods=["GET", 'POST'])
@error9999
def init():
    """
    Initialise the session: store the database paths, the media path and, in
    decrypt mode, the merged database produced from ``wx_path``.

    The JSON request body may carry ``msg_path``, ``micro_path``, ``media_path``,
    ``wx_path``, ``key``, ``my_wxid`` and ``init_type``. Three modes are handled:

    * ``init_type == "last"``: reuse the paths saved by an earlier call.
    * ``key`` is non-empty: decrypt and merge the databases found under ``wx_path``.
    * otherwise: use ``msg_path`` / ``micro_path`` directly.

    :return: ``ReJson`` response; code 0 with the stored values on success,
             code 1001 / 1002 / 2001 when validation or decryption fails.
    """

    def _clean(name):
        # Pasted paths are often wrapped in quotes: trim whitespace and both quote kinds.
        return request.json.get(name, "").strip().strip("'").strip('"')

    msg_path = _clean("msg_path")
    micro_path = _clean("micro_path")
    media_path = _clean("media_path")
    wx_path = _clean("wx_path")
    key = _clean("key")
    my_wxid = _clean("my_wxid")
    init_type = request.json.get("init_type", "").strip()

    if init_type == "last":
        save_msg_path = read_session(g.sf, "msg_path")
        save_micro_path = read_session(g.sf, "micro_path")
        paths_usable = (save_msg_path and save_micro_path
                        and os.path.exists(save_msg_path) and os.path.exists(save_micro_path))
        if not paths_usable:
            return ReJson(1002, body="上次初始化的路径不存在")
        try:
            # Best effort only: a failed real-time merge must not block re-using the saved paths.
            get_real_time_msg()
        except Exception as e:
            logging.warning("real-time merge skipped while restoring the last session: %s", e)
        return ReJson(0, {"msg_path": save_msg_path, "micro_path": save_micro_path, "is_init": True})

    if key:  # a non-empty key selects decrypt mode
        if not wx_path:
            return ReJson(1002)
        if not os.path.exists(wx_path):
            return ReJson(1001, body=wx_path)
        # my_wxid becomes a directory name that is removed recursively below, so it
        # must be a single path component and never something like "..\\..".
        if my_wxid and (my_wxid in (".", "..") or os.path.basename(my_wxid) != my_wxid
                        or "/" in my_wxid or "\\" in my_wxid):
            return ReJson(1002, body="my_wxid is invalid")

        out_path = os.path.join(g.tmp_path, "decrypted", my_wxid) if my_wxid else os.path.join(g.tmp_path, "decrypted")
        if os.path.exists(out_path):
            shutil.rmtree(out_path)

        code, merge_save_path = decrypt_merge(wx_path=wx_path, key=key, outpath=out_path)
        time.sleep(1)  # NOTE(review): presumably lets the merged file settle before use — confirm
        if not code:
            # On failure the second element is passed back to the client as the body.
            return ReJson(2001, body=merge_save_path)

        # The single merged database serves as message, contact and media database.
        rdata = {
            "msg_path": merge_save_path,
            "micro_path": merge_save_path,
            "media_path": merge_save_path,
            "wx_path": wx_path,
            "key": key,
            "my_wxid": my_wxid,
        }
        for name, value in rdata.items():
            save_session(g.sf, name, value)
        rdata["is_init"] = True
        return ReJson(0, rdata)

    # Plain mode: the caller supplies the database paths directly.
    if not msg_path or not micro_path:
        return ReJson(1002, body="msg_path and micro_path is required")
    if not os.path.exists(msg_path) or not os.path.exists(micro_path):
        return ReJson(1001)

    rdata = {
        "msg_path": msg_path,
        "micro_path": micro_path,
        "media_path": media_path,
        "wx_path": wx_path,
        "key": "",
        "my_wxid": my_wxid,
    }
    for name, value in rdata.items():
        save_session(g.sf, name, value)
    rdata["is_init"] = True
    return ReJson(0, rdata)
@api.route('/api/version', methods=["GET", 'POST'])
@error9999
def version():
    """
    Report the pywxdump version.
    :return: ``ReJson`` response with code 0 and ``pywxdump.__version__`` as payload.
    """
    current_version = pywxdump.__version__
    return ReJson(0, current_version)
@api.route('/api/contact_list', methods=["GET", 'POST'])
@error9999
def contact_list():
    """
    Return the contact list, optionally paginated.

    The contact database path comes from the ``micro_path`` request header, falling
    back to the value stored in the session. ``start`` and ``limit`` are read from the
    JSON body; the list is sliced only when ``limit`` is truthy.

    :return: ``ReJson`` response with code 0 and the (possibly sliced) contact list.
    """
    micro_path = request.headers.get("micro_path") or read_session(g.sf, "micro_path")
    start = request.json.get("start")
    limit = request.json.get("limit")

    contacts = analyzer.get_contact_list(micro_path)
    if limit:
        # A request carrying only `limit` starts at the first contact instead of
        # failing on int(None).
        offset = int(start or 0)
        contacts = contacts[offset:offset + int(limit)]
    return ReJson(0, contacts)
@api.route('/api/chat_count', methods=["GET", 'POST'])
def chat_count():
    """
    Return chat-record counts obtained from ``analyzer.get_chat_count``.

    The message database path comes from the ``msg_path`` request header, falling back
    to the session value; ``username`` (default ``""``) is read from the JSON body.

    :return: ``ReJson`` response with code 0 and the counts, or code 9999 carrying the
             exception text if anything raises.
    """
    try:
        # Header value wins; the session value is only consulted when the header is empty.
        db_path = request.headers.get("msg_path") or read_session(g.sf, "msg_path")
        target_user = request.json.get("username", "")
        counts = analyzer.get_chat_count(db_path, target_user)
        return ReJson(0, counts)
    except Exception as err:
        return ReJson(9999, msg=str(err))
@api.route('/api/contact_count_list', methods=["GET", 'POST'])
def contact_count_list():
    """
    Return the contact list annotated with each contact's chat-record count.

    Database paths come from the ``msg_path`` / ``micro_path`` request headers, falling
    back to the session values. The JSON body may carry ``start``, ``limit`` and a search
    ``word``. Contacts are de-duplicated, sorted by ``chat_count`` descending, filtered by
    ``word`` and finally sliced when ``limit`` is truthy.

    :return: ``ReJson`` response with code 0 and the resulting list, or code 9999 carrying
             the exception text if anything raises.
    """
    try:
        msg_path = request.headers.get("msg_path") or read_session(g.sf, "msg_path")
        micro_path = request.headers.get("micro_path") or read_session(g.sf, "micro_path")
        start = request.json.get("start")
        limit = request.json.get("limit")
        word = request.json.get("word", "")

        contacts = analyzer.get_contact_list(micro_path)
        counts = analyzer.get_chat_count(msg_path)
        for contact in contacts:
            contact["chat_count"] = counts.get(contact["username"], 0)

        # De-duplicate identical records while keeping first-seen order, so contacts with
        # equal counts come back in a stable order and pagination stays consistent.
        seen = set()
        unique_contacts = []
        for contact in contacts:
            fingerprint = tuple(contact.items())
            if fingerprint not in seen:
                seen.add(fingerprint)
                unique_contacts.append(contact)

        # Most active chats first.
        unique_contacts.sort(key=lambda c: c["chat_count"], reverse=True)

        # The front end may send the literal strings "undefined" / "null" for an empty search.
        if word and word not in ("undefined", "null"):
            search_fields = ("account", "describe", "nickname", "remark", "username")
            # Fields that are missing or not strings (e.g. None) simply do not match
            # instead of aborting the whole request.
            unique_contacts = [
                contact for contact in unique_contacts
                if any(isinstance(contact.get(field), str) and word in contact[field]
                       for field in search_fields)
            ]

        if limit:
            # A request carrying only `limit` starts at the first contact.
            offset = int(start or 0)
            unique_contacts = unique_contacts[offset:offset + int(limit)]
        return ReJson(0, unique_contacts)
    except Exception as e:
        return ReJson(9999, msg=str(e))
@api.route('/api/msgs_user_list', methods=['GET', 'POST'])
@error9999
def get_msg_user_list():
    """
    Return the users taking part in the conversation identified by ``wxid``.

    Database paths come from the ``msg_path`` / ``micro_path`` request headers, falling
    back to the session values. For an id ending in ``@chatroom`` the list comes from
    ``get_room_user_list``; otherwise it holds the contact record of ``wxid`` followed by
    the record of the session's ``my_wxid``.

    :return: ``ReJson`` response with code 0 and ``{"user_list": ...}``, or code 1002
             when ``wxid`` is missing from the JSON body.
    """
    msg_path = request.headers.get("msg_path") or read_session(g.sf, "msg_path")
    micro_path = request.headers.get("micro_path") or read_session(g.sf, "micro_path")

    wxid = request.json.get("wxid")
    if not wxid:
        # Without this guard a missing wxid would fail on `.endswith` below.
        return ReJson(1002)
    my_wxid = read_session(g.sf, "my_wxid")

    if wxid.endswith("@chatroom"):
        # Group chat
        userlist = get_room_user_list(msg_path, wxid)
    else:
        # One-to-one chat: the peer first, then the local account.
        userlist = [get_contact(micro_path, wxid), get_contact(micro_path, my_wxid)]
    return ReJson(0, {"user_list": userlist})
@api.route('/api/msgs_list', methods=['GET', 'POST'])
@error9999
def get_msg_list():
    """
    Return one page of messages for the conversation identified by ``wxid``.

    The message database path comes from the ``msg_path`` request header, falling back
    to the session value. ``start``, ``limit`` and ``wxid`` are read from the JSON body
    and handed to ``analyzer.get_msg_list`` as ``start_index`` / ``page_size``.

    :return: ``ReJson`` response with code 0 and ``{"msg_list": ..., "my_wxid": ...}``.
    """
    # Only the message database is needed here, so the contact database is not resolved.
    msg_path = request.headers.get("msg_path") or read_session(g.sf, "msg_path")
    start = request.json.get("start")
    limit = request.json.get("limit")
    wxid = request.json.get("wxid")
    my_wxid = read_session(g.sf, "my_wxid")
    msg_list = analyzer.get_msg_list(msg_path, wxid, start_index=start, page_size=limit)
    return ReJson(0, {"msg_list": msg_list, 'my_wxid': my_wxid})
def func_get_msgs(start, limit, wxid, msg_path, micro_path):
    """
    Load one page of messages plus the contact records of the users involved.

    :param start: passed to ``analyzer.get_msg_list`` as ``start_index``
    :param limit: passed to ``analyzer.get_msg_list`` as ``page_size``
    :param wxid: conversation id; an id ending in ``@chatroom`` is treated as a group chat
    :param msg_path: message database path; a falsy value falls back to the session value
    :param micro_path: contact database path; a falsy value falls back to the session value
    :return: dict with ``msg_list``, ``user_list`` (username -> contact record) and ``my_wxid``
    """
    msg_path = msg_path or read_session(g.sf, "msg_path")
    micro_path = micro_path or read_session(g.sf, "micro_path")

    # Each message row is read below through its "talker" key.
    msg_list = analyzer.get_msg_list(msg_path, wxid, start_index=start, page_size=limit)
    contacts = analyzer.get_contact_list(micro_path)
    my_wxid = read_session(g.sf, "my_wxid")

    if wxid.endswith("@chatroom"):
        # Group chat: everyone who spoke on this page, plus the room itself and the local
        # account. A set keeps the per-contact membership test O(1).
        wanted = {msg["talker"] for msg in msg_list} | {wxid, my_wxid}
        userlist = {user["username"]: user for user in contacts if user["username"] in wanted}
    else:
        # One-to-one chat: only the peer and the local account are needed.
        wanted = {wxid, my_wxid}
        userlist = {}
        for user in contacts:
            if user["username"] in wanted:
                userlist[user["username"]] = user
                if len(userlist) == 2:
                    break  # both records found, stop scanning
    return {"msg_list": msg_list, "user_list": userlist, "my_wxid": my_wxid}
@api.route('/api/msgs', methods=["GET", 'POST'])
@error9999
def get_msgs():
    """
    Return one page of messages together with the users involved.

    Database paths come from the ``msg_path`` / ``micro_path`` request headers, falling
    back to the session values; ``start``, ``limit`` and ``wxid`` come from the JSON body.

    :return: ``ReJson`` response with code 0 and the dict built by ``func_get_msgs``.
    """
    header_msg_path = request.headers.get("msg_path")
    header_micro_path = request.headers.get("micro_path")
    # Header values win; the session is consulted only for the ones left empty.
    msg_db = header_msg_path or read_session(g.sf, "msg_path")
    micro_db = header_micro_path or read_session(g.sf, "micro_path")

    page_start = request.json.get("start")
    page_size = request.json.get("limit")
    conversation = request.json.get("wxid")

    return ReJson(0, func_get_msgs(page_start, page_size, conversation, msg_db, micro_db))
@api.route('/api/realtimemsg', methods=["GET", "POST"])
@error9999
def get_real_time_msg():
    """
    Pull real-time messages into the merged databases via ``merge_real_time_db()``.

    Uses the ``msg_path``, ``media_path``, ``wx_path`` and ``key`` values stored in the
    session. Of the database paths found for "MediaMSG" and "MSG", the one that sorts
    last in each group is merged into the saved media / message database.

    :return: ``ReJson`` response; code 0 on success, code 1002 when a session value is
             missing, code 1001 when ``get_core_db`` reports failure.
    """
    merged_msg_db = read_session(g.sf, "msg_path")
    merged_media_db = read_session(g.sf, "media_path")
    wx_path = read_session(g.sf, "wx_path")
    key = read_session(g.sf, "key")
    if not (merged_msg_db and merged_media_db and wx_path and key):
        return ReJson(1002, body="msg_path or media_path or wx_path or key is required")

    # get_core_db results are read as (status flag, list of paths).
    media_result = get_core_db(wx_path, ["MediaMSG"])
    msg_result = get_core_db(wx_path, ["MSG"])
    if not (media_result[0] and msg_result[0]):
        return ReJson(1001, body="media_paths or msg_paths is required")

    # Only the path that sorts last in each group is merged.
    latest_media_db = sorted(media_result[1])[-1]
    latest_msg_db = sorted(msg_result[1])[-1]

    merge_real_time_db(key=key, db_path=latest_media_db, merge_path=merged_media_db)
    merge_real_time_db(key=key, db_path=latest_msg_db, merge_path=merged_msg_db)
    return ReJson(0, "success")
@api.route('/api/img', methods=["GET", 'POST'])
@error9999
def get_img():
    """
    Decode an image ``.dat`` file and send the decoded image back.

    ``img_path`` is read from the query string (GET) or the JSON body (POST) and is
    resolved against the session's ``wx_path``. The decoded bytes are written under
    ``<tmp_path>/img`` and that file is sent.

    :return: the decoded image file; or ``ReJson`` with code 1002 when ``img_path`` is
             missing, code 1001 when the path leaves ``wx_path`` or does not exist, and
             code 1003 for any other HTTP method.
    """
    if request.method == "GET":
        img_path = request.args.get("img_path")
    elif request.method == "POST":
        img_path = request.json.get("img_path")
    else:
        return ReJson(1003, msg="Unsupported method")
    if not img_path:
        return ReJson(1002)

    wx_path = read_session(g.sf, "wx_path")
    img_path_all = os.path.join(wx_path, img_path)

    # img_path comes from the client: an absolute path or ".." segments must not be able
    # to reach files outside the WeChat directory.
    wx_root = os.path.abspath(wx_path)
    resolved = os.path.abspath(img_path_all)
    root_prefix = os.path.normcase(wx_root).rstrip(os.sep) + os.sep
    if not os.path.normcase(resolved).startswith(root_prefix):
        return ReJson(1001, body=img_path)

    if not os.path.exists(img_path_all):
        return ReJson(1001, body=img_path_all)

    fomt, md5, out_bytes = read_img_dat(img_path_all)
    # Build the cache location from the normalised relative path so the decoded copy
    # always lands inside <tmp_path>/img.
    relative_path = os.path.relpath(resolved, wx_root)
    imgsavepath = os.path.join(g.tmp_path, "img", relative_path + "_" + ".".join([md5, fomt]))
    os.makedirs(os.path.dirname(imgsavepath), exist_ok=True)
    with open(imgsavepath, "wb") as f:
        f.write(out_bytes)
    return send_file(imgsavepath)
@api.route('/api/video/<path:videoPath>', methods=["GET", 'POST'])
def get_video(videoPath):
    """
    Send a video file located under the session's ``wx_path``.

    :param videoPath: path taken from the URL, resolved against ``wx_path``
    :return: the file, or ``ReJson`` with code 5002 when the path leaves ``wx_path``
             or the file does not exist.
    """
    wx_path = read_session(g.sf, "wx_path")
    all_video_path = os.path.join(wx_path, videoPath)

    # videoPath comes straight from the URL: refuse absolute paths and ".." segments
    # that would resolve outside the WeChat directory.
    root_prefix = os.path.normcase(os.path.abspath(wx_path)).rstrip(os.sep) + os.sep
    inside_root = os.path.normcase(os.path.abspath(all_video_path)).startswith(root_prefix)

    if not inside_root or not os.path.exists(all_video_path):
        return ReJson(5002)
    return send_file(all_video_path)
@api.route('/api/file_info', methods=["GET", 'POST'])
def get_file_info():
    """
    Return the name and size of a file located under the session's ``wx_path``.

    ``file_path`` is read from the JSON body when one is present, otherwise from the
    query string.

    :return: ``ReJson`` with code 0 and ``{"file_name": ..., "file_size": ...}`` (size as
             a string); code 1002 when ``file_path`` is missing; code 5002 when the path
             leaves ``wx_path`` or the file does not exist.
    """
    file_path = request.args.get("file_path")
    # A GET without a JSON body must still work through the query string, so parse the
    # body leniently instead of letting `request.json` fail.
    payload = request.get_json(silent=True)
    if isinstance(payload, dict):
        file_path = payload.get("file_path", file_path)
    if not file_path:
        return ReJson(1002)

    wx_path = read_session(g.sf, "wx_path")
    all_file_path = os.path.join(wx_path, file_path)

    # file_path comes from the client: keep the lookup inside the WeChat directory.
    root_prefix = os.path.normcase(os.path.abspath(wx_path)).rstrip(os.sep) + os.sep
    inside_root = os.path.normcase(os.path.abspath(all_file_path)).startswith(root_prefix)

    if not inside_root or not os.path.exists(all_file_path):
        return ReJson(5002)
    file_name = os.path.basename(all_file_path)
    file_size = os.path.getsize(all_file_path)
    return ReJson(0, {"file_name": file_name, "file_size": str(file_size)})
@api.route('/api/file/<path:filePath>', methods=["GET", 'POST'])
def get_file(filePath):
    """
    Send a file located under the session's ``wx_path``.

    :param filePath: path taken from the URL, resolved against ``wx_path``
    :return: the file, or ``ReJson`` with code 5002 when the path leaves ``wx_path``
             or the file does not exist.
    """
    wx_path = read_session(g.sf, "wx_path")
    all_file_path = os.path.join(wx_path, filePath)

    # filePath comes straight from the URL: refuse absolute paths and ".." segments
    # that would resolve outside the WeChat directory.
    root_prefix = os.path.normcase(os.path.abspath(wx_path)).rstrip(os.sep) + os.sep
    inside_root = os.path.normcase(os.path.abspath(all_file_path)).startswith(root_prefix)

    if not inside_root or not os.path.exists(all_file_path):
        return ReJson(5002)
    return send_file(all_file_path)
@api.route('/api/audio/<path:savePath>', methods=["GET", 'POST'])
def get_audio(savePath):
    """
    Extract a voice message as WAV data, cache it under ``<tmp_path>/audio`` and send it.

    The message id is the text after the last ``_`` in the path with ``.wav`` removed;
    the audio is read from the session's ``media_path`` database via ``read_audio``.

    :param savePath: path taken from the URL, relative to the ``audio`` cache directory
    :return: the WAV file; or ``ReJson`` with code 1002 when the path leaves the audio
             cache directory, code 1001 when no audio data is found.
    """
    savePath = "audio/" + savePath  # the URL part is relative to the audio cache directory
    MsgSvrID = savePath.split("_")[-1].replace(".wav", "")

    # savePath comes straight from the URL and is written to below: keep it inside
    # <tmp_path>/audio.
    audio_root = os.path.abspath(os.path.join(g.tmp_path, "audio"))
    target = os.path.abspath(os.path.join(g.tmp_path, savePath))
    root_prefix = os.path.normcase(audio_root).rstrip(os.sep) + os.sep
    if not os.path.normcase(target).startswith(root_prefix):
        return ReJson(1002)

    media_path = read_session(g.sf, "media_path")
    wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=media_path)
    if not wave_data:
        return ReJson(1001)

    # Make sure the target directory exists before writing the cached copy.
    savePath = os.path.join(g.tmp_path, savePath)
    os.makedirs(os.path.dirname(savePath), exist_ok=True)
    with open(savePath, "wb") as f:
        f.write(wave_data)
    return send_file(savePath)
# Export chat records
def _export_html_chat(username, outpath):
    """Write the HTML viewer, the chat's audio/image files and ``data/msg_user.json`` into ``outpath``."""
    # Start from an empty directory so files of an earlier export do not linger.
    if os.path.exists(outpath):
        shutil.rmtree(outpath)
    os.makedirs(outpath, exist_ok=True)

    # Copy the bundled HTML export template (files and sub-directories).
    export_html = os.path.join(os.path.dirname(pywxdump.VERSION_LIST_PATH), "ui", "export")
    for entry in os.listdir(export_html):
        source = os.path.join(export_html, entry)
        target = os.path.join(outpath, entry)
        if os.path.isfile(source):
            shutil.copy(source, target)
        else:
            shutil.copytree(source, target)

    rdata = func_get_msgs(0, 10000000, username, "", "")
    msg_list = rdata["msg_list"]
    # Session values do not change while exporting, so read them once.
    media_path = read_session(g.sf, "media_path")
    session_wx_path = read_session(g.sf, "wx_path")

    for msg in msg_list:
        if msg["type_name"] == "语音":  # voice message
            savePath = msg["content"]["src"]
            if not savePath:
                return ReJson(1002)
            # The message id is the text after the last "_" with ".wav" removed.
            MsgSvrID = savePath.split("_")[-1].replace(".wav", "")
            wave_data = read_audio(MsgSvrID, is_wave=True, DB_PATH=media_path)
            if not wave_data:
                return ReJson(1001)
            savePath = os.path.join(outpath, savePath)
            os.makedirs(os.path.dirname(savePath), exist_ok=True)
            with open(savePath, "wb") as f:
                f.write(wave_data)
        elif msg["type_name"] == "图片":  # image message
            img_path = msg["content"]["src"]
            img_path_all = os.path.join(session_wx_path, img_path)
            # Images whose source file is missing keep their original src untouched.
            if os.path.exists(img_path_all):
                fomt, md5, out_bytes = read_img_dat(img_path_all)
                relative_img = os.path.join("img", img_path + "_" + ".".join([md5, fomt]))
                imgsavepath = os.path.join(outpath, relative_img)
                os.makedirs(os.path.dirname(imgsavepath), exist_ok=True)
                with open(imgsavepath, "wb") as f:
                    f.write(out_bytes)
                # Point the message at the exported copy, relative to the export root.
                msg["content"]["src"] = relative_img

    rdata["msg_list"] = msg_list
    rdata["myuserdata"] = rdata["user_list"][rdata["my_wxid"]]
    rdata["myuserdata"]["chat_count"] = len(rdata["msg_list"])

    save_json_path = os.path.join(outpath, "data")
    os.makedirs(save_json_path, exist_ok=True)
    with open(os.path.join(save_json_path, "msg_user.json"), "w", encoding="utf-8") as f:
        json.dump(rdata, f, ensure_ascii=False)
    return ReJson(0, outpath)


@api.route('/api/export', methods=["GET", 'POST'])
@error9999
def export():
    """
    Export chat records in the format named by ``export_type``.

    JSON body fields: ``export_type`` (``endb``, ``dedb``, ``csv``, ``json``, ``html``,
    ``pdf`` or ``docx``), ``start_time`` / ``end_time`` (used by ``dedb``), ``username``
    (used by ``csv`` / ``json`` / ``html``) and ``wx_path`` (used by ``endb``; defaults to
    the session value). Output is written below ``<tmp_path>/export/<export_type>``.

    :return: ``ReJson`` response; code 0 with the output location on success, code
             1001 / 1002 / 2001 on invalid input or failure. ``pdf`` and ``docx`` do
             nothing yet and end in code 9999.
    """
    export_type = request.json.get("export_type")
    start_time = request.json.get("start_time", 0)
    end_time = request.json.get("end_time", 0)
    username = request.json.get("username")
    wx_path = request.json.get("wx_path", read_session(g.sf, "wx_path"))

    # export_type becomes a directory name, so accept only the known values before
    # anything is created on disk.
    known_types = ("endb", "dedb", "csv", "json", "html", "pdf", "docx")
    if not export_type or not isinstance(export_type, str) or export_type not in known_types:
        return ReJson(1002)

    if export_type in ("csv", "json", "html"):
        # username becomes a directory name (removed recursively for html exports), so it
        # must be a single path component.
        if (not isinstance(username, str) or not username or username in (".", "..")
                or os.path.basename(username) != username or "/" in username or "\\" in username):
            return ReJson(1002)

    # Output directory for this export type.
    outpath = os.path.join(g.tmp_path, "export", export_type)
    os.makedirs(outpath, exist_ok=True)

    if export_type == "endb":  # copy the encrypted databases as they are
        if not wx_path:
            return ReJson(1002)
        if not os.path.exists(wx_path):
            return ReJson(1001, body=wx_path)
        code, wxdbpaths = get_core_db(wx_path)
        if not code:
            return ReJson(2001, body=wxdbpaths)
        for wxdb in wxdbpaths:
            shutil.copy(wxdb, os.path.join(outpath, os.path.basename(wxdb)))
        return ReJson(0, body=outpath)

    if export_type == "dedb":  # merge the decrypted databases into one file
        if not (isinstance(start_time, int) and isinstance(end_time, int)):
            return ReJson(1002, body={"start_time": start_time, "end_time": end_time})
        msg_path = read_session(g.sf, "msg_path")
        micro_path = read_session(g.sf, "micro_path")
        # NOTE(review): msg_path is listed twice and the session's media_path is not
        # merged — confirm whether media_path was meant to be the second entry.
        dbpaths = list(set([msg_path, msg_path, micro_path]))
        mergepath = merge_db(dbpaths, os.path.join(outpath, "merge.db"), start_time, end_time)
        return ReJson(0, body=mergepath)

    if export_type in ("csv", "json"):
        outpath = os.path.join(outpath, username)
        os.makedirs(outpath, exist_ok=True)
        exporter = analyzer.export_csv if export_type == "csv" else analyzer.export_json
        code, ret = exporter(username, outpath, read_session(g.sf, "msg_path"))
        if code:
            return ReJson(0, ret)
        return ReJson(2001, body=ret)

    if export_type == "html":
        return _export_html_chat(username, os.path.join(outpath, username))

    # "pdf" and "docx" are accepted but not implemented yet.
    return ReJson(9999, "")
# APIs for the professional tools start here
@api.route('/api/wxinfo', methods=["GET", 'POST'])
@error9999
def get_wxinfo():
    """
    Collect WeChat information via ``read_info(VERSION_LIST)``.

    COM is initialised for the duration of the call and always uninitialised afterwards.

    :return: ``ReJson`` response with code 0 and the value returned by ``read_info``.
    """
    import pythoncom
    pythoncom.CoInitialize()
    try:
        wxinfos = read_info(VERSION_LIST)
    finally:
        # Every CoInitialize must be paired with CoUninitialize, also when read_info raises.
        pythoncom.CoUninitialize()
    return ReJson(0, wxinfos)
@api.route('/api/decrypt', methods=["GET", 'POST'])
@error9999
def decrypt():
    """
    Decrypt databases with ``batch_decrypt``.

    JSON body fields: ``key`` and ``wxdbPath`` (both required) and ``outPath`` (optional;
    ``g.tmp_path`` is used when it is empty).

    :return: ``ReJson`` response with code 0 and the stringified ``batch_decrypt`` result,
             or code 1002 when ``key`` or ``wxdbPath`` is missing.
    """
    secret = request.json.get("key")
    if not secret:
        return ReJson(1002)

    source_path = request.json.get("wxdbPath")
    if not source_path:
        return ReJson(1002)

    # Fall back to the temporary directory when no output path was given.
    target_path = request.json.get("outPath") or g.tmp_path

    result = batch_decrypt(secret, source_path, out_path=target_path)
    return ReJson(0, str(result))
@api.route('/api/biasaddr', methods=["GET", 'POST'])
@error9999
def biasaddr():
    """
    Run ``BiasAddr`` with the values supplied in the JSON body.

    ``mobile``, ``name`` and ``account`` are required; ``key`` and ``wxdbPath`` default
    to empty strings.

    :return: ``ReJson`` response with code 0 and the stringified result of
             ``BiasAddr(...).run()``, or code 1002 when a required field is missing.
    """
    body = request.json
    mobile = body.get("mobile")
    name = body.get("name")
    account = body.get("account")
    key = body.get("key", "")
    db_path = body.get("wxdbPath", "")

    if not (mobile and name and account):
        return ReJson(1002)

    finder = BiasAddr(account, mobile, name, key, db_path)
    return ReJson(0, str(finder.run()))
@api.route('/api/merge', methods=["GET", 'POST'])
@error9999
def merge():
    """
    Merge databases with ``merge_db``.

    JSON body fields: ``dbPath`` and ``outPath``, both required.

    :return: ``ReJson`` response with code 0 and the stringified ``merge_db`` result,
             or code 1002 when either field is missing.
    """
    source = request.json.get("dbPath")
    if not source:
        return ReJson(1002)

    destination = request.json.get("outPath")
    if not destination:
        return ReJson(1002)

    merged = merge_db(source, destination)
    return ReJson(0, str(merged))
# END of the APIs for the professional tools
# About, help and settings
@api.route('/api/check_update', methods=["GET", 'POST'])
@error9999
def check_update():
    """
    Check GitHub for a newer release tag.

    The name of the first tag returned by the GitHub tags API, minus its first
    character, is compared with ``pywxdump.__version__``.

    :return: ``ReJson`` response with code 0 and ``msg`` / ``latest_version`` /
             ``latest_url``; code 2001 when GitHub does not answer with HTTP 200; code
             9999 carrying the exception text if anything raises (including a timeout).
    """
    url = "https://api.github.com/repos/xaoyaoo/PyWxDump/tags"
    try:
        import requests
        # Without a timeout an unreachable GitHub would block this request indefinitely.
        res = requests.get(url, timeout=10)
        if res.status_code != 200:
            return ReJson(2001, body="status_code is not 200")

        data = res.json()
        NEW_VERSION = data[0].get("name")
        # The first character of the tag name is dropped before comparing
        # (presumably a leading "v" — confirm against the repository's tags).
        if NEW_VERSION[1:] != pywxdump.__version__:
            msg = "有新版本"
        else:
            msg = "已经是最新版本"
        return ReJson(0, body={"msg": msg, "latest_version": NEW_VERSION,
                               "latest_url": "https://github.com/xaoyaoo/PyWxDump/releases/tag/" + NEW_VERSION})
    except Exception as e:
        return ReJson(9999, msg=str(e))
# END of about, help and settings
@api.route('/')
@error9999
def index():
    """
    Render the ``index.html`` template for the root URL.
    :return: the rendered template.
    """
    page = render_template('index.html')
    return page