diff --git a/spy/receive_msg.cpp b/spy/receive_msg.cpp index 19d0277..1cf55ad 100644 --- a/spy/receive_msg.cpp +++ b/spy/receive_msg.cpp @@ -1,6 +1,8 @@ #pragma execution_character_set("utf-8") #include "framework.h" +#include +#include #include #include "load_calls.h" @@ -9,7 +11,8 @@ // Defined in rpc_server.cpp extern bool gIsListening; -extern HANDLE g_hEvent; +extern mutex gMutex; +extern condition_variable gCV; extern queue gMsgQueue; // Defined in spy.cpp @@ -87,11 +90,12 @@ void DispatchMsg(DWORD reg) } wxMsg.content = GetStringByAddress(*p + g_WxCalls.recvMsg.content); - // 推送到队列 - gMsgQueue.push(wxMsg); + { + unique_lock lock(gMutex); + gMsgQueue.push(wxMsg); // 推送到队列 + } - // 通知各方消息就绪 - SetEvent(g_hEvent); + gCV.notify_all(); // 通知各方消息就绪 } __declspec(naked) void RecieveMsgFunc() diff --git a/spy/rpc_server.cpp b/spy/rpc_server.cpp index 522c2ee..7d95b79 100644 --- a/spy/rpc_server.cpp +++ b/spy/rpc_server.cpp @@ -1,6 +1,9 @@ #pragma warning(disable : 4251) +#include +#include #include +#include #include #include #include @@ -34,7 +37,8 @@ extern int IsLogin(void); // Defined in spy.cpp extern string GetSelfWxid(); // Defined in spy.cpp bool gIsListening; -HANDLE g_hEvent; // New message signal +mutex gMutex; +condition_variable gCV; queue gMsgQueue; static DWORD lThreadId = 0; @@ -259,38 +263,31 @@ static void PushMessage() } while (gIsListening) { - // 中断式,兼顾及时性和CPU使用率 - rv = WaitForSingleObject(g_hEvent, 1000); // 等待消息,每秒检查一次条件 - if (rv == WAIT_TIMEOUT) { - continue; - } else if (rv != WAIT_OBJECT_0) { - LOG_ERROR("WaitForSingleObject ERROR[{}]: {}", rv, GetLastError()); - continue; - } + unique_lock lock(gMutex); + if (gCV.wait_for(lock, chrono::milliseconds(1000), []() { return !gMsgQueue.empty(); })) { + while (!gMsgQueue.empty()) { + auto wxmgs = gMsgQueue.front(); + rsp.msg.wxmsg.is_self = wxmgs.is_self; + rsp.msg.wxmsg.is_group = wxmgs.is_group; + rsp.msg.wxmsg.type = wxmgs.type; + rsp.msg.wxmsg.id = (char *)wxmgs.id.c_str(); + rsp.msg.wxmsg.xml = (char *)wxmgs.xml.c_str(); + rsp.msg.wxmsg.sender = (char *)wxmgs.sender.c_str(); + rsp.msg.wxmsg.roomid = (char *)wxmgs.roomid.c_str(); + rsp.msg.wxmsg.content = (char *)wxmgs.content.c_str(); + gMsgQueue.pop(); + LOG_DEBUG("Recv msg: {}", wxmgs.content); + if (!pb_encode(&stream, Response_fields, &rsp)) { + LOG_ERROR("Encoding failed: {}", PB_GET_ERROR(&stream)); + continue; + } - while (!gMsgQueue.empty()) { - auto wxmgs = gMsgQueue.front(); - rsp.msg.wxmsg.is_self = wxmgs.is_self; - rsp.msg.wxmsg.is_group = wxmgs.is_group; - rsp.msg.wxmsg.type = wxmgs.type; - rsp.msg.wxmsg.id = (char *)wxmgs.id.c_str(); - rsp.msg.wxmsg.xml = (char *)wxmgs.xml.c_str(); - rsp.msg.wxmsg.sender = (char *)wxmgs.sender.c_str(); - rsp.msg.wxmsg.roomid = (char *)wxmgs.roomid.c_str(); - rsp.msg.wxmsg.content = (char *)wxmgs.content.c_str(); - gMsgQueue.pop(); - - if (!pb_encode(&stream, Response_fields, &rsp)) { - LOG_ERROR("Encoding failed: {}", PB_GET_ERROR(&stream)); - continue; - } - - rv = nng_send(msg_sock, buffer, stream.bytes_written, 0); - if (rv != 0) { - LOG_ERROR("nng_send: {}", rv); + rv = nng_send(msg_sock, buffer, stream.bytes_written, 0); + if (rv != 0) { + LOG_ERROR("nng_send: {}", rv); + } } } - ResetEvent(g_hEvent); } nng_close(msg_sock); } @@ -302,16 +299,6 @@ bool func_enable_recv_txt(uint8_t *out, size_t *len) rsp.which_msg = Response_status_tag; rsp.msg.status = -1; - g_hEvent = CreateEvent(NULL, // default security attributes - TRUE, // manual-reset event - FALSE, // initial state is nonsignaled - NULL // unnamed - ); - if (g_hEvent == NULL) { - LOG_ERROR("CreateEvent error: {}", GetLastError()); - return false; - } - ListenMessage(); HANDLE msgThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PushMessage, NULL, NULL, NULL); if (msgThread != 0) {