|
|
@@ -0,0 +1,162 @@
|
|
|
+import ctypes
|
|
|
+from ctypes import wintypes
|
|
|
+import psutil
|
|
|
+from typing import Optional, List, Dict
|
|
|
+import sys
|
|
|
+import os
|
|
|
+
|
|
|
+# Agregar el directorio padre al path para importar config
|
|
|
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
+from config import MEMORY_SEARCH_CONFIG, UI_MESSAGES
|
|
|
+
|
|
|
+# Constantes de Windows API
|
|
|
+PROCESS_ALL_ACCESS = 0x1F0FFF
|
|
|
+MEM_COMMIT = 0x1000
|
|
|
+MEM_RESERVE = 0x2000
|
|
|
+PAGE_EXECUTE_READWRITE = 0x40
|
|
|
+PAGE_READWRITE = 0x04
|
|
|
+PAGE_EXECUTE_READ = 0x20
|
|
|
+
|
|
|
+# Definir MEMORY_BASIC_INFORMATION
|
|
|
+class MEMORY_BASIC_INFORMATION(ctypes.Structure):
|
|
|
+ _fields_ = [
|
|
|
+ ("BaseAddress", ctypes.c_void_p),
|
|
|
+ ("AllocationBase", ctypes.c_void_p),
|
|
|
+ ("AllocationProtect", wintypes.DWORD),
|
|
|
+ ("RegionSize", ctypes.c_size_t),
|
|
|
+ ("State", wintypes.DWORD),
|
|
|
+ ("Protect", wintypes.DWORD),
|
|
|
+ ("Type", wintypes.DWORD)
|
|
|
+ ]
|
|
|
+
|
|
|
+# Cargar kernel32
|
|
|
+kernel32 = ctypes.windll.kernel32
|
|
|
+
|
|
|
+class MemoryManager:
|
|
|
+ """Gestor de memoria para operaciones de lectura/escritura en procesos"""
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self.process_name = None
|
|
|
+ self.process_handle = None
|
|
|
+ self.process_id = None
|
|
|
+
|
|
|
+ def find_process(self) -> bool:
|
|
|
+ """Encuentra el proceso por nombre"""
|
|
|
+ try:
|
|
|
+ for proc in psutil.process_iter(['pid', 'name']):
|
|
|
+ if proc.info['name'].lower() == self.process_name.lower():
|
|
|
+ self.process_id = proc.info['pid']
|
|
|
+ return True
|
|
|
+ except (psutil.NoSuchProcess, psutil.AccessDenied):
|
|
|
+ pass
|
|
|
+ return False
|
|
|
+
|
|
|
+ def open_process(self, process_name: str) -> bool:
|
|
|
+ """Abre el proceso para lectura/escritura"""
|
|
|
+ self.process_name = process_name
|
|
|
+ if not self.process_id:
|
|
|
+ if not self.find_process():
|
|
|
+ print(UI_MESSAGES['error']['process_not_found'].format(process_name=self.process_name))
|
|
|
+ print(" Asegúrate de que el juego esté ejecutándose.")
|
|
|
+ return False
|
|
|
+
|
|
|
+ self.process_handle = kernel32.OpenProcess(
|
|
|
+ PROCESS_ALL_ACCESS, False, self.process_id
|
|
|
+ )
|
|
|
+
|
|
|
+ if not self.process_handle:
|
|
|
+ print(UI_MESSAGES['error']['process_not_opened'].format(process_name=self.process_name))
|
|
|
+ print(" Ejecuta este programa como administrador.")
|
|
|
+ return False
|
|
|
+
|
|
|
+ print(UI_MESSAGES['success']['process_opened'].format(process_name=self.process_name, pid=self.process_id))
|
|
|
+ return True
|
|
|
+
|
|
|
+ def read_memory(self, address: int, size: int) -> Optional[bytes]:
|
|
|
+ """Lee memoria del proceso"""
|
|
|
+ buffer = ctypes.create_string_buffer(size)
|
|
|
+ bytes_read = ctypes.c_size_t()
|
|
|
+
|
|
|
+ success = kernel32.ReadProcessMemory(
|
|
|
+ self.process_handle,
|
|
|
+ ctypes.c_void_p(address),
|
|
|
+ buffer,
|
|
|
+ size,
|
|
|
+ ctypes.byref(bytes_read)
|
|
|
+ )
|
|
|
+
|
|
|
+ if success:
|
|
|
+ return buffer.raw[:bytes_read.value]
|
|
|
+ return None
|
|
|
+
|
|
|
+ def write_memory(self, address: int, data: bytes) -> bool:
|
|
|
+ """Escribe memoria en el proceso"""
|
|
|
+ # Cambiar protección de memoria
|
|
|
+ old_protect = wintypes.DWORD()
|
|
|
+ kernel32.VirtualProtectEx(
|
|
|
+ self.process_handle,
|
|
|
+ ctypes.c_void_p(address),
|
|
|
+ len(data),
|
|
|
+ PAGE_EXECUTE_READWRITE,
|
|
|
+ ctypes.byref(old_protect)
|
|
|
+ )
|
|
|
+
|
|
|
+ bytes_written = ctypes.c_size_t()
|
|
|
+ success = kernel32.WriteProcessMemory(
|
|
|
+ self.process_handle,
|
|
|
+ ctypes.c_void_p(address),
|
|
|
+ data,
|
|
|
+ len(data),
|
|
|
+ ctypes.byref(bytes_written)
|
|
|
+ )
|
|
|
+
|
|
|
+ # Restaurar protección original
|
|
|
+ kernel32.VirtualProtectEx(
|
|
|
+ self.process_handle,
|
|
|
+ ctypes.c_void_p(address),
|
|
|
+ len(data),
|
|
|
+ old_protect.value,
|
|
|
+ ctypes.byref(old_protect)
|
|
|
+ )
|
|
|
+
|
|
|
+ return success and bytes_written.value == len(data)
|
|
|
+
|
|
|
+ def get_memory_regions(self) -> List[Dict]:
|
|
|
+ """Obtiene las regiones de memoria ejecutables del proceso"""
|
|
|
+ regions = []
|
|
|
+ address = 0
|
|
|
+
|
|
|
+ while address < MEMORY_SEARCH_CONFIG['max_address']:
|
|
|
+ mbi = MEMORY_BASIC_INFORMATION()
|
|
|
+ size = kernel32.VirtualQueryEx(
|
|
|
+ self.process_handle,
|
|
|
+ ctypes.c_void_p(address),
|
|
|
+ ctypes.byref(mbi),
|
|
|
+ ctypes.sizeof(mbi)
|
|
|
+ )
|
|
|
+
|
|
|
+ if size == 0:
|
|
|
+ break
|
|
|
+
|
|
|
+ if mbi.BaseAddress is None or mbi.RegionSize is None:
|
|
|
+ address += 0x1000
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Solo regiones ejecutables
|
|
|
+ if (mbi.State == MEM_COMMIT and
|
|
|
+ mbi.Protect in [PAGE_EXECUTE_READ, PAGE_EXECUTE_READWRITE]):
|
|
|
+ regions.append({
|
|
|
+ 'base': mbi.BaseAddress,
|
|
|
+ 'size': mbi.RegionSize,
|
|
|
+ 'protect': mbi.Protect
|
|
|
+ })
|
|
|
+
|
|
|
+ address = mbi.BaseAddress + mbi.RegionSize
|
|
|
+
|
|
|
+ return regions
|
|
|
+
|
|
|
+ def close_process(self):
|
|
|
+ """Cierra el handle del proceso"""
|
|
|
+ if self.process_handle:
|
|
|
+ kernel32.CloseHandle(self.process_handle)
|
|
|
+ self.process_handle = None
|