Add queue to avoid losing message

This commit is contained in:
Changhua 2023-02-27 23:36:17 +08:00
parent cb536e1943
commit 3f377a14bb
2 changed files with 70 additions and 8 deletions

View File

@ -2,21 +2,41 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import logging import logging
from threading import Thread
from wcferry import Wcf from wcferry import Wcf
LOG = logging.getLogger("Demo")
def process_msg(wcf: Wcf):
"""处理接收到的消息"""
while wcf.is_receiving_msg():
try:
msg = wcf.get_msg()
except Exception as e:
continue
LOG.info(msg) # 简单打印
def main(): def main():
LOG = logging.getLogger("Demo")
LOG.info("Start demo...") LOG.info("Start demo...")
wcf = Wcf(debug=True) # 默认连接本地服务 wcf = Wcf(debug=True) # 默认连接本地服务
# wcf = Wcf("tcp://127.0.0.1:10086") # 连接远端服务 # wcf = Wcf("tcp://127.0.0.1:10086") # 连接远端服务
LOG.info(f"Is Login: {True if wcf.is_login() else False}") LOG.info(f"已经登录: {True if wcf.is_login() else False}")
LOG.info(f"SelfWxid: {wcf.get_self_wxid()}") LOG.info(f"wxid: {wcf.get_self_wxid()}")
wcf.enable_recv_msg(LOG.info) # 允许接收消息
# wcf.disable_recv_msg() # Call anytime when you don't want to receive messages # wcf.enable_recv_msg(LOG.info) # deprecated
# 允许接收消息
wcf.enable_receiving_msg()
Thread(target=process_msg, name="GetMessage", args=(wcf,), daemon=True).start()
# wcf.disable_recv_msg() # 当需要停止接收消息时调用
ret = wcf.send_text("Hello world.", "filehelper") ret = wcf.send_text("Hello world.", "filehelper")
LOG.info(f"send_text: {ret}") LOG.info(f"send_text: {ret}")
@ -33,7 +53,7 @@ def main():
# wcf.accept_new_friend("v3", "v4") # 需要真正的 V3、V4 信息 # wcf.accept_new_friend("v3", "v4") # 需要真正的 V3、V4 信息
# Keep running to receive messages # 一直运行
wcf.keep_running() wcf.keep_running()

View File

@ -7,6 +7,7 @@ import logging
import os import os
import re import re
import sys import sys
from queue import Queue
from threading import Thread from threading import Thread
from time import sleep from time import sleep
from typing import Callable, List, Optional from typing import Callable, List, Optional
@ -88,6 +89,7 @@ class Wcf():
self._is_running = True self._is_running = True
self.contacts = [] self.contacts = []
self.msgQ = Queue()
self._SQL_TYPES = {1: int, 2: float, 3: lambda x: x.decode("utf-8"), 4: bytes, 5: lambda x: None} self._SQL_TYPES = {1: int, 2: float, 3: lambda x: x.decode("utf-8"), 4: bytes, 5: lambda x: None}
self.self_wxid = self.get_self_wxid() self.self_wxid = self.get_self_wxid()
@ -124,6 +126,9 @@ class Wcf():
rsp.ParseFromString(self.cmd_socket.recv_msg().bytes) rsp.ParseFromString(self.cmd_socket.recv_msg().bytes)
return rsp return rsp
def is_receiving_msg(self) -> bool:
return self._is_receiving_msg
def is_login(self) -> bool: def is_login(self) -> bool:
"""是否已经登录""" """是否已经登录"""
req = wcf_pb2.Request() req = wcf_pb2.Request()
@ -222,9 +227,45 @@ class Wcf():
rsp = self._send_request(req) rsp = self._send_request(req)
return rsp.status return rsp.status
def get_msg(self, block=True) -> WxMsg:
return self.msgQ.get(block, timeout=1)
def enable_receiving_msg(self) -> bool:
"""允许接收消息"""
def listening_msg():
rsp = wcf_pb2.Response()
self.msg_socket.dial(self.msg_url, block=True)
while self._is_receiving_msg:
try:
rsp.ParseFromString(self.msg_socket.recv_msg().bytes)
except Exception as e:
pass
else:
self.msgQ.put(self.WxMsg(rsp.wxmsg))
# 退出前关闭通信通道
self.msg_socket.close()
if self._is_receiving_msg:
return True
req = wcf_pb2.Request()
req.func = wcf_pb2.FUNC_ENABLE_RECV_TXT # FUNC_ENABLE_RECV_TXT
rsp = self._send_request(req)
if rsp.status != 0:
return False
self._is_receiving_msg = True
# 阻塞,把控制权交给用户
# self._rpc_get_message(callback)
# 不阻塞,启动一个新的线程来接收消息
Thread(target=listening_msg, name="GetMessage", daemon=True).start()
return True
def enable_recv_msg(self, callback: Callable[[WxMsg], None] = None) -> bool: def enable_recv_msg(self, callback: Callable[[WxMsg], None] = None) -> bool:
"""设置接收消息回调""" """设置接收消息回调"""
# TODO: 加队列,消息推送有超时,需要先缓存下来
def listening_msg(): def listening_msg():
rsp = wcf_pb2.Response() rsp = wcf_pb2.Response()
self.msg_socket.dial(self.msg_url, block=True) self.msg_socket.dial(self.msg_url, block=True)
@ -238,6 +279,7 @@ class Wcf():
# 退出前关闭通信通道 # 退出前关闭通信通道
self.msg_socket.close() self.msg_socket.close()
self.LOG.warning("将会移除,请用 enable_receiving_msg 和 get_msg")
if self._is_receiving_msg: if self._is_receiving_msg:
return True return True
@ -252,7 +294,7 @@ class Wcf():
self._is_receiving_msg = True self._is_receiving_msg = True
# 阻塞,把控制权交给用户 # 阻塞,把控制权交给用户
# self._rpc_get_message(callback) # listening_msg()
# 不阻塞,启动一个新的线程来接收消息 # 不阻塞,启动一个新的线程来接收消息
Thread(target=listening_msg, name="GetMessage", daemon=True).start() Thread(target=listening_msg, name="GetMessage", daemon=True).start()