From 27a4a44c9f6a1fd3ba19ec378c143cd6e002cbf5 Mon Sep 17 00:00:00 2001 From: Changhua Date: Sat, 25 Feb 2023 21:58:00 +0800 Subject: [PATCH] Update python client --- python/MANIFEST.in | 2 +- python/README.MD | 51 ++++--- python/demo.py | 38 +++--- python/setup.py | 2 +- python/wcferry/client.py | 281 +++++++++++++++++++++++++-------------- 5 files changed, 222 insertions(+), 152 deletions(-) diff --git a/python/MANIFEST.in b/python/MANIFEST.in index f98b04c..e228aeb 100644 --- a/python/MANIFEST.in +++ b/python/MANIFEST.in @@ -1,2 +1,2 @@ include wcferry/*.dll -exclude demo.py +include wcferry/*.exe diff --git a/python/README.MD b/python/README.MD index ef6ac65..d73ca91 100644 --- a/python/README.MD +++ b/python/README.MD @@ -1,6 +1,5 @@ # WeChatFerry Python 客户端 ⚠️ **只支持 Windows** ⚠️ -⚠️ **只支持 32 位 Python** ⚠️ ## 快速开始 ```sh @@ -12,48 +11,44 @@ pip install wcferry #! /usr/bin/env python3 # -*- coding: utf-8 -*- -import signal -from time import sleep +import logging from wcferry import Wcf def main(): - wcf = Wcf() + LOG = logging.getLogger("Demo") + LOG.info("Start demo...") + wcf = Wcf(debug=True) # 默认连接本地服务 + # wcf = Wcf("tcp://127.0.0.1:10086") # 连接远端服务 - def handler(sig, frame): - wcf.cleanup() # 退出前清理环境 - exit(0) + LOG.info(f"Is Login: {True if wcf.is_login() else False}") + LOG.info(f"SelfWxid: {wcf.get_self_wxid()}") - signal.signal(signal.SIGINT, handler) - sleep(1) # Slow down - print(f"Is Login: {True if wcf.is_login() else False}") - print(f"SelfWxid: {wcf.get_self_wxid()}") - - sleep(1) - wcf.enable_recv_msg(print) - # wcf.disable_recv_msg() # 当需要停止接收消息的时候,随时调用 + wcf.enable_recv_msg(LOG.info) + # wcf.disable_recv_msg() # Call anytime when you don't want to receive messages ret = wcf.send_text("Hello world.", "filehelper") - print(f"send_text: {ret}") + LOG.info(f"send_text: {ret}") ret = wcf.send_image("TEQuant.jpeg", "filehelper") - print(f"send_image: {ret}") + LOG.info(f"send_image: {ret}") - print(f"Message types:\n{wcf.get_msg_types()}") - print(f"Contacts:\n{wcf.get_contacts()}") + LOG.info(f"Message types:\n{wcf.get_msg_types()}") + LOG.info(f"Contacts:\n{wcf.get_contacts()}") - print(f"DBs:\n{wcf.get_dbs()}") - print(f"Tables:\n{wcf.get_tables('db')}") - print(f"Results:\n{wcf.query_sql('MicroMsg.db', 'SELECT * FROM Contact LIMIT 1;')}") + LOG.info(f"DBs:\n{wcf.get_dbs()}") + LOG.info(f"Tables:\n{wcf.get_tables('db')}") + LOG.info(f"Results:\n{wcf.query_sql('MicroMsg.db', 'SELECT * FROM Contact LIMIT 1;')}") - # wcf.accept_new_friend("v3", "v4") # 需要真正的 V3、V4 信息 + # wcf.accept_new_friend("v3", "v4") # 需要真正的 V3、V4 信息 - # 阻塞程序,让程序一直运行 + # Keep running to receive messages wcf.keep_running() if __name__ == "__main__": + logging.basicConfig(level='DEBUG', format="%(asctime)s %(message)s") main() ``` @@ -68,13 +63,13 @@ source .env/Scripts/activate # 升级 pip pip install --upgrade pip # 安装依赖包 -pip install grpcio grpcio-tools +pip install grpcio-tools pynng ``` -### 重新生成 gRPC 文件 +### 重新生成 PB 文件 ```sh -cd wcf -python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I=../ wcf.proto +cd python +python -m grpc_tools.protoc --python_out=. --proto_path=..\rpc\proto\ wcf.proto ``` ### 参考项目 [README](../README.MD) diff --git a/python/demo.py b/python/demo.py index d4a7887..15b14e6 100644 --- a/python/demo.py +++ b/python/demo.py @@ -2,42 +2,34 @@ # -*- coding: utf-8 -*- import logging -import signal -from time import sleep from wcferry import Wcf def main(): - logging.info("Start demo...") - wcf = Wcf() # 默认连接本地服务 - # wcf = Wcf("IP:10086") # 连接远端服务 + LOG = logging.getLogger("Demo") + LOG.info("Start demo...") + wcf = Wcf(debug=True) # 默认连接本地服务 + # wcf = Wcf("tcp://127.0.0.1:10086") # 连接远端服务 - def handler(sig, frame): - wcf.cleanup() - exit(0) + LOG.info(f"Is Login: {True if wcf.is_login() else False}") + LOG.info(f"SelfWxid: {wcf.get_self_wxid()}") - signal.signal(signal.SIGINT, handler) - sleep(1) # Slow down - print(f"Is Login: {True if wcf.is_login() else False}") - print(f"SelfWxid: {wcf.get_self_wxid()}") - - sleep(1) - wcf.enable_recv_msg(print) + wcf.enable_recv_msg(LOG.info) # wcf.disable_recv_msg() # Call anytime when you don't want to receive messages ret = wcf.send_text("Hello world.", "filehelper") - print(f"send_text: {ret}") + LOG.info(f"send_text: {ret}") ret = wcf.send_image("TEQuant.jpeg", "filehelper") - print(f"send_image: {ret}") + LOG.info(f"send_image: {ret}") - print(f"Message types:\n{wcf.get_msg_types()}") - print(f"Contacts:\n{wcf.get_contacts()}") + LOG.info(f"Message types:\n{wcf.get_msg_types()}") + LOG.info(f"Contacts:\n{wcf.get_contacts()}") - print(f"DBs:\n{wcf.get_dbs()}") - print(f"Tables:\n{wcf.get_tables('db')}") - print(f"Results:\n{wcf.query_sql('MicroMsg.db', 'SELECT * FROM Contact LIMIT 1;')}") + LOG.info(f"DBs:\n{wcf.get_dbs()}") + LOG.info(f"Tables:\n{wcf.get_tables('db')}") + LOG.info(f"Results:\n{wcf.query_sql('MicroMsg.db', 'SELECT * FROM Contact LIMIT 1;')}") # wcf.accept_new_friend("v3", "v4") # 需要真正的 V3、V4 信息 @@ -46,5 +38,5 @@ def main(): if __name__ == "__main__": - logging.basicConfig(level='DEBUG') + logging.basicConfig(level='DEBUG', format="%(asctime)s %(message)s") main() diff --git a/python/setup.py b/python/setup.py index 620d4c3..3de057e 100644 --- a/python/setup.py +++ b/python/setup.py @@ -26,8 +26,8 @@ setup( include_package_data=True, install_requires=[ "setuptools", - "grpcio", "grpcio-tools", + "pynng" ], classifiers=[ "Environment :: Win32 (MS Windows)", diff --git a/python/wcferry/client.py b/python/wcferry/client.py index 7715d3a..f79ec94 100644 --- a/python/wcferry/client.py +++ b/python/wcferry/client.py @@ -2,23 +2,23 @@ # -*- coding: utf-8 -*- import atexit -import ctypes +import base64 import logging import os import re import sys from threading import Thread from time import sleep -from typing import List, Callable, Optional +from typing import Callable, List, Optional -import grpc +import pynng +from google.protobuf import json_format WCF_ROOT = os.path.abspath(os.path.dirname(__file__)) sys.path.insert(0, WCF_ROOT) -import wcf_pb2 # noqa -import wcf_pb2_grpc # noqa +import wcf_pb2 # noqa -__version__ = "3.7.0.30.12" +__version__ = "3.7.0.30.13" class Wcf(): @@ -58,21 +58,34 @@ class Wcf(): """是否文本消息""" return self.type == 1 - def __init__(self, host_port: str = None) -> None: + def __init__(self, host_port: str = None, debug: bool = False) -> None: self._local_host = False self._is_running = False - self._enable_recv_msg = False + self._is_receiving_msg = False self.LOG = logging.getLogger("WCF") if host_port is None: self._local_host = True - host_port = "127.0.0.1:10086" - self._sdk = ctypes.cdll.LoadLibrary(f"{WCF_ROOT}/sdk.dll") - if self._sdk.WxInitSDK() != 0: + host_port = "tcp://127.0.0.1:10086" + cmd = f"{WCF_ROOT}/wcf.exe start {'debug' if debug else ''}" + if os.system(cmd) != 0: self.LOG.error("初始化失败!") + return + + # 连接 RPC + self.cmd_socket = pynng.Pair1() # Client --> Server,发送消息 + self.cmd_socket.send_timeout = 2000 # 发送 2 秒超时 + self.cmd_socket.recv_timeout = 2000 # 接收 2 秒超时 + self.cmd_socket.dial(host_port, block=False) + + self.msg_socket = pynng.Pair1() # Server --> Client,接收消息 + self.msg_socket.send_timeout = 2000 # 发送 2 秒超时 + self.msg_socket.recv_timeout = 2000 # 接收 2 秒超时 + self.msg_url = host_port.replace("10086", "10087") + + atexit.register(self.cleanup) # 退出的时候停止消息接收,防止内存泄露 + while not self.is_login(): # 等待微信登录成功 + sleep(1) - self._channel = grpc.insecure_channel(host_port) - self._stub = wcf_pb2_grpc.WcfStub(self._channel) - atexit.register(self.disable_recv_msg) # 退出的时候停止消息接收,防止内存泄露 self._is_running = True self.contacts = [] self._SQL_TYPES = {1: int, 2: float, 3: lambda x: x.decode("utf-8"), 4: bytes, 5: lambda x: None} @@ -82,17 +95,18 @@ class Wcf(): self.cleanup() def cleanup(self) -> None: - """停止 gRPC,关闭连接,回收资源""" + """关闭连接,回收资源""" if not self._is_running: return self.disable_recv_msg() - self._channel.close() + self.cmd_socket.close() + if self._local_host: - self._sdk.WxDestroySDK() - handle = self._sdk._handle - del self._sdk - ctypes.windll.kernel32.FreeLibrary(handle) + cmd = f"{WCF_ROOT}/wcf.exe stop" + if os.system(cmd) != 0: + self.LOG.error("退出失败!") + return self._is_running = False def keep_running(self): @@ -103,109 +117,184 @@ class Wcf(): except Exception as e: self.cleanup() - def is_login(self) -> int: + def _send_request(self, req: wcf_pb2.Request) -> wcf_pb2.Response: + data = req.SerializeToString() + self.cmd_socket.send(data) + rsp = wcf_pb2.Response() + rsp.ParseFromString(self.cmd_socket.recv_msg().bytes) + return rsp + + def is_login(self) -> bool: """是否已经登录""" - rsp = self._stub.RpcIsLogin(wcf_pb2.Empty()) - return rsp.status + req = wcf_pb2.Request() + req.func = wcf_pb2.FUNC_IS_LOGIN # FUNC_IS_LOGIN + rsp = self._send_request(req) + + return rsp.status == 1 def get_self_wxid(self) -> str: """获取登录账户的 wxid""" - rsp = self._stub.RpcGetSelfWxid(wcf_pb2.Empty()) + req = wcf_pb2.Request() + req.func = wcf_pb2.FUNC_GET_SELF_WXID # FUNC_GET_SELF_WXID + rsp = self._send_request(req) + return rsp.str - def _rpc_get_message(self, func): - rsps = self._stub.RpcEnableRecvMsg(wcf_pb2.Empty()) - try: - for rsp in rsps: - func(self.WxMsg(rsp)) - except Exception as e: - self.LOG.error(f"RpcEnableRecvMsg: {e}") - finally: - self.disable_recv_msg() + def get_msg_types(self) -> dict: + """获取所有消息类型""" + req = wcf_pb2.Request() + req.func = wcf_pb2.FUNC_GET_MSG_TYPES # FUNC_GET_MSG_TYPES + rsp = self._send_request(req) + types = json_format.MessageToDict(rsp.types)["types"] + types = {int(k): v for k, v in types.items()} + + return dict(sorted(dict(types).items())) + + def get_contacts(self) -> List[dict]: + """获取完整通讯录""" + req = wcf_pb2.Request() + req.func = wcf_pb2.FUNC_GET_CONTACTS # FUNC_GET_CONTACTS + rsp = self._send_request(req) + contacts = json_format.MessageToDict(rsp.contacts)["contacts"] + + for cnt in contacts: + gender = cnt.get("gender", "") + if gender == 1: + gender = "男" + elif gender == 2: + gender = "女" + contact = { + "wxid": cnt.get("wxid", ""), + "code": cnt.get("code", ""), + "name": cnt.get("name", ""), + "country": cnt.get("country", ""), + "province": cnt.get("province", ""), + "city": cnt.get("city", ""), + "gender": gender} + self.contacts.append(contact) + return self.contacts + + def get_dbs(self) -> List[str]: + """获取所有数据库""" + req = wcf_pb2.Request() + req.func = wcf_pb2.FUNC_GET_DB_NAMES # FUNC_GET_DB_NAMES + rsp = self._send_request(req) + dbs = json_format.MessageToDict(rsp.dbs)["names"] + + return dbs + + def get_tables(self, db: str) -> List[dict]: + """获取 db 中所有表""" + req = wcf_pb2.Request() + req.func = wcf_pb2.FUNC_GET_DB_TABLES # FUNC_GET_DB_TABLES + req.str = db + rsp = self._send_request(req) + tables = json_format.MessageToDict(rsp.tables).get("tables", []) + + return tables + + def send_text(self, msg: str, receiver: str, aters: Optional[str] = "") -> int: + """发送文本消息""" + req = wcf_pb2.Request() + req.func = wcf_pb2.FUNC_SEND_TXT # FUNC_SEND_TXT + req.txt.msg = msg + req.txt.receiver = receiver + if aters: + req.txt.aters = aters + rsp = self._send_request(req) + return rsp.status + + def send_image(self, path: str, receiver: str) -> int: + """发送图片""" + req = wcf_pb2.Request() + req.func = wcf_pb2.FUNC_SEND_IMG # FUNC_SEND_IMG + req.file.path = path + req.file.receiver = receiver + rsp = self._send_request(req) + return rsp.status + + def send_file(self, path: str, receiver: str) -> int: + """发送文件""" + req = wcf_pb2.Request() + req.func = wcf_pb2.FUNC_SEND_FILE # FUNC_SEND_FILE + req.file.path = path + req.file.receiver = receiver + rsp = self._send_request(req) + return rsp.status def enable_recv_msg(self, callback: Callable[[WxMsg], None] = None) -> bool: """设置接收消息回调""" - if self._enable_recv_msg: + # TODO: 加队列,消息推送有超时,需要先缓存下来 + def listening_msg(): + rsp = wcf_pb2.Response() + self.msg_socket.dial(self.msg_url, block=True) + while self._is_receiving_msg: + try: + rsp.ParseFromString(self.msg_socket.recv_msg().bytes) + except Exception as e: + pass + else: + callback(self.WxMsg(rsp.wxmsg)) + # 退出前关闭通信通道 + self.msg_socket.close() + + if self._is_receiving_msg: return True if callback is None: return False - self._enable_recv_msg = True + req = wcf_pb2.Request() + req.func = wcf_pb2.FUNC_ENABLE_RECV_TXT # FUNC_ENABLE_RECV_TXT + rsp = self._send_request(req) + if rsp.status != 0: + return False + + self._is_receiving_msg = True # 阻塞,把控制权交给用户 # self._rpc_get_message(callback) # 不阻塞,启动一个新的线程来接收消息 - Thread(target=self._rpc_get_message, name="GetMessage", args=(callback,), daemon=True).start() + Thread(target=listening_msg, name="GetMessage", daemon=True).start() return True def disable_recv_msg(self) -> int: """停止接收消息""" - if not self._enable_recv_msg: - return -1 + if not self._is_receiving_msg: + return 0 - rsp = self._stub.RpcDisableRecvMsg(wcf_pb2.Empty()) - if rsp.status == 0: - self._enable_recv_msg = False + req = wcf_pb2.Request() + req.func = wcf_pb2.FUNC_DISABLE_RECV_TXT # FUNC_DISABLE_RECV_TXT + rsp = self._send_request(req) + self._is_receiving_msg = False return rsp.status - def send_text(self, msg: str, receiver: str, aters: Optional[str] = "") -> int: - """发送文本消息""" - rsp = self._stub.RpcSendTextMsg(wcf_pb2.TextMsg(msg=msg, receiver=receiver, aters=aters)) - return rsp.status - - def send_image(self, path: str, receiver: str) -> int: - """发送图片""" - rsp = self._stub.RpcSendImageMsg(wcf_pb2.ImageMsg(path=path, receiver=receiver)) - return rsp.status - - def get_msg_types(self) -> dict: - """获取所有消息类型""" - rsp = self._stub.RpcGetMsgTypes(wcf_pb2.Empty()) - return dict(sorted(dict(rsp.types).items())) - - def get_contacts(self) -> List[dict]: - """获取完整通讯录""" - rsp = self._stub.RpcGetContacts(wcf_pb2.Empty()) - for cnt in rsp.contacts: - gender = "" - if cnt.gender == 1: - gender = "男" - elif cnt.gender == 2: - gender = "女" - self.contacts.append({"wxid": cnt.wxid, "code": cnt.code, "name": cnt.name, - "country": cnt.country, "province": cnt.province, "city": cnt.city, "gender": gender}) - return self.contacts - - def get_dbs(self) -> List[str]: - """获取所有数据库""" - rsp = self._stub.RpcGetDbNames(wcf_pb2.Empty()) - return rsp.names - - def get_tables(self, db: str) -> List[dict]: - """获取 db 中所有表""" - tables = [] - rsp = self._stub.RpcGetDbTables(wcf_pb2.String(str=db)) - for tbl in rsp.tables: - tables.append({"name": tbl.name, "sql": tbl.sql}) - return tables - def query_sql(self, db: str, sql: str) -> List[dict]: """执行 SQL""" result = [] - rsp = self._stub.RpcExecDbQuery(wcf_pb2.DbQuery(db=db, sql=sql)) - for r in rsp.rows: + req = wcf_pb2.Request() + req.func = wcf_pb2.FUNC_EXEC_DB_QUERY # FUNC_EXEC_DB_QUERY + req.query.db = db + req.query.sql = sql + rsp = self._send_request(req) + rows = json_format.MessageToDict(rsp.rows)["rows"] + for r in rows: row = {} - for f in r.fields: - row[f.column] = self._SQL_TYPES[f.type](f.content) + for f in r["fields"]: + c = base64.b64decode(f.get("content", "")) + row[f["column"]] = self._SQL_TYPES[f["type"]](c) result.append(row) return result def accept_new_friend(self, v3: str, v4: str) -> int: - """通过好友验证""" - rsp = self._stub.RpcAcceptNewFriend(wcf_pb2.Verification(v3=v3, v4=v4)) + """发送文件""" + req = wcf_pb2.Request() + req.func = wcf_pb2.FUNC_ACCEPT_FRIEND # FUNC_ACCEPT_FRIEND + req.v.v3 = v3 + req.v.v4 = v4 + rsp = self._send_request(req) return rsp.status def get_friends(self) -> List[dict]: @@ -218,18 +307,12 @@ class Wcf(): "newsapp": "新闻", } friends = [] - rsp = self._stub.RpcGetContacts(wcf_pb2.Empty()) - for cnt in rsp.contacts: + for cnt in self.get_contacts(): if (cnt.wxid.endswith("@chatroom") # 群聊 - or cnt.wxid.startswith("gh_") # 公众号 - or cnt.wxid in not_friends.keys() # 其他杂号 + or cnt.wxid.startswith("gh_") # 公众号 + or cnt.wxid in not_friends.keys() # 其他杂号 ): continue - gender = "" - if cnt.gender == 1: - gender = "男" - elif cnt.gender == 2: - gender = "女" - friends.append({"wxid": cnt.wxid, "code": cnt.code, "name": cnt.name, - "country": cnt.country, "province": cnt.province, "city": cnt.city, "gender": gender}) + friends.append(cnt) + return friends