WeChatFerry/http/wcfhttp/core.py
2023-05-06 20:53:26 +08:00

70 lines
2.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
from typing import Any
import requests
from fastapi import FastAPI
from pydantic import BaseModel
from wcferry import Wcf, WxMsg
# Version string published by this module.
__version__ = "3.7.0.30.25"
class Msg(BaseModel):
    # Request body accepted by the sample ``/msg_cb`` endpoint (Http.msg_cb).
    # Each field mirrors the key of the same name in the JSON payload that the
    # forwarding callback in Http._set_cb builds from a received WxMsg.
    #
    # NOTE: documented with comments rather than a class docstring, because
    # pydantic publishes a model's docstring as its schema description.

    # Values copied from the WxMsg attributes of the same name.
    id: str
    type: int
    xml: str
    sender: str
    roomid: str
    content: str
    thumb: str
    extra: str

    # Results of WxMsg.from_self() and WxMsg.from_group() respectively.
    is_self: bool
    is_group: bool
class Http(FastAPI):
    """WeChatFerry HTTP client; the API docs are served at http://IP:PORT/docs."""

    # Seconds to wait for the callback endpoint before a forward is abandoned.
    # Without a timeout, requests.post() may block forever on an unresponsive
    # server, which would also block whoever invokes the receive callback.
    CB_TIMEOUT = 30

    def __init__(self, wcf: Wcf, cb: str, **extra: Any) -> None:
        """Create the app, enable message receiving and register the routes.

        Args:
            wcf: WeChatFerry client used to receive messages.
            cb: URL that received messages are POSTed to as JSON. When empty,
                received messages are printed instead.
            **extra: Keyword arguments passed through to ``FastAPI.__init__``.
        """
        super().__init__(**extra)
        self.LOG = logging.getLogger(__name__)
        self.wcf = wcf
        self._set_cb(cb)
        self.add_api_route(
            "/msg_cb",
            self.msg_cb,
            methods=["POST"],
            summary="接收消息回调样例",
            # FastAPI falls back to the endpoint's docstring for the operation
            # description; pass the text explicitly so the published API docs
            # do not depend on the wording of the docstring.
            description="示例回调方法,简单打印消息",
        )

    def _set_cb(self, cb):
        """Enable message receiving with the handler selected by ``cb``.

        Args:
            cb: Callback URL. When truthy, every received message is POSTed to
                it as JSON; otherwise received messages are passed to ``print``.
        """

        def callback(msg: WxMsg):
            """POST one received message to ``cb`` and log any failure."""
            data = {
                "id": msg.id,
                "type": msg.type,
                "xml": msg.xml,
                "sender": msg.sender,
                "roomid": msg.roomid,
                "content": msg.content,
                "thumb": msg.thumb,
                "extra": msg.extra,
                "is_self": msg.from_self(),
                "is_group": msg.from_group(),
            }
            try:
                rsp = requests.post(url=cb, json=data, timeout=self.CB_TIMEOUT)
                if rsp.status_code != 200:
                    self.LOG.error(f"消息转发失败HTTP 状态码为: {rsp.status_code}")
            except Exception as e:
                # Broad on purpose: any failure to forward (including the
                # timeout above) is logged instead of being raised to the
                # code that invoked this callback.
                self.LOG.error(f"消息转发异常: {e}")

        if cb:
            self.LOG.info(f"消息回调: {cb}")
            self.wcf.enable_recv_msg(callback=callback)
        else:
            self.LOG.info("没有设置回调,打印消息")
            self.wcf.enable_recv_msg(print)

    def msg_cb(self, msg: Msg):
        """Sample callback endpoint: print the received message.

        Args:
            msg: Message parsed from the request body.

        Returns:
            A dict with ``status`` 0 and a success ``message``.
        """
        print(f"收到消息:{msg}")
        return {"status": 0, "message": "成功"}