diff --git a/WeChatFerry/spy/rpc_server.cpp b/WeChatFerry/spy/rpc_server.cpp index 1d09e30..71c8f60 100644 --- a/WeChatFerry/spy/rpc_server.cpp +++ b/WeChatFerry/spy/rpc_server.cpp @@ -39,18 +39,19 @@ namespace fs = std::filesystem; -bool gIsLogging = false; -bool gIsListening = false; -bool gIsListeningPyq = false; -mutex gMutex; -condition_variable gCV; -queue gMsgQueue; +constexpr size_t DEFAULT_BUF_SIZE = 16 * 1024 * 1024; -static int lport = 0; -static DWORD lThreadId = 0; -static bool lIsRunning = false; -static nng_socket cmdSock, msgSock; // TODO: 断开检测 -static uint8_t gBuffer[G_BUF_SIZE] = { 0 }; +static int cmdPort = 0; +static bool isRpcRunning = false; +static bool isReveivingMsg = false; +static HANDLE cmdThread = NULL; +static HANDLE msgThread = NULL; +static nng_socket cmdSock = NNG_SOCKET_INITIALIZER; // TODO: 断开检测 +static nng_socket msgSock = NNG_SOCKET_INITIALIZER; // TODO: 断开检测 + +auto &msgHandler = MessageHandler::getInstance(); + +static std::string BuildUrl(int port) { return "tcp://0.0.0.0:" + std::to_string(port); } bool func_is_login(uint8_t *out, size_t *len) { @@ -116,7 +117,7 @@ bool func_get_msg_types(uint8_t *out, size_t *len) rsp.func = Functions_FUNC_GET_MSG_TYPES; rsp.which_msg = Response_types_tag; - MsgTypes_t types = GetMsgTypes(); + MsgTypes_t types = msgHandler.GetMsgTypes(); rsp.msg.types.types.funcs.encode = encode_types; rsp.msg.types.types.arg = &types; @@ -429,67 +430,68 @@ bool func_forward_msg(uint64_t id, char *receiver, uint8_t *out, size_t *len) static void PushMessage() { - static uint8_t buffer[G_BUF_SIZE] = { 0 }; - int rv; Response rsp = Response_init_default; rsp.func = Functions_FUNC_ENABLE_RECV_TXT; rsp.which_msg = Response_wxmsg_tag; + std::vector msgBuffer(DEFAULT_BUF_SIZE); - pb_ostream_t stream = pb_ostream_from_buffer(buffer, G_BUF_SIZE); + pb_ostream_t stream = pb_ostream_from_buffer(msgBuffer.data(), msgBuffer.size()); - char url[URL_SIZE + 1] = { 0 }; - sprintf_s(url, URL_SIZE, "%s:%d", BASE_URL, lport + 1); + std::string url = BuildUrl(cmdPort + 1); if ((rv = nng_pair1_open(&msgSock)) != 0) { LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv)); return; } - if ((rv = nng_listen(msgSock, url, NULL, 0)) != 0) { + if ((rv = nng_listen(msgSock, url.c_str(), NULL, 0)) != 0) { LOG_ERROR("nng_listen error {}", nng_strerror(rv)); return; } - LOG_INFO("MSG Server listening on {}", url); - if ((rv = nng_setopt_ms(msgSock, NNG_OPT_SENDTIMEO, 2000)) != 0) { + LOG_INFO("MSG Server listening on {}", url.c_str()); + if ((rv = nng_setopt_ms(msgSock, NNG_OPT_SENDTIMEO, 5000)) != 0) { LOG_ERROR("nng_setopt_ms: {}", nng_strerror(rv)); return; } - while (gIsListening) { - unique_lock lock(gMutex); - if (gCV.wait_for(lock, chrono::milliseconds(1000), []() { return !gMsgQueue.empty(); })) { - while (!gMsgQueue.empty()) { - auto wxmsg = gMsgQueue.front(); - rsp.msg.wxmsg.id = wxmsg.id; - rsp.msg.wxmsg.is_self = wxmsg.is_self; - rsp.msg.wxmsg.is_group = wxmsg.is_group; - rsp.msg.wxmsg.type = wxmsg.type; - rsp.msg.wxmsg.ts = wxmsg.ts; - rsp.msg.wxmsg.roomid = (char *)wxmsg.roomid.c_str(); - rsp.msg.wxmsg.content = (char *)wxmsg.content.c_str(); - rsp.msg.wxmsg.sender = (char *)wxmsg.sender.c_str(); - rsp.msg.wxmsg.sign = (char *)wxmsg.sign.c_str(); - rsp.msg.wxmsg.thumb = (char *)wxmsg.thumb.c_str(); - rsp.msg.wxmsg.extra = (char *)wxmsg.extra.c_str(); - rsp.msg.wxmsg.xml = (char *)wxmsg.xml.c_str(); - gMsgQueue.pop(); - LOG_DEBUG("Push msg: {}", wxmsg.content); - pb_ostream_t stream = pb_ostream_from_buffer(buffer, G_BUF_SIZE); - if (!pb_encode(&stream, Response_fields, &rsp)) { - LOG_ERROR("Encoding failed: {}", PB_GET_ERROR(&stream)); - continue; - } + while (msgHandler.isMessageListening()) { + std::unique_lock lock(msgHandler.getMutex()); + std::optional msgOpt; + auto hasMessage = [&]() { + msgOpt = msgHandler.popMessage(); + return msgOpt.has_value(); + }; - rv = nng_send(msgSock, buffer, stream.bytes_written, 0); - if (rv != 0) { - LOG_ERROR("msgSock-nng_send: {}", nng_strerror(rv)); - } - LOG_DEBUG("Send data length {}", stream.bytes_written); + if (msgHandler.getConditionVariable().wait_for(lock, std::chrono::milliseconds(1000), hasMessage)) { + WxMsg_t wxmsg = std::move(msgOpt.value()); + rsp.msg.wxmsg.id = wxmsg.id; + rsp.msg.wxmsg.is_self = wxmsg.is_self; + rsp.msg.wxmsg.is_group = wxmsg.is_group; + rsp.msg.wxmsg.type = wxmsg.type; + rsp.msg.wxmsg.ts = wxmsg.ts; + rsp.msg.wxmsg.roomid = (char *)wxmsg.roomid.c_str(); + rsp.msg.wxmsg.content = (char *)wxmsg.content.c_str(); + rsp.msg.wxmsg.sender = (char *)wxmsg.sender.c_str(); + rsp.msg.wxmsg.sign = (char *)wxmsg.sign.c_str(); + rsp.msg.wxmsg.thumb = (char *)wxmsg.thumb.c_str(); + rsp.msg.wxmsg.extra = (char *)wxmsg.extra.c_str(); + rsp.msg.wxmsg.xml = (char *)wxmsg.xml.c_str(); + + LOG_DEBUG("Push msg: {}", wxmsg.content); + pb_ostream_t stream = pb_ostream_from_buffer(msgBuffer.data(), msgBuffer.size()); + if (!pb_encode(&stream, Response_fields, &rsp)) { + LOG_ERROR("Encoding failed: {}", PB_GET_ERROR(&stream)); + continue; } + + rv = nng_send(msgSock, msgBuffer.data(), stream.bytes_written, 0); + if (rv != 0) { + LOG_ERROR("msgSock-nng_send: {}", nng_strerror(rv)); + } + LOG_DEBUG("Send data length {}", stream.bytes_written); } } - nng_close(msgSock); } bool func_enable_recv_txt(bool pyq, uint8_t *out, size_t *len) @@ -497,22 +499,18 @@ bool func_enable_recv_txt(bool pyq, uint8_t *out, size_t *len) Response rsp = Response_init_default; rsp.func = Functions_FUNC_ENABLE_RECV_TXT; rsp.which_msg = Response_status_tag; - rsp.msg.status = 0; + rsp.msg.status = msgHandler.ListenMsg(); - if (!gIsListening) { - ListenMessage(); + if (rsp.msg.status == 0) { if (pyq) { - ListenPyq(); + msgHandler.ListenPyq(); } - HANDLE msgThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PushMessage, NULL, NULL, NULL); + msgThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PushMessage, NULL, NULL, NULL); if (msgThread == NULL) { rsp.msg.status = GetLastError(); LOG_ERROR("func_enable_recv_txt failed: {}", rsp.msg.status); - } else { - CloseHandle(msgThread); } } - pb_ostream_t stream = pb_ostream_from_buffer(out, *len); if (!pb_encode(&stream, Response_fields, &rsp)) { LOG_ERROR("Encoding failed: {}", PB_GET_ERROR(&stream)); @@ -528,10 +526,15 @@ bool func_disable_recv_txt(uint8_t *out, size_t *len) Response rsp = Response_init_default; rsp.func = Functions_FUNC_DISABLE_RECV_TXT; rsp.which_msg = Response_status_tag; - rsp.msg.status = 0; + rsp.msg.status = msgHandler.UnListenMsg(); - UnListenPyq(); - UnListenMessage(); // 可能需要1秒之后才能退出,见 PushMessage + if (rsp.msg.status == 0) { + msgHandler.UnListenPyq(); + if (msgThread != NULL) { + TerminateThread(msgThread, 0); + msgThread = NULL; + } + } pb_ostream_t stream = pb_ostream_from_buffer(out, *len); if (!pb_encode(&stream, Response_fields, &rsp)) { @@ -990,52 +993,51 @@ static bool dispatcher(uint8_t *in, size_t in_len, uint8_t *out, size_t *out_len return ret; } -static int RunServer() +static int RunRpcServer() { - int rv = 0; - char url[URL_SIZE + 1] = { 0 }; - sprintf_s(url, URL_SIZE, "%s:%d", BASE_URL, lport); + int rv = 0; + std::string url = BuildUrl(cmdPort); if ((rv = nng_pair1_open(&cmdSock)) != 0) { LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv)); return rv; } - if ((rv = nng_listen(cmdSock, (char *)url, NULL, 0)) != 0) { + if ((rv = nng_listen(cmdSock, url.c_str(), NULL, 0)) != 0) { LOG_ERROR("nng_listen error {}", nng_strerror(rv)); return rv; } - LOG_INFO("CMD Server listening on {}", (char *)url); + LOG_INFO("CMD Server listening on {}", url.c_str()); if ((rv = nng_setopt_ms(cmdSock, NNG_OPT_SENDTIMEO, 1000)) != 0) { LOG_ERROR("nng_setopt_ms error: {}", nng_strerror(rv)); return rv; } - lIsRunning = true; - while (lIsRunning) { + std::vector cmdBuffer(DEFAULT_BUF_SIZE); + isRpcRunning = true; + while (isRpcRunning) { uint8_t *in = NULL; - size_t in_len, out_len = G_BUF_SIZE; + size_t in_len, out_len = cmdBuffer.size(); if ((rv = nng_recv(cmdSock, &in, &in_len, NNG_FLAG_ALLOC)) != 0) { LOG_ERROR("cmdSock-nng_recv error: {}", nng_strerror(rv)); break; } try { // LOG_BUFFER(in, in_len); - if (dispatcher(in, in_len, gBuffer, &out_len)) { + if (dispatcher(in, in_len, cmdBuffer.data(), &out_len)) { LOG_DEBUG("Send data length {}", out_len); - // LOG_BUFFER(gBuffer, out_len); - rv = nng_send(cmdSock, gBuffer, out_len, 0); + // LOG_BUFFER(cmdBuffer.data(), out_len); + rv = nng_send(cmdSock, cmdBuffer.data(), out_len, 0); if (rv != 0) { LOG_ERROR("cmdSock-nng_send: {}", nng_strerror(rv)); } } else { // Error LOG_ERROR("Dispatcher failed..."); - rv = nng_send(cmdSock, gBuffer, 0, 0); + rv = nng_send(cmdSock, cmdBuffer.data(), 0, 0); if (rv != 0) { LOG_ERROR("cmdSock-nng_send: {}", nng_strerror(rv)); } - // break; } } catch (const std::exception &e) { LOG_ERROR(GB2312ToUtf8(e.what())); @@ -1045,21 +1047,22 @@ static int RunServer() nng_free(in, in_len); } RpcStopServer(); - LOG_DEBUG("Leave RunServer"); + LOG_DEBUG("Leave RunRpcServer"); return rv; } int RpcStartServer(int port) { - if (lIsRunning) { - return 0; + if (isRpcRunning) { + LOG_WARN("RPC 服务已经启动"); + return 1; } - lport = port; - - HANDLE rpcThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RunServer, NULL, NULL, &lThreadId); - if (rpcThread != 0) { - CloseHandle(rpcThread); + cmdPort = port; + cmdThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RunRpcServer, NULL, NULL, NULL); + if (cmdThread == NULL) { + LOG_ERROR("CreateThread failed: {}", GetLastError()); + return -1; } #if ENABLE_WX_LOG EnableLog(); @@ -1069,16 +1072,29 @@ int RpcStartServer(int port) int RpcStopServer() { - if (lIsRunning) { - nng_close(cmdSock); - nng_close(msgSock); - // UnListenMessage(); - lIsRunning = false; - Sleep(1000); - LOG_INFO("Server stoped."); + if (!isRpcRunning) { + LOG_WARN("RPC 服务未启动"); + return 1; } + + nng_close(cmdSock); + nng_close(msgSock); + msgHandler.UnListenPyq(); + msgHandler.UnListenMsg(); #if ENABLE_WX_LOG DisableLog(); #endif + if (cmdThread != NULL) { + WaitForSingleObject(cmdThread, INFINITE); + CloseHandle(cmdThread); + cmdThread = NULL; + } + + if (msgThread != NULL) { + WaitForSingleObject(msgThread, INFINITE); + CloseHandle(msgThread); + msgThread = NULL; + } + isRpcRunning = false; return 0; }