Make message passing thread safe

This commit is contained in:
Changhua 2023-02-26 23:07:04 +08:00
parent 91458af420
commit c4d525ee23
2 changed files with 36 additions and 45 deletions

View File

@ -1,6 +1,8 @@
#pragma execution_character_set("utf-8") #pragma execution_character_set("utf-8")
#include "framework.h" #include "framework.h"
#include <condition_variable>
#include <mutex>
#include <queue> #include <queue>
#include "load_calls.h" #include "load_calls.h"
@ -9,7 +11,8 @@
// Defined in rpc_server.cpp // Defined in rpc_server.cpp
extern bool gIsListening; extern bool gIsListening;
extern HANDLE g_hEvent; extern mutex gMutex;
extern condition_variable gCV;
extern queue<WxMsg_t> gMsgQueue; extern queue<WxMsg_t> gMsgQueue;
// Defined in spy.cpp // Defined in spy.cpp
@ -87,11 +90,12 @@ void DispatchMsg(DWORD reg)
} }
wxMsg.content = GetStringByAddress(*p + g_WxCalls.recvMsg.content); wxMsg.content = GetStringByAddress(*p + g_WxCalls.recvMsg.content);
// 推送到队列 {
gMsgQueue.push(wxMsg); unique_lock<mutex> lock(gMutex);
gMsgQueue.push(wxMsg); // 推送到队列
}
// 通知各方消息就绪 gCV.notify_all(); // 通知各方消息就绪
SetEvent(g_hEvent);
} }
__declspec(naked) void RecieveMsgFunc() __declspec(naked) void RecieveMsgFunc()

View File

@ -1,6 +1,9 @@
#pragma warning(disable : 4251) #pragma warning(disable : 4251)
#include <chrono>
#include <condition_variable>
#include <memory> #include <memory>
#include <mutex>
#include <queue> #include <queue>
#include <random> #include <random>
#include <sstream> #include <sstream>
@ -34,7 +37,8 @@ extern int IsLogin(void); // Defined in spy.cpp
extern string GetSelfWxid(); // Defined in spy.cpp extern string GetSelfWxid(); // Defined in spy.cpp
bool gIsListening; bool gIsListening;
HANDLE g_hEvent; // New message signal mutex gMutex;
condition_variable gCV;
queue<WxMsg_t> gMsgQueue; queue<WxMsg_t> gMsgQueue;
static DWORD lThreadId = 0; static DWORD lThreadId = 0;
@ -259,38 +263,31 @@ static void PushMessage()
} }
while (gIsListening) { while (gIsListening) {
// 中断式兼顾及时性和CPU使用率 unique_lock<mutex> lock(gMutex);
rv = WaitForSingleObject(g_hEvent, 1000); // 等待消息,每秒检查一次条件 if (gCV.wait_for(lock, chrono::milliseconds(1000), []() { return !gMsgQueue.empty(); })) {
if (rv == WAIT_TIMEOUT) { while (!gMsgQueue.empty()) {
continue; auto wxmgs = gMsgQueue.front();
} else if (rv != WAIT_OBJECT_0) { rsp.msg.wxmsg.is_self = wxmgs.is_self;
LOG_ERROR("WaitForSingleObject ERROR[{}]: {}", rv, GetLastError()); rsp.msg.wxmsg.is_group = wxmgs.is_group;
continue; rsp.msg.wxmsg.type = wxmgs.type;
} rsp.msg.wxmsg.id = (char *)wxmgs.id.c_str();
rsp.msg.wxmsg.xml = (char *)wxmgs.xml.c_str();
rsp.msg.wxmsg.sender = (char *)wxmgs.sender.c_str();
rsp.msg.wxmsg.roomid = (char *)wxmgs.roomid.c_str();
rsp.msg.wxmsg.content = (char *)wxmgs.content.c_str();
gMsgQueue.pop();
LOG_DEBUG("Recv msg: {}", wxmgs.content);
if (!pb_encode(&stream, Response_fields, &rsp)) {
LOG_ERROR("Encoding failed: {}", PB_GET_ERROR(&stream));
continue;
}
while (!gMsgQueue.empty()) { rv = nng_send(msg_sock, buffer, stream.bytes_written, 0);
auto wxmgs = gMsgQueue.front(); if (rv != 0) {
rsp.msg.wxmsg.is_self = wxmgs.is_self; LOG_ERROR("nng_send: {}", rv);
rsp.msg.wxmsg.is_group = wxmgs.is_group; }
rsp.msg.wxmsg.type = wxmgs.type;
rsp.msg.wxmsg.id = (char *)wxmgs.id.c_str();
rsp.msg.wxmsg.xml = (char *)wxmgs.xml.c_str();
rsp.msg.wxmsg.sender = (char *)wxmgs.sender.c_str();
rsp.msg.wxmsg.roomid = (char *)wxmgs.roomid.c_str();
rsp.msg.wxmsg.content = (char *)wxmgs.content.c_str();
gMsgQueue.pop();
if (!pb_encode(&stream, Response_fields, &rsp)) {
LOG_ERROR("Encoding failed: {}", PB_GET_ERROR(&stream));
continue;
}
rv = nng_send(msg_sock, buffer, stream.bytes_written, 0);
if (rv != 0) {
LOG_ERROR("nng_send: {}", rv);
} }
} }
ResetEvent(g_hEvent);
} }
nng_close(msg_sock); nng_close(msg_sock);
} }
@ -302,16 +299,6 @@ bool func_enable_recv_txt(uint8_t *out, size_t *len)
rsp.which_msg = Response_status_tag; rsp.which_msg = Response_status_tag;
rsp.msg.status = -1; rsp.msg.status = -1;
g_hEvent = CreateEvent(NULL, // default security attributes
TRUE, // manual-reset event
FALSE, // initial state is nonsignaled
NULL // unnamed
);
if (g_hEvent == NULL) {
LOG_ERROR("CreateEvent error: {}", GetLastError());
return false;
}
ListenMessage(); ListenMessage();
HANDLE msgThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PushMessage, NULL, NULL, NULL); HANDLE msgThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PushMessage, NULL, NULL, NULL);
if (msgThread != 0) { if (msgThread != 0) {