Fix typing

This commit is contained in:
Changhua 2023-09-21 22:46:26 +08:00
parent 5a0b774c22
commit c201bbf85a

View File

@ -13,7 +13,7 @@ import sys
from queue import Queue
from threading import Thread
from time import sleep
from typing import Callable, List, Optional
from typing import Callable, Dict, List, Optional
import pynng
import requests
@ -161,7 +161,7 @@ class Wcf():
return rsp.str
def get_msg_types(self) -> dict:
def get_msg_types(self) -> Dict:
"""获取所有消息类型"""
req = wcf_pb2.Request()
req.func = wcf_pb2.FUNC_GET_MSG_TYPES # FUNC_GET_MSG_TYPES
@ -171,7 +171,7 @@ class Wcf():
return dict(sorted(dict(types).items()))
def get_contacts(self) -> List[dict]:
def get_contacts(self) -> List[Dict]:
"""获取完整通讯录"""
req = wcf_pb2.Request()
req.func = wcf_pb2.FUNC_GET_CONTACTS # FUNC_GET_CONTACTS
@ -208,14 +208,14 @@ class Wcf():
return dbs
def get_tables(self, db: str) -> List[dict]:
def get_tables(self, db: str) -> List[Dict]:
"""获取 db 中所有表
Args:
db (str): 数据库名可通过 `get_dbs` 查询
Returns:
List[dict]: `db` 下的所有表名及对应建表语句
List[Dict]: `db` 下的所有表名及对应建表语句
"""
req = wcf_pb2.Request()
req.func = wcf_pb2.FUNC_GET_DB_TABLES # FUNC_GET_DB_TABLES
@ -225,7 +225,7 @@ class Wcf():
return tables
def get_user_info(self) -> dict:
def get_user_info(self) -> Dict:
"""获取登录账号个人信息"""
req = wcf_pb2.Request()
req.func = wcf_pb2.FUNC_GET_USER_INFO # FUNC_GET_USER_INFO
@ -482,7 +482,7 @@ class Wcf():
return rsp.status
def query_sql(self, db: str, sql: str) -> List[dict]:
def query_sql(self, db: str, sql: str) -> List[Dict]:
"""执行 SQL如果数据量大注意分页以免 OOM
Args:
@ -490,7 +490,7 @@ class Wcf():
sql (str): 要执行的 SQL
Returns:
List[dict]: 查询结果
List[Dict]: 查询结果
"""
result = []
req = wcf_pb2.Request()
@ -526,7 +526,7 @@ class Wcf():
rsp = self._send_request(req)
return rsp.status
def get_friends(self) -> List[dict]:
def get_friends(self) -> List[Dict]:
"""获取好友列表"""
not_friends = {
"fmessage": "朋友推荐消息",
@ -538,9 +538,9 @@ class Wcf():
friends = []
for cnt in self.get_contacts():
if (cnt["wxid"].endswith("@chatroom") or # 群聊
cnt["wxid"].startswith("gh_") or # 公众号
cnt["wxid"] in not_friends.keys() # 其他杂号
):
cnt["wxid"].startswith("gh_") or # 公众号
cnt["wxid"] in not_friends.keys() # 其他杂号
):
continue
friends.append(cnt)
@ -631,33 +631,32 @@ class Wcf():
rsp = self._send_request(req)
return rsp.status
def get_chatroom_members(self, roomid: str) -> List[dict]:
def get_chatroom_members(self, roomid: str) -> Dict:
"""获取群成员
Args:
roomid (str): 群的 id
Returns:
List[dict]: 群成员列表: [{wxid: 昵称}, ...]
Dict: 群成员列表: {wxid1: 昵称1, wxid2: 昵称2, ...}
"""
pass
members = {}
contacts = self.query_sql("MicroMsg.db", "SELECT UserName, NickName FROM Contact;")
contacts = {contact["UserName"]: contact["NickName"]for contact in contacts}
crs = self.query_sql("MicroMsg.db", f"SELECT RoomData FROM ChatRoom WHERE ChatRoomName = '{roomid}'")
crs = self.query_sql("MicroMsg.db", f"SELECT RoomData FROM ChatRoom WHERE ChatRoomName = '{roomid}';")
if not crs:
return []
return members
bs = crs[0].get("RoomData")
if not bs:
return []
return members
crd = RoomData()
crd.ParseFromString(bs)
if not bs:
return []
return members
names = {}
for member in crd.members:
names[member.wxid] = member.name if member.name else contacts.get(member.wxid, "")
members[member.wxid] = member.name if member.name else contacts.get(member.wxid, "")
return names
return members