加快wxinfo获取,速度提升10%

This commit is contained in:
xaoyaoo 2024-07-21 09:04:37 +08:00
parent d08ae98b26
commit ab587e3bc7

View File

@ -10,10 +10,10 @@ import json
import os import os
import re import re
import winreg import winreg
import psutil
import pymem
from typing import List, Union from typing import List, Union
from .utils import pattern_scan_all, verify_key, get_exe_version, get_exe_bit, info_error from .utils import pattern_scan_all, verify_key, get_exe_version, get_exe_bit, info_error
from .ctypes_utils import get_process_list, get_info_with_key, get_memory_maps, get_process_exe_path, \
get_file_version_info
from .memory_search import search_memory from .memory_search import search_memory
import ctypes.wintypes as wintypes import ctypes.wintypes as wintypes
@ -174,6 +174,7 @@ def get_key(pid, db_path, addr_len):
:param addr_len: 地址长度 :param addr_len: 地址长度
:return: 返回key :return: 返回key
""" """
def read_key_bytes(h_process, address, address_len=8): def read_key_bytes(h_process, address, address_len=8):
array = ctypes.create_string_buffer(address_len) array = ctypes.create_string_buffer(address_len)
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None" if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
@ -189,30 +190,23 @@ def get_key(pid, db_path, addr_len):
MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db") MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db")
# start_adress = 0 start_adress = 0
# end_adress = 0x7FFFFFFFFFFFFFFF end_adress = 0x7FFFFFFFFFFFFFFF
#
# memory_maps = get_memory_maps(pid)
# for module in memory_maps:
# if module.FileName and 'WeChatWin.dll' in module.FileName:
# start_adress = module.BaseAddress
# end_adress = module.BaseAddress + module.RegionSize
# # print(start_adress, end_adress)
# hProcess = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid)
# type1_addrs = search_memory(hProcess, phone_type1.encode(), start_adress, end_adress)
# type2_addrs = search_memory(hProcess, phone_type2.encode(), start_adress, end_adress)
# type3_addrs = search_memory(hProcess, phone_type3.encode(), start_adress, end_adress)
#
# print(type1_addrs, type2_addrs, type3_addrs)
memory_maps = get_memory_maps(pid)
pm = pymem.Pymem(pid) for module in memory_maps:
module_name = "WeChatWin.dll" if module.FileName and 'WeChatWin.dll' in module.FileName:
type1_addrs = pm.pattern_scan_module(phone_type1.encode(), module_name, return_multiple=True) start_adress = module.BaseAddress
type2_addrs = pm.pattern_scan_module(phone_type2.encode(), module_name, return_multiple=True) end_adress = module.BaseAddress + module.RegionSize
type3_addrs = pm.pattern_scan_module(phone_type3.encode(), module_name, return_multiple=True) break
# print(start_adress, end_adress)
# print(type1_addrs, type2_addrs, type3_addrs) hProcess = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid)
type1_addrs = search_memory(hProcess, phone_type1.encode(), max_num=2, start_address=start_adress,
end_address=end_adress)
type2_addrs = search_memory(hProcess, phone_type2.encode(), max_num=2, start_address=start_adress,
end_address=end_adress)
type3_addrs = search_memory(hProcess, phone_type3.encode(), max_num=2, start_address=start_adress,
end_address=end_adress)
type_addrs = [] type_addrs = []
if len(type1_addrs) >= 2: type_addrs += type1_addrs if len(type1_addrs) >= 2: type_addrs += type1_addrs
@ -224,7 +218,7 @@ def get_key(pid, db_path, addr_len):
for i in type_addrs[::-1]: for i in type_addrs[::-1]:
for j in range(i, i - 2000, -addr_len): for j in range(i, i - 2000, -addr_len):
key_bytes = read_key_bytes(pm.process_handle, j, addr_len) key_bytes = read_key_bytes(hProcess, j, addr_len)
if key_bytes == "None": if key_bytes == "None":
continue continue
if verify_key(key_bytes, MicroMsg_path): if verify_key(key_bytes, MicroMsg_path):
@ -232,10 +226,6 @@ def get_key(pid, db_path, addr_len):
return "None" return "None"
from .ctypes_utils import get_process_list, get_info_with_key, get_memory_maps, get_process_exe_path, \
get_file_version_info
def get_details(pid, version_list: dict = None, is_logging: bool = False): def get_details(pid, version_list: dict = None, is_logging: bool = False):
path = get_process_exe_path(pid) path = get_process_exe_path(pid)
rd = {'pid': pid, 'version': get_file_version_info(path), rd = {'pid': pid, 'version': get_file_version_info(path),
@ -273,7 +263,7 @@ def get_details(pid, version_list: dict = None, is_logging: bool = False):
rd['mobile'] = get_info_string(Handle, mobile_baseaddr, 64) if bias_list[2] != 0 else "None" rd['mobile'] = get_info_string(Handle, mobile_baseaddr, 64) if bias_list[2] != 0 else "None"
rd['name'] = get_info_name(Handle, name_baseaddr, addrLen, 64) if bias_list[0] != 0 else "None" rd['name'] = get_info_name(Handle, name_baseaddr, addrLen, 64) if bias_list[0] != 0 else "None"
rd['mail'] = get_info_string(Handle, mail_baseaddr, 64) if bias_list[3] != 0 else "None" rd['mail'] = get_info_string(Handle, mail_baseaddr, 64) if bias_list[3] != 0 else "None"
# rd['key'] = get_info_with_key(Handle, key_baseaddr, addrLen) if bias_list[4] != 0 else "None" rd['key'] = get_info_with_key(Handle, key_baseaddr, addrLen) if bias_list[4] != 0 else "None"
rd['wxid'] = get_info_wxid(Handle) rd['wxid'] = get_info_wxid(Handle)