From 6371f83b872f04b90924b695e4075b723a2c7cae Mon Sep 17 00:00:00 2001 From: Changhua Date: Tue, 18 Feb 2025 01:14:20 +0800 Subject: [PATCH] feat(message): impl message handler --- WeChatFerry/spy/message_handler.cpp | 72 +++++++++++----------- WeChatFerry/spy/offsets.h | 16 +++++ WeChatFerry/spy/rpc_server.cpp | 92 ++++++++++++++++++----------- WeChatFerry/spy/rpc_server.h | 8 +-- 4 files changed, 113 insertions(+), 75 deletions(-) diff --git a/WeChatFerry/spy/message_handler.cpp b/WeChatFerry/spy/message_handler.cpp index 7f7e8df..db3cc1f 100644 --- a/WeChatFerry/spy/message_handler.cpp +++ b/WeChatFerry/spy/message_handler.cpp @@ -1,6 +1,7 @@ #include "message_handler.h" #include +#include #include #include @@ -15,60 +16,61 @@ extern QWORD g_WeChatWinDllAddr; -#define OS_RECV_MSG_ID 0x30 -#define OS_RECV_MSG_TYPE 0x38 -#define OS_RECV_MSG_SELF 0x3C -#define OS_RECV_MSG_TS 0x44 -#define OS_RECV_MSG_ROOMID 0x48 -#define OS_RECV_MSG_CONTENT 0x88 -#define OS_RECV_MSG_WXID 0x240 -#define OS_RECV_MSG_SIGN 0x260 -#define OS_RECV_MSG_THUMB 0x280 -#define OS_RECV_MSG_EXTRA 0x2A0 -#define OS_RECV_MSG_XML 0x308 -#define OS_RECV_MSG_CALL 0x213ED90 -#define OS_PYQ_MSG_START 0x30 -#define OS_PYQ_MSG_END 0x38 -#define OS_PYQ_MSG_TS 0x38 -#define OS_PYQ_MSG_XML 0x9B8 -#define OS_PYQ_MSG_SENDER 0x18 -#define OS_PYQ_MSG_CONTENT 0x48 -#define OS_PYQ_MSG_CALL 0x2E42C90 -#define OS_WXLOG 0x2613D20 +#define OS_PYQ_MSG_START 0x30 +#define OS_PYQ_MSG_END 0x38 +#define OS_PYQ_MSG_TS 0x38 +#define OS_PYQ_MSG_XML 0x9B8 +#define OS_PYQ_MSG_SENDER 0x18 +#define OS_PYQ_MSG_CONTENT 0x48 +#define OS_PYQ_MSG_CALL 0x2E42C90 namespace message { -namespace OsLog = Offsets::Message::Log; + +namespace fs = std::filesystem; + +namespace OsLog = Offsets::Message::Log; +namespace OsRecv = Offsets::Message::Receive; QWORD Handler::DispatchMsg(QWORD arg1, QWORD arg2) { auto &handler = getInstance(); WxMsg_t wxMsg = {}; try { - wxMsg.id = util::get_qword(arg2 + OS_RECV_MSG_ID); - wxMsg.type = util::get_dword(arg2 + OS_RECV_MSG_TYPE); - wxMsg.is_self = util::get_dword(arg2 + OS_RECV_MSG_SELF); - wxMsg.ts = util::get_dword(arg2 + OS_RECV_MSG_TS); - wxMsg.content = util::get_str_by_wstr_addr(arg2 + OS_RECV_MSG_CONTENT); - wxMsg.sign = util::get_str_by_wstr_addr(arg2 + OS_RECV_MSG_SIGN); - wxMsg.xml = util::get_str_by_wstr_addr(arg2 + OS_RECV_MSG_XML); - wxMsg.roomid = util::get_str_by_wstr_addr(arg2 + OS_RECV_MSG_ROOMID); + wxMsg.id = util::get_qword(arg2 + OsRecv::ID); + wxMsg.type = util::get_dword(arg2 + OsRecv::TYPE); + wxMsg.is_self = util::get_dword(arg2 + OsRecv::SELF); + wxMsg.ts = util::get_dword(arg2 + OsRecv::TIMESTAMP); + wxMsg.content = util::get_str_by_wstr_addr(arg2 + OsRecv::CONTENT); + wxMsg.sign = util::get_str_by_wstr_addr(arg2 + OsRecv::SIGN); + wxMsg.xml = util::get_str_by_wstr_addr(arg2 + OsRecv::XML); + wxMsg.roomid = util::get_str_by_wstr_addr(arg2 + OsRecv::ROOMID); - if (wxMsg.roomid.find("@chatroom") != std::string::npos) { + if (wxMsg.roomid.find("@chatroom") != std::string::npos) { // 群 ID 的格式为 xxxxxxxxxxx@chatroom wxMsg.is_group = true; - wxMsg.sender - = wxMsg.is_self ? account::get_self_wxid() : util::get_str_by_wstr_addr(arg2 + OS_RECV_MSG_WXID); + wxMsg.sender = wxMsg.is_self ? account::get_self_wxid() : util::get_str_by_wstr_addr(arg2 + OsRecv::WXID); } else { wxMsg.is_group = false; wxMsg.sender = wxMsg.is_self ? account::get_self_wxid() : wxMsg.roomid; } + + fs::path thumb = util::get_str_by_wstr_addr(arg2 + OsRecv::THUMB); + if (!thumb.empty()) { + wxMsg.thumb = (account::get_home_path() / thumb).generic_string(); + } + + fs::path extra = util::get_str_by_wstr_addr(arg2 + OsRecv::EXTRA); + if (!extra.empty()) { + wxMsg.extra = (account::get_home_path() / extra).generic_string(); + } + LOG_DEBUG("{}", wxMsg.content); } catch (const std::exception &e) { LOG_ERROR(util::gb2312_to_utf8(e.what())); } { std::unique_lock lock(handler.mutex_); - handler.msgQueue_.push(wxMsg); + handler.msgQueue_.push(wxMsg); // 推送到队列 } handler.cv_.notify_all(); @@ -219,7 +221,7 @@ int Handler::ListenMsg() { if (isListeningMsg) return 1; - funcRecvMsg = reinterpret_cast(g_WeChatWinDllAddr + OS_RECV_MSG_CALL); + funcRecvMsg = reinterpret_cast(g_WeChatWinDllAddr + OsRecv::CALL); if (InitializeHook() != MH_OK) return -1; if (MH_CreateHook(funcRecvMsg, &DispatchMsg, reinterpret_cast(&realRecvMsg)) != MH_OK) return -1; if (MH_EnableHook(funcRecvMsg) != MH_OK) return -1; @@ -277,7 +279,7 @@ MH_STATUS Handler::UninitializeHook() bool Handler::rpc_get_msg_types(uint8_t *out, size_t *len) { - MsgTypes_t types = GetMsgTypes(); + MsgTypes_t types = GetMsgTypes(); return fill_response(out, len, [&](Response &rsp) { rsp.msg.types.types.funcs.encode = encode_types; rsp.msg.types.types.arg = &types; diff --git a/WeChatFerry/spy/offsets.h b/WeChatFerry/spy/offsets.h index 1a9d415..ca7ba91 100644 --- a/WeChatFerry/spy/offsets.h +++ b/WeChatFerry/spy/offsets.h @@ -23,6 +23,22 @@ namespace Message constexpr uint64_t LEVEL = 0x56E4244; // 日志级别 constexpr uint64_t CALL = 0x261B890; // 日志函数 } + + namespace Receive + { + constexpr uint64_t CALL = 0x2141E80; // 接收消息 Call + constexpr uint64_t ID = 0x30; // 消息 ID + constexpr uint64_t TYPE = 0x38; // 消息类型 + constexpr uint64_t SELF = 0x3C; // 消息是否来自自己 + constexpr uint64_t TIMESTAMP = 0x44; // 消息时间戳 + constexpr uint64_t ROOMID = 0x48; // 群聊 ID(或者发送者 wxid) + constexpr uint64_t CONTENT = 0x88; // 消息内容 + constexpr uint64_t WXID = 0x240; // 发送者 wxid + constexpr uint64_t SIGN = 0x260; // 消息签名 + constexpr uint64_t THUMB = 0x280; // 缩略图路径 + constexpr uint64_t EXTRA = 0x2A0; // 原图路径 + constexpr uint64_t XML = 0x308; // 消息 XML + } } } diff --git a/WeChatFerry/spy/rpc_server.cpp b/WeChatFerry/spy/rpc_server.cpp index 0d6842b..5f24d78 100644 --- a/WeChatFerry/spy/rpc_server.cpp +++ b/WeChatFerry/spy/rpc_server.cpp @@ -81,7 +81,7 @@ int RpcServer::start(int port) isRunning_ = true; try { - cmdThread_ = std::thread(&RpcServer::runRpcServer, this); + cmdThread_ = std::thread(&RpcServer::run_rpc_server, this); } catch (const std::exception &e) { LOG_ERROR("启动 RPC 服务器失败: {}", e.what()); isRunning_ = false; @@ -123,43 +123,56 @@ int RpcServer::stop() return 0; } -void RpcServer::receiveMessageCallback() +void RpcServer::on_message_callback() { - int rv; - nng_socket msgSock = NNG_SOCKET_INITIALIZER; - Response rsp = Response_init_default; - rsp.func = Functions_FUNC_ENABLE_RECV_TXT; - rsp.which_msg = Response_wxmsg_tag; - std::vector msgBuffer(DEFAULT_BUF_SIZE); + try { + int rv; + nng_socket msgSock = NNG_SOCKET_INITIALIZER; + Response rsp = Response_init_default; + rsp.func = Functions_FUNC_ENABLE_RECV_TXT; + rsp.which_msg = Response_wxmsg_tag; + std::vector msgBuffer(DEFAULT_BUF_SIZE); - pb_ostream_t stream = pb_ostream_from_buffer(msgBuffer.data(), msgBuffer.size()); + pb_ostream_t stream = pb_ostream_from_buffer(msgBuffer.data(), msgBuffer.size()); - std::string url = build_url(port_ + 1); - if ((rv = nng_pair1_open(&msgSock)) != 0) { - LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv)); - return; - } + std::string url = build_url(port_ + 1); + if ((rv = nng_pair1_open(&msgSock)) != 0) { + LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv)); + return; + } - if ((rv = nng_listen(msgSock, url.c_str(), NULL, 0)) != 0) { - LOG_ERROR("nng_listen error {}", nng_strerror(rv)); - return; - } + if ((rv = nng_listen(msgSock, url.c_str(), NULL, 0)) != 0) { + LOG_ERROR("nng_listen error {}", nng_strerror(rv)); + return; + } - LOG_INFO("MSG Server listening on {}", url.c_str()); - if ((rv = nng_setopt_ms(msgSock, NNG_OPT_SENDTIMEO, 5000)) != 0) { - LOG_ERROR("nng_setopt_ms: {}", nng_strerror(rv)); - return; - } + if ((rv = nng_setopt_ms(msgSock, NNG_OPT_SENDTIMEO, 5000)) != 0) { + LOG_ERROR("nng_setopt_ms: {}", nng_strerror(rv)); + return; + } - while (handler_.isMessageListening()) { - std::unique_lock lock(handler_.getMutex()); - std::optional msgOpt; - auto hasMessage = [&]() { - msgOpt = handler_.popMessage(); - return msgOpt.has_value(); - }; + while (handler_.isMessageListening()) { + std::optional msgOpt; + { + std::unique_lock lock(handler_.getMutex()); + bool hasMessage + = handler_.getConditionVariable().wait_for(lock, std::chrono::milliseconds(1000), [&]() { + lock.unlock(); + msgOpt = handler_.popMessage(); + lock.lock(); + return msgOpt.has_value(); + }); + + if (!hasMessage) { + continue; + } + } + + if (!msgOpt.has_value()) { + LOG_WARN("popMessage returned empty after wait_for success."); + continue; + } - if (handler_.getConditionVariable().wait_for(lock, std::chrono::milliseconds(1000), hasMessage)) { WxMsg_t wxmsg = std::move(msgOpt.value()); rsp.msg.wxmsg.id = wxmsg.id; rsp.msg.wxmsg.is_self = wxmsg.is_self; @@ -187,11 +200,16 @@ void RpcServer::receiveMessageCallback() } LOG_DEBUG("Send data length {}", stream.bytes_written); } + nng_close(msgSock); + LOG_DEBUG("Leave MSG Server."); + } catch (const std::exception &e) { + LOG_ERROR("Fatal exception in on_message_callback: {}", e.what()); + } catch (...) { + LOG_ERROR("Unknown fatal exception in on_message_callback."); } - nng_close(msgSock); } -bool RpcServer::enableRecvMsg(bool pyq, uint8_t *out, size_t *len) +bool RpcServer::start_message_listener(bool pyq, uint8_t *out, size_t *len) { return fill_response(out, len, [&](Response &rsp) { rsp.msg.status = handler_.ListenMsg(); @@ -199,12 +217,12 @@ bool RpcServer::enableRecvMsg(bool pyq, uint8_t *out, size_t *len) if (pyq) { handler_.ListenPyq(); } - msgThread_ = std::thread(&RpcServer::receiveMessageCallback, this); + msgThread_ = std::thread(&RpcServer::on_message_callback, this); } }); } -bool RpcServer::disableRecvMsg(uint8_t *out, size_t *len) +bool RpcServer::stop_message_listener(uint8_t *out, size_t *len) { return fill_response(out, len, [&](Response &rsp) { rsp.msg.status = handler_.UnListenMsg(); @@ -223,6 +241,8 @@ const std::unordered_map RpcServer::rpcFu { Functions_FUNC_GET_SELF_WXID, [](const Request &r, uint8_t *out, size_t *len) { return account::rpc_get_self_wxid(out, len); } }, { Functions_FUNC_GET_USER_INFO, [](const Request &r, uint8_t *out, size_t *len) { return account::rpc_get_user_info(out, len); } }, { Functions_FUNC_GET_MSG_TYPES, [](const Request &r, uint8_t *out, size_t *len) { return RpcServer::getInstance().handler_.rpc_get_msg_types(out, len); } }, + { Functions_FUNC_ENABLE_RECV_TXT, [](const Request &r, uint8_t *out, size_t *len) { return RpcServer::getInstance().start_message_listener(r.msg.flag, out, len); } }, + { Functions_FUNC_DISABLE_RECV_TXT, [](const Request &r, uint8_t *out, size_t *len) { return RpcServer::getInstance().stop_message_listener(out, len); } }, // { Functions_FUNC_GET_CONTACTS, [](const Request &r, uint8_t *out, size_t *len) { return contact::rpc_get_contacts(out, len); } }, // { Functions_FUNC_GET_DB_NAMES, [](const Request &r, uint8_t *out, size_t *len) { return db::rpc_get_db_names(out, len); } }, // { Functions_FUNC_GET_DB_TABLES, [](const Request &r, uint8_t *out, size_t *len) { return db::rpc_get_db_tables(r.msg.str, out, len); } }, @@ -277,7 +297,7 @@ bool RpcServer::dispatcher(uint8_t *in, size_t in_len, uint8_t *out, size_t *out return ret; } -void RpcServer::runRpcServer() +void RpcServer::run_rpc_server() { int rv = 0; nng_socket cmdSock = NNG_SOCKET_INITIALIZER; diff --git a/WeChatFerry/spy/rpc_server.h b/WeChatFerry/spy/rpc_server.h index 468689e..0bb96b9 100644 --- a/WeChatFerry/spy/rpc_server.h +++ b/WeChatFerry/spy/rpc_server.h @@ -26,10 +26,10 @@ private: RpcServer(const RpcServer &) = delete; RpcServer &operator=(const RpcServer &) = delete; - void runRpcServer(); - void receiveMessageCallback(); - bool enableRecvMsg(bool pyq, uint8_t *out, size_t *len); - bool disableRecvMsg(uint8_t *out, size_t *len); + void run_rpc_server(); + void on_message_callback(); + bool start_message_listener(bool pyq, uint8_t *out, size_t *len); + bool stop_message_listener(uint8_t *out, size_t *len); bool dispatcher(uint8_t *in, size_t in_len, uint8_t *out, size_t *out_len); static std::string build_url(int port);