WeChatFerry/spy/rpc_server.cpp

285 lines
8.2 KiB
C++
Raw Normal View History

2022-10-15 20:25:42 +08:00
#pragma warning(disable : 4251)
#include <memory>
#include <queue>
#include <random>
#include <sstream>
#include <string>
#include <thread>
#include <grpc/grpc.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include "../proto/wcf.grpc.pb.h"
2021-02-12 23:21:57 +08:00
2022-08-21 08:45:16 +08:00
#include "accept_new_friend.h"
2022-08-07 23:32:25 +08:00
#include "exec_sql.h"
2022-08-07 20:50:49 +08:00
#include "get_contacts.h"
2022-10-15 20:25:42 +08:00
#include "log.h"
2022-08-13 21:55:08 +08:00
#include "receive_msg.h"
2021-02-12 23:21:57 +08:00
#include "rpc_server.h"
2022-08-07 20:50:49 +08:00
#include "send_msg.h"
2022-08-13 21:55:08 +08:00
#include "spy.h"
2022-08-07 20:50:49 +08:00
#include "spy_types.h"
2022-08-07 23:32:25 +08:00
#include "util.h"
2022-10-15 20:25:42 +08:00
extern int IsLogin(void); // Defined in spy.cpp
extern std::string GetSelfWxid(); // Defined in spy.cpp
2022-08-13 21:55:08 +08:00
2022-10-15 20:25:42 +08:00
using namespace std;
2021-02-12 23:21:57 +08:00
2022-10-15 20:25:42 +08:00
using grpc::CallbackServerContext;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerUnaryReactor;
using grpc::ServerWriteReactor;
using grpc::Status;
using wcf::Contacts;
using wcf::DbField;
using wcf::DbNames;
using wcf::DbQuery;
using wcf::DbRow;
using wcf::DbRows;
using wcf::DbTable;
using wcf::DbTables;
using wcf::Empty;
using wcf::ImageMsg;
using wcf::MsgTypes;
using wcf::Response;
using wcf::String;
using wcf::TextMsg;
using wcf::Verification;
using wcf::Wcf;
using wcf::WxMsg;
// Shared state for the message stream served by WcfImpl::RpcEnableRecvMsg.
// gMutex is held while gMsgQueue is accessed in this file.
mutex gMutex;
// Pending WxMsg items; the stream writer pops from the front, and RpcDisableRecvMsg pushes an
// empty message to wake the writer. NOTE(review): the real producer is presumably the
// message-receive hook in another translation unit - confirm.
queue<WxMsg> gMsgQueue;
// Signalled after a push to gMsgQueue so that a writer blocked in its wait wakes up.
condition_variable gCv;
// Read by the stream writer and by RpcDisableRecvMsg to decide whether the stream should go on.
// NOTE(review): per the inline comments in this file it is set by ListenMessage() and cleared
// by UnListenMessage(); those functions are defined elsewhere - confirm.
bool gIsListening;
class WcfImpl final : public Wcf::CallbackService
2022-08-20 22:10:11 +08:00
{
2022-10-15 20:25:42 +08:00
public:
explicit WcfImpl() { }
2022-08-20 22:10:11 +08:00
2022-10-15 20:25:42 +08:00
ServerUnaryReactor *RpcIsLogin(CallbackServerContext *context, const Empty *empty, Response *rsp) override
{
int ret = IsLogin();
rsp->set_status(ret);
auto *reactor = context->DefaultReactor();
reactor->Finish(Status::OK);
return reactor;
2022-08-20 22:10:11 +08:00
}
2022-10-15 20:25:42 +08:00
ServerUnaryReactor *RpcGetSelfWxid(CallbackServerContext *context, const Empty *empty, String *rsp) override
{
string wxid = GetSelfWxid();
rsp->set_str(wxid);
auto *reactor = context->DefaultReactor();
reactor->Finish(Status::OK);
return reactor;
}
2022-08-20 22:10:11 +08:00
2022-10-15 20:25:42 +08:00
ServerWriteReactor<WxMsg> *RpcEnableRecvMsg(CallbackServerContext *context, const Empty *empty) override
{
class Getter : public ServerWriteReactor<WxMsg>
{
public:
Getter()
{
LOG_INFO("Enable message listening.")
ListenMessage(); // gIsListening = true;
NextWrite();
}
void OnDone() override { delete this; }
void OnWriteDone(bool /*ok*/) override { NextWrite(); }
private:
void NextWrite()
{
unique_lock<std::mutex> lock(gMutex);
gCv.wait(lock, [&] { return !gMsgQueue.empty(); });
tmp_ = gMsgQueue.front();
gMsgQueue.pop();
lock.unlock();
if (gIsListening) {
StartWrite(&tmp_);
} else {
LOG_INFO("Disable message listening.")
Finish(Status::OK); // 结束本次通信
}
}
WxMsg tmp_; // 如果将它放到 NextWrite 内部StartWrite 调用时可能已经出了作用域
};
2021-02-12 23:21:57 +08:00
2022-10-15 20:25:42 +08:00
return new Getter();
}
2022-09-25 11:22:24 +08:00
2022-10-15 20:25:42 +08:00
ServerUnaryReactor *RpcDisableRecvMsg(CallbackServerContext *context, const Empty *empty, Response *rsp) override
2021-02-12 23:21:57 +08:00
{
2022-10-15 20:25:42 +08:00
if (gIsListening) {
UnListenMessage(); // gIsListening = false;
// 发送消息,触发 NextWrite 的 Finish
WxMsg wxMsg;
unique_lock<std::mutex> lock(gMutex);
gMsgQueue.push(wxMsg);
lock.unlock();
gCv.notify_all();
2021-02-12 23:21:57 +08:00
}
2022-10-15 20:25:42 +08:00
rsp->set_status(0);
auto *reactor = context->DefaultReactor();
reactor->Finish(Status::OK);
return reactor;
2021-02-12 23:21:57 +08:00
}
2022-10-15 20:25:42 +08:00
ServerUnaryReactor *RpcSendTextMsg(CallbackServerContext *context, const TextMsg *msg, Response *rsp) override
2021-02-12 23:21:57 +08:00
{
2022-10-15 20:25:42 +08:00
wstring wswxid = String2Wstring(msg->receiver());
wstring wsmsg = String2Wstring(msg->msg());
wstring wsatusers = String2Wstring(msg->aters());
SendTextMessage(wswxid.c_str(), wsmsg.c_str(), wsatusers.c_str());
rsp->set_status(0);
auto *reactor = context->DefaultReactor();
reactor->Finish(Status::OK);
return reactor;
2021-02-12 23:21:57 +08:00
}
2022-08-20 15:15:04 +08:00
2022-10-15 20:25:42 +08:00
ServerUnaryReactor *RpcSendImageMsg(CallbackServerContext *context, const ImageMsg *msg, Response *rsp) override
{
wstring wswxid = String2Wstring(msg->receiver());
wstring wspath = String2Wstring(msg->path());
SendImageMessage(wswxid.c_str(), wspath.c_str());
rsp->set_status(0);
auto *reactor = context->DefaultReactor();
reactor->Finish(Status::OK);
return reactor;
}
2021-02-12 23:21:57 +08:00
2022-10-15 20:25:42 +08:00
ServerUnaryReactor *RpcGetMsgTypes(CallbackServerContext *context, const Empty *empty, MsgTypes *rsp) override
{
GetMsgTypes(rsp);
auto *reactor = context->DefaultReactor();
reactor->Finish(Status::OK);
2021-02-12 23:21:57 +08:00
2022-10-15 20:25:42 +08:00
return reactor;
}
2021-08-22 21:57:16 +08:00
2022-10-15 20:25:42 +08:00
ServerUnaryReactor *RpcGetContacts(CallbackServerContext *context, const Empty *empty, Contacts *rsp) override
{
bool ret = GetContacts(rsp);
auto *reactor = context->DefaultReactor();
if (ret) {
reactor->Finish(Status::OK);
} else {
reactor->Finish(Status::CANCELLED);
}
2022-08-07 20:50:49 +08:00
2022-10-15 20:25:42 +08:00
return reactor;
2022-08-07 20:50:49 +08:00
}
2022-08-07 20:08:54 +08:00
2022-10-15 20:25:42 +08:00
ServerUnaryReactor *RpcGetDbNames(CallbackServerContext *context, const Empty *empty, DbNames *rsp) override
{
GetDbNames(rsp);
auto *reactor = context->DefaultReactor();
reactor->Finish(Status::OK);
return reactor;
2022-08-07 20:50:49 +08:00
}
2022-10-15 20:25:42 +08:00
ServerUnaryReactor *RpcGetDbTables(CallbackServerContext *context, const String *db, DbTables *rsp) override
{
GetDbTables(db->str(), rsp);
auto *reactor = context->DefaultReactor();
reactor->Finish(Status::OK);
2022-08-07 20:50:49 +08:00
2022-10-15 20:25:42 +08:00
return reactor;
}
2022-08-07 20:50:49 +08:00
2022-10-15 20:25:42 +08:00
ServerUnaryReactor *RpcExecDbQuery(CallbackServerContext *context, const DbQuery *query, DbRows *rsp) override
{
ExecDbQuery(query->db(), query->sql(), rsp);
auto *reactor = context->DefaultReactor();
reactor->Finish(Status::OK);
2022-08-07 20:50:49 +08:00
2022-10-15 20:25:42 +08:00
return reactor;
2022-08-07 20:50:49 +08:00
}
2022-10-15 20:25:42 +08:00
ServerUnaryReactor *RpcAcceptNewFriend(CallbackServerContext *context, const Verification *v,
Response *rsp) override
{
bool ret = AcceptNewFriend(String2Wstring(v->v3()), String2Wstring(v->v4()));
auto *reactor = context->DefaultReactor();
if (ret) {
rsp->set_status(0);
reactor->Finish(Status::OK);
} else {
LOG_ERROR("AcceptNewFriend failed.")
rsp->set_status(-1); // TODO: Unify error code
reactor->Finish(Status::CANCELLED);
2022-08-07 20:50:49 +08:00
}
2022-10-15 20:25:42 +08:00
return reactor;
2022-08-07 20:50:49 +08:00
}
2022-10-15 20:25:42 +08:00
};
2022-08-07 20:50:49 +08:00
2022-10-15 20:25:42 +08:00
// File-local server state.
// Receives the id of the thread created by RpcStartServer().
static DWORD lThreadId = 0;
// Set to true by runServer() just before it blocks in Server::Wait(); checked by RpcStopServer().
static bool lIsRunning = false;
// Configured in runServer() and consumed once by GetServer() to build the server.
static ServerBuilder lBuilder;
2022-08-07 20:50:49 +08:00
2022-10-15 20:25:42 +08:00
/// Returns the process-wide server instance.
///
/// The server is built and started from lBuilder on the first call only; later calls
/// return the same object, so lBuilder must be configured before that first call.
static unique_ptr<Server> &GetServer()
{
    static unique_ptr<Server> sInstance = lBuilder.BuildAndStart();
    return sInstance;
}
2021-08-22 21:57:16 +08:00
2022-10-15 20:25:42 +08:00
static int runServer()
2022-08-07 23:32:25 +08:00
{
2022-10-15 20:25:42 +08:00
string server_address("localhost:10086");
WcfImpl service;
2022-08-07 23:32:25 +08:00
2022-10-15 20:25:42 +08:00
lBuilder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
lBuilder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 2000);
lBuilder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 3000);
lBuilder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
lBuilder.RegisterService(&service);
2022-08-07 23:32:25 +08:00
2022-10-15 20:25:42 +08:00
unique_ptr<Server> &server = GetServer();
LOG_INFO("Server listening on {}", server_address);
LOG_DEBUG("server: {}", fmt::ptr(server));
lIsRunning = true;
server->Wait();
2022-08-07 23:32:25 +08:00
return 0;
}
2022-10-15 20:25:42 +08:00
int RpcStartServer()
2022-08-07 23:49:37 +08:00
{
2022-10-15 20:25:42 +08:00
HANDLE rpcThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)runServer, NULL, NULL, &lThreadId);
if (rpcThread != 0) {
CloseHandle(rpcThread);
2022-08-07 23:49:37 +08:00
}
return 0;
}
2022-10-15 20:25:42 +08:00
int RpcStopServer()
2021-02-12 23:21:57 +08:00
{
2022-10-15 20:25:42 +08:00
if (lIsRunning) {
UnListenMessage();
unique_ptr<Server> &server = GetServer();
LOG_DEBUG("server: {}", fmt::ptr(server));
server->Shutdown();
LOG_INFO("Server stoped.");
2022-08-20 22:10:11 +08:00
}
2022-08-13 21:55:08 +08:00
2022-08-20 22:10:11 +08:00
return 0;
2021-02-12 23:21:57 +08:00
}