feat(message): impl message handler

This commit is contained in:
Changhua 2025-02-18 01:14:20 +08:00
parent f964dba48a
commit 6371f83b87
4 changed files with 113 additions and 75 deletions

View File

@ -1,6 +1,7 @@
#include "message_handler.h"
#include <condition_variable>
#include <filesystem>
#include <mutex>
#include <queue>
@ -15,60 +16,61 @@
extern QWORD g_WeChatWinDllAddr;
#define OS_RECV_MSG_ID 0x30
#define OS_RECV_MSG_TYPE 0x38
#define OS_RECV_MSG_SELF 0x3C
#define OS_RECV_MSG_TS 0x44
#define OS_RECV_MSG_ROOMID 0x48
#define OS_RECV_MSG_CONTENT 0x88
#define OS_RECV_MSG_WXID 0x240
#define OS_RECV_MSG_SIGN 0x260
#define OS_RECV_MSG_THUMB 0x280
#define OS_RECV_MSG_EXTRA 0x2A0
#define OS_RECV_MSG_XML 0x308
#define OS_RECV_MSG_CALL 0x213ED90
#define OS_PYQ_MSG_START 0x30
#define OS_PYQ_MSG_END 0x38
#define OS_PYQ_MSG_TS 0x38
#define OS_PYQ_MSG_XML 0x9B8
#define OS_PYQ_MSG_SENDER 0x18
#define OS_PYQ_MSG_CONTENT 0x48
#define OS_PYQ_MSG_CALL 0x2E42C90
#define OS_WXLOG 0x2613D20
#define OS_PYQ_MSG_START 0x30
#define OS_PYQ_MSG_END 0x38
#define OS_PYQ_MSG_TS 0x38
#define OS_PYQ_MSG_XML 0x9B8
#define OS_PYQ_MSG_SENDER 0x18
#define OS_PYQ_MSG_CONTENT 0x48
#define OS_PYQ_MSG_CALL 0x2E42C90
namespace message
{
namespace OsLog = Offsets::Message::Log;
namespace fs = std::filesystem;
namespace OsLog = Offsets::Message::Log;
namespace OsRecv = Offsets::Message::Receive;
QWORD Handler::DispatchMsg(QWORD arg1, QWORD arg2)
{
auto &handler = getInstance();
WxMsg_t wxMsg = {};
try {
wxMsg.id = util::get_qword(arg2 + OS_RECV_MSG_ID);
wxMsg.type = util::get_dword(arg2 + OS_RECV_MSG_TYPE);
wxMsg.is_self = util::get_dword(arg2 + OS_RECV_MSG_SELF);
wxMsg.ts = util::get_dword(arg2 + OS_RECV_MSG_TS);
wxMsg.content = util::get_str_by_wstr_addr(arg2 + OS_RECV_MSG_CONTENT);
wxMsg.sign = util::get_str_by_wstr_addr(arg2 + OS_RECV_MSG_SIGN);
wxMsg.xml = util::get_str_by_wstr_addr(arg2 + OS_RECV_MSG_XML);
wxMsg.roomid = util::get_str_by_wstr_addr(arg2 + OS_RECV_MSG_ROOMID);
wxMsg.id = util::get_qword(arg2 + OsRecv::ID);
wxMsg.type = util::get_dword(arg2 + OsRecv::TYPE);
wxMsg.is_self = util::get_dword(arg2 + OsRecv::SELF);
wxMsg.ts = util::get_dword(arg2 + OsRecv::TIMESTAMP);
wxMsg.content = util::get_str_by_wstr_addr(arg2 + OsRecv::CONTENT);
wxMsg.sign = util::get_str_by_wstr_addr(arg2 + OsRecv::SIGN);
wxMsg.xml = util::get_str_by_wstr_addr(arg2 + OsRecv::XML);
wxMsg.roomid = util::get_str_by_wstr_addr(arg2 + OsRecv::ROOMID);
if (wxMsg.roomid.find("@chatroom") != std::string::npos) {
if (wxMsg.roomid.find("@chatroom") != std::string::npos) { // 群 ID 的格式为 xxxxxxxxxxx@chatroom
wxMsg.is_group = true;
wxMsg.sender
= wxMsg.is_self ? account::get_self_wxid() : util::get_str_by_wstr_addr(arg2 + OS_RECV_MSG_WXID);
wxMsg.sender = wxMsg.is_self ? account::get_self_wxid() : util::get_str_by_wstr_addr(arg2 + OsRecv::WXID);
} else {
wxMsg.is_group = false;
wxMsg.sender = wxMsg.is_self ? account::get_self_wxid() : wxMsg.roomid;
}
fs::path thumb = util::get_str_by_wstr_addr(arg2 + OsRecv::THUMB);
if (!thumb.empty()) {
wxMsg.thumb = (account::get_home_path() / thumb).generic_string();
}
fs::path extra = util::get_str_by_wstr_addr(arg2 + OsRecv::EXTRA);
if (!extra.empty()) {
wxMsg.extra = (account::get_home_path() / extra).generic_string();
}
LOG_DEBUG("{}", wxMsg.content);
} catch (const std::exception &e) {
LOG_ERROR(util::gb2312_to_utf8(e.what()));
}
{
std::unique_lock<std::mutex> lock(handler.mutex_);
handler.msgQueue_.push(wxMsg);
handler.msgQueue_.push(wxMsg); // 推送到队列
}
handler.cv_.notify_all();
@ -219,7 +221,7 @@ int Handler::ListenMsg()
{
if (isListeningMsg) return 1;
funcRecvMsg = reinterpret_cast<funcRecvMsg_t>(g_WeChatWinDllAddr + OS_RECV_MSG_CALL);
funcRecvMsg = reinterpret_cast<funcRecvMsg_t>(g_WeChatWinDllAddr + OsRecv::CALL);
if (InitializeHook() != MH_OK) return -1;
if (MH_CreateHook(funcRecvMsg, &DispatchMsg, reinterpret_cast<LPVOID *>(&realRecvMsg)) != MH_OK) return -1;
if (MH_EnableHook(funcRecvMsg) != MH_OK) return -1;
@ -277,7 +279,7 @@ MH_STATUS Handler::UninitializeHook()
bool Handler::rpc_get_msg_types(uint8_t *out, size_t *len)
{
MsgTypes_t types = GetMsgTypes();
MsgTypes_t types = GetMsgTypes();
return fill_response<Functions_FUNC_GET_MSG_TYPES>(out, len, [&](Response &rsp) {
rsp.msg.types.types.funcs.encode = encode_types;
rsp.msg.types.types.arg = &types;

View File

@ -23,6 +23,22 @@ namespace Message
constexpr uint64_t LEVEL = 0x56E4244; // 日志级别
constexpr uint64_t CALL = 0x261B890; // 日志函数
}
namespace Receive
{
constexpr uint64_t CALL = 0x2141E80; // 接收消息 Call
constexpr uint64_t ID = 0x30; // 消息 ID
constexpr uint64_t TYPE = 0x38; // 消息类型
constexpr uint64_t SELF = 0x3C; // 消息是否来自自己
constexpr uint64_t TIMESTAMP = 0x44; // 消息时间戳
constexpr uint64_t ROOMID = 0x48; // 群聊 ID或者发送者 wxid
constexpr uint64_t CONTENT = 0x88; // 消息内容
constexpr uint64_t WXID = 0x240; // 发送者 wxid
constexpr uint64_t SIGN = 0x260; // 消息签名
constexpr uint64_t THUMB = 0x280; // 缩略图路径
constexpr uint64_t EXTRA = 0x2A0; // 原图路径
constexpr uint64_t XML = 0x308; // 消息 XML
}
}
}

View File

@ -81,7 +81,7 @@ int RpcServer::start(int port)
isRunning_ = true;
try {
cmdThread_ = std::thread(&RpcServer::runRpcServer, this);
cmdThread_ = std::thread(&RpcServer::run_rpc_server, this);
} catch (const std::exception &e) {
LOG_ERROR("启动 RPC 服务器失败: {}", e.what());
isRunning_ = false;
@ -123,43 +123,56 @@ int RpcServer::stop()
return 0;
}
void RpcServer::receiveMessageCallback()
void RpcServer::on_message_callback()
{
int rv;
nng_socket msgSock = NNG_SOCKET_INITIALIZER;
Response rsp = Response_init_default;
rsp.func = Functions_FUNC_ENABLE_RECV_TXT;
rsp.which_msg = Response_wxmsg_tag;
std::vector<uint8_t> msgBuffer(DEFAULT_BUF_SIZE);
try {
int rv;
nng_socket msgSock = NNG_SOCKET_INITIALIZER;
Response rsp = Response_init_default;
rsp.func = Functions_FUNC_ENABLE_RECV_TXT;
rsp.which_msg = Response_wxmsg_tag;
std::vector<uint8_t> msgBuffer(DEFAULT_BUF_SIZE);
pb_ostream_t stream = pb_ostream_from_buffer(msgBuffer.data(), msgBuffer.size());
pb_ostream_t stream = pb_ostream_from_buffer(msgBuffer.data(), msgBuffer.size());
std::string url = build_url(port_ + 1);
if ((rv = nng_pair1_open(&msgSock)) != 0) {
LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv));
return;
}
std::string url = build_url(port_ + 1);
if ((rv = nng_pair1_open(&msgSock)) != 0) {
LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv));
return;
}
if ((rv = nng_listen(msgSock, url.c_str(), NULL, 0)) != 0) {
LOG_ERROR("nng_listen error {}", nng_strerror(rv));
return;
}
if ((rv = nng_listen(msgSock, url.c_str(), NULL, 0)) != 0) {
LOG_ERROR("nng_listen error {}", nng_strerror(rv));
return;
}
LOG_INFO("MSG Server listening on {}", url.c_str());
if ((rv = nng_setopt_ms(msgSock, NNG_OPT_SENDTIMEO, 5000)) != 0) {
LOG_ERROR("nng_setopt_ms: {}", nng_strerror(rv));
return;
}
if ((rv = nng_setopt_ms(msgSock, NNG_OPT_SENDTIMEO, 5000)) != 0) {
LOG_ERROR("nng_setopt_ms: {}", nng_strerror(rv));
return;
}
while (handler_.isMessageListening()) {
std::unique_lock<std::mutex> lock(handler_.getMutex());
std::optional<WxMsg_t> msgOpt;
auto hasMessage = [&]() {
msgOpt = handler_.popMessage();
return msgOpt.has_value();
};
while (handler_.isMessageListening()) {
std::optional<WxMsg_t> msgOpt;
{
std::unique_lock<std::mutex> lock(handler_.getMutex());
bool hasMessage
= handler_.getConditionVariable().wait_for(lock, std::chrono::milliseconds(1000), [&]() {
lock.unlock();
msgOpt = handler_.popMessage();
lock.lock();
return msgOpt.has_value();
});
if (!hasMessage) {
continue;
}
}
if (!msgOpt.has_value()) {
LOG_WARN("popMessage returned empty after wait_for success.");
continue;
}
if (handler_.getConditionVariable().wait_for(lock, std::chrono::milliseconds(1000), hasMessage)) {
WxMsg_t wxmsg = std::move(msgOpt.value());
rsp.msg.wxmsg.id = wxmsg.id;
rsp.msg.wxmsg.is_self = wxmsg.is_self;
@ -187,11 +200,16 @@ void RpcServer::receiveMessageCallback()
}
LOG_DEBUG("Send data length {}", stream.bytes_written);
}
nng_close(msgSock);
LOG_DEBUG("Leave MSG Server.");
} catch (const std::exception &e) {
LOG_ERROR("Fatal exception in on_message_callback: {}", e.what());
} catch (...) {
LOG_ERROR("Unknown fatal exception in on_message_callback.");
}
nng_close(msgSock);
}
bool RpcServer::enableRecvMsg(bool pyq, uint8_t *out, size_t *len)
bool RpcServer::start_message_listener(bool pyq, uint8_t *out, size_t *len)
{
return fill_response<Functions_FUNC_ENABLE_RECV_TXT>(out, len, [&](Response &rsp) {
rsp.msg.status = handler_.ListenMsg();
@ -199,12 +217,12 @@ bool RpcServer::enableRecvMsg(bool pyq, uint8_t *out, size_t *len)
if (pyq) {
handler_.ListenPyq();
}
msgThread_ = std::thread(&RpcServer::receiveMessageCallback, this);
msgThread_ = std::thread(&RpcServer::on_message_callback, this);
}
});
}
bool RpcServer::disableRecvMsg(uint8_t *out, size_t *len)
bool RpcServer::stop_message_listener(uint8_t *out, size_t *len)
{
return fill_response<Functions_FUNC_DISABLE_RECV_TXT>(out, len, [&](Response &rsp) {
rsp.msg.status = handler_.UnListenMsg();
@ -223,6 +241,8 @@ const std::unordered_map<Functions, RpcServer::FunctionHandler> RpcServer::rpcFu
{ Functions_FUNC_GET_SELF_WXID, [](const Request &r, uint8_t *out, size_t *len) { return account::rpc_get_self_wxid(out, len); } },
{ Functions_FUNC_GET_USER_INFO, [](const Request &r, uint8_t *out, size_t *len) { return account::rpc_get_user_info(out, len); } },
{ Functions_FUNC_GET_MSG_TYPES, [](const Request &r, uint8_t *out, size_t *len) { return RpcServer::getInstance().handler_.rpc_get_msg_types(out, len); } },
{ Functions_FUNC_ENABLE_RECV_TXT, [](const Request &r, uint8_t *out, size_t *len) { return RpcServer::getInstance().start_message_listener(r.msg.flag, out, len); } },
{ Functions_FUNC_DISABLE_RECV_TXT, [](const Request &r, uint8_t *out, size_t *len) { return RpcServer::getInstance().stop_message_listener(out, len); } },
// { Functions_FUNC_GET_CONTACTS, [](const Request &r, uint8_t *out, size_t *len) { return contact::rpc_get_contacts(out, len); } },
// { Functions_FUNC_GET_DB_NAMES, [](const Request &r, uint8_t *out, size_t *len) { return db::rpc_get_db_names(out, len); } },
// { Functions_FUNC_GET_DB_TABLES, [](const Request &r, uint8_t *out, size_t *len) { return db::rpc_get_db_tables(r.msg.str, out, len); } },
@ -277,7 +297,7 @@ bool RpcServer::dispatcher(uint8_t *in, size_t in_len, uint8_t *out, size_t *out
return ret;
}
void RpcServer::runRpcServer()
void RpcServer::run_rpc_server()
{
int rv = 0;
nng_socket cmdSock = NNG_SOCKET_INITIALIZER;

View File

@ -26,10 +26,10 @@ private:
RpcServer(const RpcServer &) = delete;
RpcServer &operator=(const RpcServer &) = delete;
void runRpcServer();
void receiveMessageCallback();
bool enableRecvMsg(bool pyq, uint8_t *out, size_t *len);
bool disableRecvMsg(uint8_t *out, size_t *len);
void run_rpc_server();
void on_message_callback();
bool start_message_listener(bool pyq, uint8_t *out, size_t *len);
bool stop_message_listener(uint8_t *out, size_t *len);
bool dispatcher(uint8_t *in, size_t in_len, uint8_t *out, size_t *out_len);
static std::string build_url(int port);