PyWxDump/pywxdump/api/utils.py
2024-08-18 00:03:55 +08:00

271 lines
7.2 KiB
Python

# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: utils.py
# Description:
# Author: xaoyaoo
# Date: 2024/01/16
# -------------------------------------------------------------------------------
import base64
import json
import os
import random
import re
import string
import traceback
from .rjson import ReJson
from functools import wraps
import logging
server_loger = logging.getLogger("server")
rs_loger = server_loger
ls_loger = server_loger
class ConfData(object):
_instances = None
def __new__(cls, *args, **kwargs):
if cls._instances:
return cls._instances
cls._instances = object.__new__(cls)
return cls._instances
def __init__(self):
self._work_path = None
self.conf_file = None
self.auto_setting = None
self.is_init = False
self.conf = {}
self.init()
@property
def cf(self):
if not self.is_init:
self.init()
return self.conf_file
@property
def work_path(self):
if not self.is_init:
self.init()
return self._work_path
@property
def at(self):
if not self.is_init:
self.init()
return self.auto_setting
def init(self):
self.is_init = False
work_path = os.getenv("PYWXDUMP_WORK_PATH")
conf_file = os.getenv("PYWXDUMP_CONF_FILE")
auto_setting = os.getenv("PYWXDUMP_AUTO_SETTING")
if work_path is None or conf_file is None or auto_setting is None:
return False
self._work_path = work_path
self.conf_file = conf_file
self.auto_setting = auto_setting
self.is_init = True
if not os.path.exists(self.conf_file):
self.set_conf(self.auto_setting, "last", "")
self.read_conf()
return True
def read_conf(self):
if not os.path.exists(self.conf_file):
return False
try:
with open(self.conf_file, 'r') as f:
conf = json.load(f)
self.conf = conf
return True
except FileNotFoundError:
logging.error(f"Session file not found: {self.conf_file}")
return False
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON file: {e}")
return False
def write_conf(self):
if not self.is_init:
self.init()
try:
with open(self.conf_file, 'w') as f:
json.dump(self.conf, f, indent=4, ensure_ascii=False)
return True
except Exception as e:
logging.error(f"Error writing to file: {e}")
return False
def set_conf(self, wxid, arg, value):
if not self.is_init:
self.init()
if wxid not in self.conf:
self.conf[wxid] = {}
if not isinstance(self.conf[wxid], dict):
self.conf[wxid] = {}
self.conf[wxid][arg] = value
self.write_conf()
def get_conf(self, wxid, arg):
if not self.is_init:
self.init()
return self.conf.get(wxid, {}).get(arg, None)
def get_local_wxids(self):
if not self.is_init:
self.init()
return list(self.conf.keys())
def get_db_config(self):
if not self.is_init:
self.init()
my_wxid = self.get_conf(self.at, "last")
return self.get_conf(my_wxid, "db_config")
gc: ConfData = ConfData()
def get_conf_local_wxid(conf_file):
try:
with open(conf_file, 'r') as f:
conf = json.load(f)
except FileNotFoundError:
logging.error(f"Session file not found: {conf_file}")
return None
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON file: {e}")
return None
return list(conf.keys())
def get_conf(conf_file, wxid, arg):
try:
with open(conf_file, 'r') as f:
conf = json.load(f)
except FileNotFoundError:
logging.error(f"Session file not found: {conf_file}")
return None
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON file: {e}")
return None
return conf.get(wxid, {}).get(arg, None)
def get_conf_wxids(conf_file):
try:
with open(conf_file, 'r') as f:
conf = json.load(f)
except FileNotFoundError:
logging.error(f"Session file not found: {conf_file}")
return None
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON file: {e}")
return None
return list(conf.keys())
def set_conf(conf_file, wxid, arg, value):
try:
with open(conf_file, 'r') as f:
conf = json.load(f)
except FileNotFoundError:
conf = {}
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON file: {e}")
return False
if wxid not in conf:
conf[wxid] = {}
if not isinstance(conf[wxid], dict):
conf[wxid] = {}
conf[wxid][arg] = value
try:
with open(conf_file, 'w') as f:
json.dump(conf, f, indent=4, ensure_ascii=False)
except Exception as e:
logging.error(f"Error writing to file: {e}")
return False
return True
def is_port_in_use(_host, _port):
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind((_host, _port))
except socket.error:
return True
return False
def validate_title(title):
"""
校验文件名是否合法
"""
rstr = r"[\/\\\:\*\?\"\<\>\|\.]" # '/ \ : * ? " < > |'
new_title = re.sub(rstr, "_", title) # 替换为下划线
return new_title
def error9999(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
traceback_data = traceback.format_exc()
rdata = f"{traceback_data}"
# logging.error(rdata)
return ReJson(9999, body=f"{str(e)}\n{rdata}", error=str(e))
return wrapper
def asyncError9999(func):
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
traceback_data = traceback.format_exc()
rdata = f"{traceback_data}"
# logging.error(rdata)
return ReJson(9999, body=f"{str(e)}\n{rdata}", error=str(e))
return wrapper
def gen_base64(path):
# 获取文件名后缀
extension = os.path.splitext(path)[1]
if extension == '.js':
start_str = 'data:text/javascript;base64,'
elif extension == '.css':
start_str = 'data:text/css;base64,'
elif extension == '.html':
start_str = 'data:text/html;base64,'
elif extension == '.json':
start_str = 'data:application/json;base64,'
else:
start_str = 'data:text/plain;base64,'
with open(path, 'rb') as file:
js_code = file.read()
base64_encoded_js = base64.b64encode(js_code).decode('utf-8')
return start_str + base64_encoded_js
def random_str(num=16):
return ''.join(random.sample(string.ascii_letters + string.digits, num))