diff --git a/WeChatFerry/spy/rpc_server.cpp b/WeChatFerry/spy/rpc_server.cpp index d238642..dbbc02a 100644 --- a/WeChatFerry/spy/rpc_server.cpp +++ b/WeChatFerry/spy/rpc_server.cpp @@ -14,12 +14,9 @@ #include #include -#include #include #include -#include "wcf.pb.h" - #include "account_manager.h" #include "chatroom_manager.h" #include "contact_manager.h" @@ -39,30 +36,103 @@ namespace fs = std::filesystem; constexpr size_t DEFAULT_BUF_SIZE = 16 * 1024 * 1024; -static int cmdPort = 0; -static bool isRpcRunning = false; -static HANDLE cmdThread = NULL; -static HANDLE msgThread = NULL; -static nng_socket cmdSock = NNG_SOCKET_INITIALIZER; // TODO: 断开检测 -static nng_socket msgSock = NNG_SOCKET_INITIALIZER; // TODO: 断开检测 -auto &handler = message::Handler::getInstance(); -auto &sender = message::Sender::get_instance(); +std::unique_ptr RpcServer::instance_ = nullptr; -using FunctionHandler = std::function; +RpcServer &RpcServer::getInstance() +{ + if (!instance_) { + instance_.reset(new RpcServer()); + } + return *instance_; +} -inline std::string build_url(int port) { return "tcp://0.0.0.0:" + std::to_string(port); } +void RpcServer::destroyInstance() +{ + if (instance_) { + instance_->stop(); + instance_.reset(); + } +} -static void receive_message_callback() +RpcServer::RpcServer(int port) : port_(port) { LOG_DEBUG("RpcServer 构造: 端口 {}", port_); } + +RpcServer::~RpcServer() +{ + stop(); + LOG_DEBUG("RpcServer 被析构,释放所有资源"); +} + +std::string RpcServer::build_url(int port) +{ + return std::string(RpcServer::RPC_SERVER_ADDRESS) + ":" + std::to_string(port); +} + +int RpcServer::start(int port) +{ + if (isRunning_.load()) { + LOG_WARN("RPC 服务已在运行"); + return 1; + } + + port_ = port; + isRunning_ = true; + + try { + cmdThread_ = std::thread(&RpcServer::runRpcServer, this); + } catch (const std::exception &e) { + LOG_ERROR("启动 RPC 服务器失败: {}", e.what()); + isRunning_ = false; + return -2; + } +#if ENABLE_WX_LOG + EnableLog(); +#endif + LOG_INFO("RPC 服务器成功启动,监听端口: {}", port_); + return 0; +} + +int RpcServer::stop() +{ + if (!isRunning_.load()) { + LOG_WARN("RPC 服务未运行"); + return 1; + } + isRunning_ = false; + + auto &handler = message::Handler::getInstance(); + handler.UnListenPyq(); + handler.UnListenMsg(); +#if ENABLE_WX_LOG + DisableLog(); +#endif + nng_fini(); + if (cmdThread_.joinable()) { + LOG_DEBUG("等待命令线程关闭"); + cmdThread_.join(); + } + LOG_DEBUG("命令线程已经关闭"); + + if (msgThread_.joinable()) { + LOG_DEBUG("等待消息线程关闭"); + msgThread_.join(); + } + LOG_DEBUG("消息线程已经关闭"); + LOG_INFO("RPC 服务已停止"); + return 0; +} + +void RpcServer::receiveMessageCallback() { int rv; - Response rsp = Response_init_default; - rsp.func = Functions_FUNC_ENABLE_RECV_TXT; - rsp.which_msg = Response_wxmsg_tag; + nng_socket msgSock = NNG_SOCKET_INITIALIZER; + Response rsp = Response_init_default; + rsp.func = Functions_FUNC_ENABLE_RECV_TXT; + rsp.which_msg = Response_wxmsg_tag; std::vector msgBuffer(DEFAULT_BUF_SIZE); pb_ostream_t stream = pb_ostream_from_buffer(msgBuffer.data(), msgBuffer.size()); - std::string url = build_url(cmdPort + 1); + std::string url = build_url(port_ + 1); if ((rv = nng_pair1_open(&msgSock)) != 0) { LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv)); return; @@ -79,6 +149,7 @@ static void receive_message_callback() return; } + auto &handler = message::Handler::getInstance(); while (handler.isMessageListening()) { std::unique_lock lock(handler.getMutex()); std::optional msgOpt; @@ -116,76 +187,74 @@ static void receive_message_callback() LOG_DEBUG("Send data length {}", stream.bytes_written); } } + nng_close(msgSock); } -static bool rpc_enable_recv_msg(bool pyq, uint8_t *out, size_t *len) +bool RpcServer::enableRecvMsg(bool pyq, uint8_t *out, size_t *len) { return fill_response(out, len, [&](Response &rsp) { + auto &handler = message::Handler::getInstance(); rsp.msg.status = handler.ListenMsg(); if (rsp.msg.status == 0) { if (pyq) { handler.ListenPyq(); } - msgThread = CreateThread(nullptr, 0, (LPTHREAD_START_ROUTINE)receive_message_callback, nullptr, 0, nullptr); - if (msgThread == nullptr) { - rsp.msg.status = GetLastError(); - LOG_ERROR("func_enable_recv_txt failed: {}", rsp.msg.status); - } + msgThread_ = std::thread(&RpcServer::receiveMessageCallback, this); } }); } -static bool rpc_disable_recv_msg(uint8_t *out, size_t *len) +bool RpcServer::disableRecvMsg(uint8_t *out, size_t *len) { - return fill_response(out, len, [](Response &rsp) { + return fill_response(out, len, [&](Response &rsp) { + auto &handler = message::Handler::getInstance(); rsp.msg.status = handler.UnListenMsg(); if (rsp.msg.status == 0) { handler.UnListenPyq(); - if (msgThread != nullptr) { - TerminateThread(msgThread, 0); - msgThread = nullptr; + if (msgThread_.joinable()) { + msgThread_.join(); } } }); } -const std::unordered_map rpc_function_map = { +const std::unordered_map RpcServer::rpcFunctionMap = { // clang-format off { Functions_FUNC_IS_LOGIN, [](const Request &r, uint8_t *out, size_t *len) { return account::rpc_is_logged_in(out, len); } }, - { Functions_FUNC_GET_SELF_WXID, [](const Request &r, uint8_t *out, size_t *len) { return account::rpc_get_self_wxid(out, len); } }, - { Functions_FUNC_GET_USER_INFO, [](const Request &r, uint8_t *out, size_t *len) { return account::rpc_get_user_info(out, len); } }, - { Functions_FUNC_GET_MSG_TYPES, [](const Request &r, uint8_t *out, size_t *len) { return handler.rpc_get_msg_types(out, len); } }, - { Functions_FUNC_GET_CONTACTS, [](const Request &r, uint8_t *out, size_t *len) { return contact::rpc_get_contacts(out, len); } }, - { Functions_FUNC_GET_DB_NAMES, [](const Request &r, uint8_t *out, size_t *len) { return db::rpc_get_db_names(out, len); } }, - { Functions_FUNC_GET_DB_TABLES, [](const Request &r, uint8_t *out, size_t *len) { return db::rpc_get_db_tables(r.msg.str, out, len); } }, - { Functions_FUNC_GET_AUDIO_MSG, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_get_audio(r.msg.am, out, len); } }, - { Functions_FUNC_SEND_TXT, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_text(r.msg.txt, out, len); } }, - { Functions_FUNC_SEND_IMG, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_image(r.msg.file, out, len); } }, - { Functions_FUNC_SEND_FILE, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_file(r.msg.file, out, len); } }, - { Functions_FUNC_SEND_XML, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_xml(r.msg.xml, out, len); } }, - { Functions_FUNC_SEND_EMOTION, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_emotion(r.msg.file, out, len); } }, - { Functions_FUNC_SEND_RICH_TXT, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_rich_text(r.msg.rt, out, len); } }, - { Functions_FUNC_SEND_PAT_MSG, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_pat(r.msg.pm, out, len); } }, - { Functions_FUNC_FORWARD_MSG, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_forward(r.msg.fm, out, len); } }, - { Functions_FUNC_ENABLE_RECV_TXT, [](const Request &r, uint8_t *out, size_t *len) { return rpc_enable_recv_msg(r.msg.flag, out, len); } }, - { Functions_FUNC_DISABLE_RECV_TXT, [](const Request &r, uint8_t *out, size_t *len) { return rpc_disable_recv_msg(out, len); } }, - { Functions_FUNC_EXEC_DB_QUERY, [](const Request &r, uint8_t *out, size_t *len) { return db::rpc_exec_db_query(r.msg.query, out, len); } }, - { Functions_FUNC_ACCEPT_FRIEND, [](const Request &r, uint8_t *out, size_t *len) { return contact::rpc_accept_friend(r.msg.v, out, len); } }, - { Functions_FUNC_RECV_TRANSFER, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_receive_transfer(r.msg.tf, out, len); } }, - { Functions_FUNC_REFRESH_PYQ, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_refresh_pyq(r.msg.ui64, out, len); } }, - { Functions_FUNC_DOWNLOAD_ATTACH, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_download_attachment(r.msg.att, out, len); } }, - { Functions_FUNC_GET_CONTACT_INFO, [](const Request &r, uint8_t *out, size_t *len) { return contact::rpc_get_contact_info(r.msg.str, out, len); } }, - { Functions_FUNC_REVOKE_MSG, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_revoke_message(r.msg.ui64, out, len); } }, - { Functions_FUNC_REFRESH_QRCODE, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_get_login_url(out, len); } }, - { Functions_FUNC_DECRYPT_IMAGE, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_decrypt_image(r.msg.dec, out, len); } }, - { Functions_FUNC_EXEC_OCR, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_get_ocr_result(r.msg.str, out, len); } }, - { Functions_FUNC_ADD_ROOM_MEMBERS, [](const Request &r, uint8_t *out, size_t *len) { return chatroom::rpc_add_chatroom_member(r.msg.m, out, len); } }, - { Functions_FUNC_DEL_ROOM_MEMBERS, [](const Request &r, uint8_t *out, size_t *len) { return chatroom::rpc_delete_chatroom_member(r.msg.m, out, len); } }, - { Functions_FUNC_INV_ROOM_MEMBERS, [](const Request &r, uint8_t *out, size_t *len) { return chatroom::rpc_invite_chatroom_member(r.msg.m, out, len); } }, + // { Functions_FUNC_GET_SELF_WXID, [](const Request &r, uint8_t *out, size_t *len) { return account::rpc_get_self_wxid(out, len); } }, + // { Functions_FUNC_GET_USER_INFO, [](const Request &r, uint8_t *out, size_t *len) { return account::rpc_get_user_info(out, len); } }, + // { Functions_FUNC_GET_MSG_TYPES, [](const Request &r, uint8_t *out, size_t *len) { return handler.rpc_get_msg_types(out, len); } }, + // { Functions_FUNC_GET_CONTACTS, [](const Request &r, uint8_t *out, size_t *len) { return contact::rpc_get_contacts(out, len); } }, + // { Functions_FUNC_GET_DB_NAMES, [](const Request &r, uint8_t *out, size_t *len) { return db::rpc_get_db_names(out, len); } }, + // { Functions_FUNC_GET_DB_TABLES, [](const Request &r, uint8_t *out, size_t *len) { return db::rpc_get_db_tables(r.msg.str, out, len); } }, + // { Functions_FUNC_GET_AUDIO_MSG, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_get_audio(r.msg.am, out, len); } }, + // { Functions_FUNC_SEND_TXT, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_text(r.msg.txt, out, len); } }, + // { Functions_FUNC_SEND_IMG, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_image(r.msg.file, out, len); } }, + // { Functions_FUNC_SEND_FILE, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_file(r.msg.file, out, len); } }, + // { Functions_FUNC_SEND_XML, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_xml(r.msg.xml, out, len); } }, + // { Functions_FUNC_SEND_EMOTION, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_emotion(r.msg.file, out, len); } }, + // { Functions_FUNC_SEND_RICH_TXT, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_rich_text(r.msg.rt, out, len); } }, + // { Functions_FUNC_SEND_PAT_MSG, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_send_pat(r.msg.pm, out, len); } }, + // { Functions_FUNC_FORWARD_MSG, [](const Request &r, uint8_t *out, size_t *len) { return sender.rpc_forward(r.msg.fm, out, len); } }, + // { Functions_FUNC_ENABLE_RECV_TXT, [](const Request &r, uint8_t *out, size_t *len) { return rpc_enable_recv_msg(r.msg.flag, out, len); } }, + // { Functions_FUNC_DISABLE_RECV_TXT, [](const Request &r, uint8_t *out, size_t *len) { return rpc_disable_recv_msg(out, len); } }, + // { Functions_FUNC_EXEC_DB_QUERY, [](const Request &r, uint8_t *out, size_t *len) { return db::rpc_exec_db_query(r.msg.query, out, len); } }, + // { Functions_FUNC_ACCEPT_FRIEND, [](const Request &r, uint8_t *out, size_t *len) { return contact::rpc_accept_friend(r.msg.v, out, len); } }, + // { Functions_FUNC_RECV_TRANSFER, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_receive_transfer(r.msg.tf, out, len); } }, + // { Functions_FUNC_REFRESH_PYQ, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_refresh_pyq(r.msg.ui64, out, len); } }, + // { Functions_FUNC_DOWNLOAD_ATTACH, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_download_attachment(r.msg.att, out, len); } }, + // { Functions_FUNC_GET_CONTACT_INFO, [](const Request &r, uint8_t *out, size_t *len) { return contact::rpc_get_contact_info(r.msg.str, out, len); } }, + // { Functions_FUNC_REVOKE_MSG, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_revoke_message(r.msg.ui64, out, len); } }, + // { Functions_FUNC_REFRESH_QRCODE, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_get_login_url(out, len); } }, + // { Functions_FUNC_DECRYPT_IMAGE, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_decrypt_image(r.msg.dec, out, len); } }, + // { Functions_FUNC_EXEC_OCR, [](const Request &r, uint8_t *out, size_t *len) { return misc::rpc_get_ocr_result(r.msg.str, out, len); } }, + // { Functions_FUNC_ADD_ROOM_MEMBERS, [](const Request &r, uint8_t *out, size_t *len) { return chatroom::rpc_add_chatroom_member(r.msg.m, out, len); } }, + // { Functions_FUNC_DEL_ROOM_MEMBERS, [](const Request &r, uint8_t *out, size_t *len) { return chatroom::rpc_delete_chatroom_member(r.msg.m, out, len); } }, + // { Functions_FUNC_INV_ROOM_MEMBERS, [](const Request &r, uint8_t *out, size_t *len) { return chatroom::rpc_invite_chatroom_member(r.msg.m, out, len); } }, // clang-format on }; -static bool dispatcher(uint8_t *in, size_t in_len, uint8_t *out, size_t *out_len) +bool RpcServer::dispatcher(uint8_t *in, size_t in_len, uint8_t *out, size_t *out_len) { bool ret = false; Request req = Request_init_default; @@ -198,8 +267,8 @@ static bool dispatcher(uint8_t *in, size_t in_len, uint8_t *out, size_t *out_len LOG_DEBUG("{:#04x}[{}] length: {}", (uint8_t)req.func, magic_enum::enum_name(req.func), in_len); - auto it = rpc_function_map.find(req.func); - if (it != rpc_function_map.end()) { + auto it = RpcServer::rpcFunctionMap.find(req.func); + if (it != RpcServer::rpcFunctionMap.end()) { ret = it->second(req, out, out_len); } else { LOG_ERROR("[未知方法]"); @@ -209,46 +278,49 @@ static bool dispatcher(uint8_t *in, size_t in_len, uint8_t *out, size_t *out_len return ret; } -static int RunRpcServer() +void RpcServer::runRpcServer() { - int rv = 0; - std::string url = build_url(cmdPort); + int rv = 0; + nng_socket cmdSock = NNG_SOCKET_INITIALIZER; + std::string url = build_url(port_); + if ((rv = nng_pair1_open(&cmdSock)) != 0) { - LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv)); - return rv; + LOG_ERROR("nng_pair1_open error: {}", nng_strerror(rv)); + return; } - if ((rv = nng_listen(cmdSock, url.c_str(), NULL, 0)) != 0) { - LOG_ERROR("nng_listen error {}", nng_strerror(rv)); - return rv; + if ((rv = nng_listen(cmdSock, url.c_str(), nullptr, 0)) != 0) { + LOG_ERROR("nng_listen error: {}", nng_strerror(rv)); + return; } - LOG_INFO("CMD Server listening on {}", url.c_str()); if ((rv = nng_setopt_ms(cmdSock, NNG_OPT_SENDTIMEO, 1000)) != 0) { LOG_ERROR("nng_setopt_ms error: {}", nng_strerror(rv)); - return rv; + return; } + LOG_INFO("CMD Server listening on {}", url); std::vector cmdBuffer(DEFAULT_BUF_SIZE); - isRpcRunning = true; - while (isRpcRunning) { - uint8_t *in = NULL; + + while (isRunning_.load()) { + uint8_t *in = nullptr; size_t in_len, out_len = cmdBuffer.size(); - if ((rv = nng_recv(cmdSock, &in, &in_len, NNG_FLAG_ALLOC)) != 0) { + + rv = nng_recv(cmdSock, &in, &in_len, NNG_FLAG_ALLOC); + if (rv != 0) { LOG_ERROR("cmdSock-nng_recv error: {}", nng_strerror(rv)); break; } + try { - // LOG_BUFFER(in, in_len); if (dispatcher(in, in_len, cmdBuffer.data(), &out_len)) { LOG_DEBUG("Send data length {}", out_len); - // LOG_BUFFER(cmdBuffer.data(), out_len); + rv = nng_send(cmdSock, cmdBuffer.data(), out_len, 0); if (rv != 0) { LOG_ERROR("cmdSock-nng_send: {}", nng_strerror(rv)); } - - } else { // Error + } else { // 处理失败情况 LOG_ERROR("Dispatcher failed..."); rv = nng_send(cmdSock, cmdBuffer.data(), 0, 0); if (rv != 0) { @@ -258,59 +330,11 @@ static int RunRpcServer() } catch (const std::exception &e) { LOG_ERROR(util::gb2312_to_utf8(e.what())); } catch (...) { - LOG_ERROR("Unknow exception."); + LOG_ERROR("Unknown exception."); } + nng_free(in, in_len); } - RpcStopServer(); - LOG_DEBUG("Leave RunRpcServer"); - return rv; -} - -int RpcStartServer(int port) -{ - if (isRpcRunning) { - LOG_WARN("RPC 服务已经启动"); - return 1; - } - - cmdPort = port; - cmdThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RunRpcServer, NULL, NULL, NULL); - if (cmdThread == NULL) { - LOG_ERROR("CreateThread failed: {}", GetLastError()); - return -1; - } -#if ENABLE_WX_LOG - EnableLog(); -#endif - return 0; -} - -int RpcStopServer() -{ - if (!isRpcRunning) { - LOG_WARN("RPC 服务未启动"); - return 1; - } - nng_close(cmdSock); - nng_close(msgSock); - handler.UnListenPyq(); - handler.UnListenMsg(); -#if ENABLE_WX_LOG - DisableLog(); -#endif - if (cmdThread != NULL) { - WaitForSingleObject(cmdThread, INFINITE); - CloseHandle(cmdThread); - cmdThread = NULL; - } - - if (msgThread != NULL) { - WaitForSingleObject(msgThread, INFINITE); - CloseHandle(msgThread); - msgThread = NULL; - } - isRpcRunning = false; - return 0; + LOG_DEBUG("Leave RunRpcServer"); } diff --git a/WeChatFerry/spy/rpc_server.h b/WeChatFerry/spy/rpc_server.h index 3bc31e1..400bbff 100644 --- a/WeChatFerry/spy/rpc_server.h +++ b/WeChatFerry/spy/rpc_server.h @@ -1,10 +1,52 @@ #pragma once -#ifdef SPY_EXPORTS -#define SPY_API __declspec(dllexport) -#else -#define SPY_API __declspec(dllimport) -#endif +#include +#include +#include +#include -int RpcStartServer(int port); -int RpcStopServer(); +#include + +#include "wcf.pb.h" + +class RpcServer +{ +public: + static RpcServer &getInstance(); + static void destroyInstance(); + + int start(int port = RPC_DEFAULT_PORT); + int stop(); + +private: + RpcServer(int port = RPC_DEFAULT_PORT); + ~RpcServer(); + RpcServer(const RpcServer &) = delete; + RpcServer &operator=(const RpcServer &) = delete; + + void runRpcServer(); + void receiveMessageCallback(); + bool enableRecvMsg(bool pyq, uint8_t *out, size_t *len); + bool disableRecvMsg(uint8_t *out, size_t *len); + bool dispatcher(uint8_t *in, size_t in_len, uint8_t *out, size_t *out_len); + + static std::string build_url(int port); + + using FunctionHandler = std::function; + + // 服务器默认端口号和绑定地址 + static constexpr int RPC_DEFAULT_PORT = 10086; + static constexpr const char *RPC_SERVER_ADDRESS = "tcp://0.0.0.0"; + + int port_ = RPC_DEFAULT_PORT; + std::atomic isRunning_ { false }; + std::thread cmdThread_; + std::thread msgThread_; + + struct Deleter { + void operator()(RpcServer *server) const { delete server; } + }; + + static std::unique_ptr instance_; + static const std::unordered_map rpcFunctionMap; +}; diff --git a/WeChatFerry/spy/spy.cpp b/WeChatFerry/spy/spy.cpp index db59d50..b721e9f 100644 --- a/WeChatFerry/spy/spy.cpp +++ b/WeChatFerry/spy/spy.cpp @@ -7,7 +7,7 @@ #include "rpc_server.h" #include "util.h" -constexpr std::string_view SUPPORT_VERSION = "3.9.11.25"; +constexpr std::string_view SUPPORT_VERSION = "3.9.12.17"; UINT64 g_WeChatWinDllAddr = 0; @@ -32,12 +32,13 @@ int InitSpy(LPVOID args) } LOG_INFO(msg); - RpcStartServer(pp->port); + RpcServer::getInstance().start(pp->port); + return 0; } void CleanupSpy() { LOG_DEBUG("CleanupSpy"); - RpcStopServer(); + RpcServer::destroyInstance(); }