Refactor: replace C-style code with modern C++ idioms

This commit is contained in:
Changhua 2025-01-30 20:34:45 +08:00
parent 15267632a1
commit 2a27dec354

View File

@ -39,18 +39,19 @@
namespace fs = std::filesystem; namespace fs = std::filesystem;
bool gIsLogging = false; constexpr size_t DEFAULT_BUF_SIZE = 16 * 1024 * 1024;
bool gIsListening = false;
bool gIsListeningPyq = false;
mutex gMutex;
condition_variable gCV;
queue<WxMsg_t> gMsgQueue;
static int lport = 0; static int cmdPort = 0;
static DWORD lThreadId = 0; static bool isRpcRunning = false;
static bool lIsRunning = false; static bool isReveivingMsg = false;
static nng_socket cmdSock, msgSock; // TODO: 断开检测 static HANDLE cmdThread = NULL;
static uint8_t gBuffer[G_BUF_SIZE] = { 0 }; static HANDLE msgThread = NULL;
static nng_socket cmdSock = NNG_SOCKET_INITIALIZER; // TODO: 断开检测
static nng_socket msgSock = NNG_SOCKET_INITIALIZER; // TODO: 断开检测
auto &msgHandler = MessageHandler::getInstance();
static std::string BuildUrl(int port) { return "tcp://0.0.0.0:" + std::to_string(port); }
bool func_is_login(uint8_t *out, size_t *len) bool func_is_login(uint8_t *out, size_t *len)
{ {
@ -116,7 +117,7 @@ bool func_get_msg_types(uint8_t *out, size_t *len)
rsp.func = Functions_FUNC_GET_MSG_TYPES; rsp.func = Functions_FUNC_GET_MSG_TYPES;
rsp.which_msg = Response_types_tag; rsp.which_msg = Response_types_tag;
MsgTypes_t types = GetMsgTypes(); MsgTypes_t types = msgHandler.GetMsgTypes();
rsp.msg.types.types.funcs.encode = encode_types; rsp.msg.types.types.funcs.encode = encode_types;
rsp.msg.types.types.arg = &types; rsp.msg.types.types.arg = &types;
@ -429,67 +430,68 @@ bool func_forward_msg(uint64_t id, char *receiver, uint8_t *out, size_t *len)
static void PushMessage() static void PushMessage()
{ {
static uint8_t buffer[G_BUF_SIZE] = { 0 };
int rv; int rv;
Response rsp = Response_init_default; Response rsp = Response_init_default;
rsp.func = Functions_FUNC_ENABLE_RECV_TXT; rsp.func = Functions_FUNC_ENABLE_RECV_TXT;
rsp.which_msg = Response_wxmsg_tag; rsp.which_msg = Response_wxmsg_tag;
std::vector<uint8_t> msgBuffer(DEFAULT_BUF_SIZE);
pb_ostream_t stream = pb_ostream_from_buffer(buffer, G_BUF_SIZE); pb_ostream_t stream = pb_ostream_from_buffer(msgBuffer.data(), msgBuffer.size());
char url[URL_SIZE + 1] = { 0 }; std::string url = BuildUrl(cmdPort + 1);
sprintf_s(url, URL_SIZE, "%s:%d", BASE_URL, lport + 1);
if ((rv = nng_pair1_open(&msgSock)) != 0) { if ((rv = nng_pair1_open(&msgSock)) != 0) {
LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv)); LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv));
return; return;
} }
if ((rv = nng_listen(msgSock, url, NULL, 0)) != 0) { if ((rv = nng_listen(msgSock, url.c_str(), NULL, 0)) != 0) {
LOG_ERROR("nng_listen error {}", nng_strerror(rv)); LOG_ERROR("nng_listen error {}", nng_strerror(rv));
return; return;
} }
LOG_INFO("MSG Server listening on {}", url); LOG_INFO("MSG Server listening on {}", url.c_str());
if ((rv = nng_setopt_ms(msgSock, NNG_OPT_SENDTIMEO, 2000)) != 0) { if ((rv = nng_setopt_ms(msgSock, NNG_OPT_SENDTIMEO, 5000)) != 0) {
LOG_ERROR("nng_setopt_ms: {}", nng_strerror(rv)); LOG_ERROR("nng_setopt_ms: {}", nng_strerror(rv));
return; return;
} }
while (gIsListening) { while (msgHandler.isMessageListening()) {
unique_lock<mutex> lock(gMutex); std::unique_lock<std::mutex> lock(msgHandler.getMutex());
if (gCV.wait_for(lock, chrono::milliseconds(1000), []() { return !gMsgQueue.empty(); })) { std::optional<WxMsg_t> msgOpt;
while (!gMsgQueue.empty()) { auto hasMessage = [&]() {
auto wxmsg = gMsgQueue.front(); msgOpt = msgHandler.popMessage();
rsp.msg.wxmsg.id = wxmsg.id; return msgOpt.has_value();
rsp.msg.wxmsg.is_self = wxmsg.is_self; };
rsp.msg.wxmsg.is_group = wxmsg.is_group;
rsp.msg.wxmsg.type = wxmsg.type;
rsp.msg.wxmsg.ts = wxmsg.ts;
rsp.msg.wxmsg.roomid = (char *)wxmsg.roomid.c_str();
rsp.msg.wxmsg.content = (char *)wxmsg.content.c_str();
rsp.msg.wxmsg.sender = (char *)wxmsg.sender.c_str();
rsp.msg.wxmsg.sign = (char *)wxmsg.sign.c_str();
rsp.msg.wxmsg.thumb = (char *)wxmsg.thumb.c_str();
rsp.msg.wxmsg.extra = (char *)wxmsg.extra.c_str();
rsp.msg.wxmsg.xml = (char *)wxmsg.xml.c_str();
gMsgQueue.pop();
LOG_DEBUG("Push msg: {}", wxmsg.content);
pb_ostream_t stream = pb_ostream_from_buffer(buffer, G_BUF_SIZE);
if (!pb_encode(&stream, Response_fields, &rsp)) {
LOG_ERROR("Encoding failed: {}", PB_GET_ERROR(&stream));
continue;
}
rv = nng_send(msgSock, buffer, stream.bytes_written, 0); if (msgHandler.getConditionVariable().wait_for(lock, std::chrono::milliseconds(1000), hasMessage)) {
if (rv != 0) { WxMsg_t wxmsg = std::move(msgOpt.value());
LOG_ERROR("msgSock-nng_send: {}", nng_strerror(rv)); rsp.msg.wxmsg.id = wxmsg.id;
} rsp.msg.wxmsg.is_self = wxmsg.is_self;
LOG_DEBUG("Send data length {}", stream.bytes_written); rsp.msg.wxmsg.is_group = wxmsg.is_group;
rsp.msg.wxmsg.type = wxmsg.type;
rsp.msg.wxmsg.ts = wxmsg.ts;
rsp.msg.wxmsg.roomid = (char *)wxmsg.roomid.c_str();
rsp.msg.wxmsg.content = (char *)wxmsg.content.c_str();
rsp.msg.wxmsg.sender = (char *)wxmsg.sender.c_str();
rsp.msg.wxmsg.sign = (char *)wxmsg.sign.c_str();
rsp.msg.wxmsg.thumb = (char *)wxmsg.thumb.c_str();
rsp.msg.wxmsg.extra = (char *)wxmsg.extra.c_str();
rsp.msg.wxmsg.xml = (char *)wxmsg.xml.c_str();
LOG_DEBUG("Push msg: {}", wxmsg.content);
pb_ostream_t stream = pb_ostream_from_buffer(msgBuffer.data(), msgBuffer.size());
if (!pb_encode(&stream, Response_fields, &rsp)) {
LOG_ERROR("Encoding failed: {}", PB_GET_ERROR(&stream));
continue;
} }
rv = nng_send(msgSock, msgBuffer.data(), stream.bytes_written, 0);
if (rv != 0) {
LOG_ERROR("msgSock-nng_send: {}", nng_strerror(rv));
}
LOG_DEBUG("Send data length {}", stream.bytes_written);
} }
} }
nng_close(msgSock);
} }
bool func_enable_recv_txt(bool pyq, uint8_t *out, size_t *len) bool func_enable_recv_txt(bool pyq, uint8_t *out, size_t *len)
@ -497,22 +499,18 @@ bool func_enable_recv_txt(bool pyq, uint8_t *out, size_t *len)
Response rsp = Response_init_default; Response rsp = Response_init_default;
rsp.func = Functions_FUNC_ENABLE_RECV_TXT; rsp.func = Functions_FUNC_ENABLE_RECV_TXT;
rsp.which_msg = Response_status_tag; rsp.which_msg = Response_status_tag;
rsp.msg.status = 0; rsp.msg.status = msgHandler.ListenMsg();
if (!gIsListening) { if (rsp.msg.status == 0) {
ListenMessage();
if (pyq) { if (pyq) {
ListenPyq(); msgHandler.ListenPyq();
} }
HANDLE msgThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PushMessage, NULL, NULL, NULL); msgThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PushMessage, NULL, NULL, NULL);
if (msgThread == NULL) { if (msgThread == NULL) {
rsp.msg.status = GetLastError(); rsp.msg.status = GetLastError();
LOG_ERROR("func_enable_recv_txt failed: {}", rsp.msg.status); LOG_ERROR("func_enable_recv_txt failed: {}", rsp.msg.status);
} else {
CloseHandle(msgThread);
} }
} }
pb_ostream_t stream = pb_ostream_from_buffer(out, *len); pb_ostream_t stream = pb_ostream_from_buffer(out, *len);
if (!pb_encode(&stream, Response_fields, &rsp)) { if (!pb_encode(&stream, Response_fields, &rsp)) {
LOG_ERROR("Encoding failed: {}", PB_GET_ERROR(&stream)); LOG_ERROR("Encoding failed: {}", PB_GET_ERROR(&stream));
@ -528,10 +526,15 @@ bool func_disable_recv_txt(uint8_t *out, size_t *len)
Response rsp = Response_init_default; Response rsp = Response_init_default;
rsp.func = Functions_FUNC_DISABLE_RECV_TXT; rsp.func = Functions_FUNC_DISABLE_RECV_TXT;
rsp.which_msg = Response_status_tag; rsp.which_msg = Response_status_tag;
rsp.msg.status = 0; rsp.msg.status = msgHandler.UnListenMsg();
UnListenPyq(); if (rsp.msg.status == 0) {
UnListenMessage(); // 可能需要1秒之后才能退出见 PushMessage msgHandler.UnListenPyq();
if (msgThread != NULL) {
TerminateThread(msgThread, 0);
msgThread = NULL;
}
}
pb_ostream_t stream = pb_ostream_from_buffer(out, *len); pb_ostream_t stream = pb_ostream_from_buffer(out, *len);
if (!pb_encode(&stream, Response_fields, &rsp)) { if (!pb_encode(&stream, Response_fields, &rsp)) {
@ -990,52 +993,51 @@ static bool dispatcher(uint8_t *in, size_t in_len, uint8_t *out, size_t *out_len
return ret; return ret;
} }
static int RunServer() static int RunRpcServer()
{ {
int rv = 0; int rv = 0;
char url[URL_SIZE + 1] = { 0 }; std::string url = BuildUrl(cmdPort);
sprintf_s(url, URL_SIZE, "%s:%d", BASE_URL, lport);
if ((rv = nng_pair1_open(&cmdSock)) != 0) { if ((rv = nng_pair1_open(&cmdSock)) != 0) {
LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv)); LOG_ERROR("nng_pair0_open error {}", nng_strerror(rv));
return rv; return rv;
} }
if ((rv = nng_listen(cmdSock, (char *)url, NULL, 0)) != 0) { if ((rv = nng_listen(cmdSock, url.c_str(), NULL, 0)) != 0) {
LOG_ERROR("nng_listen error {}", nng_strerror(rv)); LOG_ERROR("nng_listen error {}", nng_strerror(rv));
return rv; return rv;
} }
LOG_INFO("CMD Server listening on {}", (char *)url); LOG_INFO("CMD Server listening on {}", url.c_str());
if ((rv = nng_setopt_ms(cmdSock, NNG_OPT_SENDTIMEO, 1000)) != 0) { if ((rv = nng_setopt_ms(cmdSock, NNG_OPT_SENDTIMEO, 1000)) != 0) {
LOG_ERROR("nng_setopt_ms error: {}", nng_strerror(rv)); LOG_ERROR("nng_setopt_ms error: {}", nng_strerror(rv));
return rv; return rv;
} }
lIsRunning = true; std::vector<uint8_t> cmdBuffer(DEFAULT_BUF_SIZE);
while (lIsRunning) { isRpcRunning = true;
while (isRpcRunning) {
uint8_t *in = NULL; uint8_t *in = NULL;
size_t in_len, out_len = G_BUF_SIZE; size_t in_len, out_len = cmdBuffer.size();
if ((rv = nng_recv(cmdSock, &in, &in_len, NNG_FLAG_ALLOC)) != 0) { if ((rv = nng_recv(cmdSock, &in, &in_len, NNG_FLAG_ALLOC)) != 0) {
LOG_ERROR("cmdSock-nng_recv error: {}", nng_strerror(rv)); LOG_ERROR("cmdSock-nng_recv error: {}", nng_strerror(rv));
break; break;
} }
try { try {
// LOG_BUFFER(in, in_len); // LOG_BUFFER(in, in_len);
if (dispatcher(in, in_len, gBuffer, &out_len)) { if (dispatcher(in, in_len, cmdBuffer.data(), &out_len)) {
LOG_DEBUG("Send data length {}", out_len); LOG_DEBUG("Send data length {}", out_len);
// LOG_BUFFER(gBuffer, out_len); // LOG_BUFFER(cmdBuffer.data(), out_len);
rv = nng_send(cmdSock, gBuffer, out_len, 0); rv = nng_send(cmdSock, cmdBuffer.data(), out_len, 0);
if (rv != 0) { if (rv != 0) {
LOG_ERROR("cmdSock-nng_send: {}", nng_strerror(rv)); LOG_ERROR("cmdSock-nng_send: {}", nng_strerror(rv));
} }
} else { // Error } else { // Error
LOG_ERROR("Dispatcher failed..."); LOG_ERROR("Dispatcher failed...");
rv = nng_send(cmdSock, gBuffer, 0, 0); rv = nng_send(cmdSock, cmdBuffer.data(), 0, 0);
if (rv != 0) { if (rv != 0) {
LOG_ERROR("cmdSock-nng_send: {}", nng_strerror(rv)); LOG_ERROR("cmdSock-nng_send: {}", nng_strerror(rv));
} }
// break;
} }
} catch (const std::exception &e) { } catch (const std::exception &e) {
LOG_ERROR(GB2312ToUtf8(e.what())); LOG_ERROR(GB2312ToUtf8(e.what()));
@ -1045,21 +1047,22 @@ static int RunServer()
nng_free(in, in_len); nng_free(in, in_len);
} }
RpcStopServer(); RpcStopServer();
LOG_DEBUG("Leave RunServer"); LOG_DEBUG("Leave RunRpcServer");
return rv; return rv;
} }
int RpcStartServer(int port) int RpcStartServer(int port)
{ {
if (lIsRunning) { if (isRpcRunning) {
return 0; LOG_WARN("RPC 服务已经启动");
return 1;
} }
lport = port; cmdPort = port;
cmdThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RunRpcServer, NULL, NULL, NULL);
HANDLE rpcThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)RunServer, NULL, NULL, &lThreadId); if (cmdThread == NULL) {
if (rpcThread != 0) { LOG_ERROR("CreateThread failed: {}", GetLastError());
CloseHandle(rpcThread); return -1;
} }
#if ENABLE_WX_LOG #if ENABLE_WX_LOG
EnableLog(); EnableLog();
@ -1069,16 +1072,29 @@ int RpcStartServer(int port)
int RpcStopServer() int RpcStopServer()
{ {
if (lIsRunning) { if (!isRpcRunning) {
nng_close(cmdSock); LOG_WARN("RPC 服务未启动");
nng_close(msgSock); return 1;
// UnListenMessage();
lIsRunning = false;
Sleep(1000);
LOG_INFO("Server stoped.");
} }
nng_close(cmdSock);
nng_close(msgSock);
msgHandler.UnListenPyq();
msgHandler.UnListenMsg();
#if ENABLE_WX_LOG #if ENABLE_WX_LOG
DisableLog(); DisableLog();
#endif #endif
if (cmdThread != NULL) {
WaitForSingleObject(cmdThread, INFINITE);
CloseHandle(cmdThread);
cmdThread = NULL;
}
if (msgThread != NULL) {
WaitForSingleObject(msgThread, INFINITE);
CloseHandle(msgThread);
msgThread = NULL;
}
isRpcRunning = false;
return 0; return 0;
} }