| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- import ctypes
- from ctypes import wintypes
- import psutil
- from typing import Optional, List, Dict
- import sys
- import os
- # Agregar el directorio padre al path para importar config
- sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from config import MEMORY_SEARCH_CONFIG, UI_MESSAGES
# Windows API constants (numeric values as defined by the Win32 headers).

# Access mask passed to OpenProcess.
PROCESS_ALL_ACCESS = 0x001F0FFF

# Allocation-state flags (compared against MEMORY_BASIC_INFORMATION.State).
MEM_COMMIT = 0x00001000
MEM_RESERVE = 0x00002000

# Page-protection flags (compared against MEMORY_BASIC_INFORMATION.Protect).
PAGE_READWRITE = 0x04
PAGE_EXECUTE_READ = 0x20
PAGE_EXECUTE_READWRITE = 0x40
# ctypes mirror of the Win32 MEMORY_BASIC_INFORMATION record.
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
    """Region descriptor filled in by ``VirtualQueryEx``.

    Field order and types follow the Win32 declaration; ctypes applies its
    default (natural) alignment between members.
    """

    _fields_ = [
        # Start of the queried region and of the allocation containing it.
        ("BaseAddress", ctypes.c_void_p),
        ("AllocationBase", ctypes.c_void_p),
        # Protection requested when the allocation was first made.
        ("AllocationProtect", wintypes.DWORD),
        # Size in bytes of the region starting at BaseAddress.
        ("RegionSize", ctypes.c_size_t),
        # Current state, protection and type of the pages in the region.
        ("State", wintypes.DWORD),
        ("Protect", wintypes.DWORD),
        ("Type", wintypes.DWORD),
    ]
# Load kernel32 through ctypes' ``windll`` loader (only available on Windows);
# every Win32 call in this module goes through this object.
kernel32 = ctypes.windll.kernel32
class MemoryManager:
    """Read and write the memory of another Windows process.

    Typical use: call :meth:`open_process` with an executable name, then
    :meth:`read_memory` / :meth:`write_memory` / :meth:`get_memory_regions`,
    and finally :meth:`close_process` to release the process handle.

    Attributes:
        process_name: Executable name last passed to :meth:`open_process`.
        process_handle: Handle returned by ``OpenProcess``, or ``None`` when
            no process is open.
        process_id: PID of the target process, or ``None`` if unresolved.
    """

    def __init__(self):
        self.process_name = None
        self.process_handle = None
        self.process_id = None

    def find_process(self) -> bool:
        """Resolve ``self.process_name`` to a PID.

        The comparison is case-insensitive and the first match wins. On
        success the PID is stored in ``self.process_id``.

        Returns:
            True if a matching process was found, False otherwise
            (including when no process name has been set).
        """
        if not self.process_name:
            return False

        wanted = self.process_name.lower()
        try:
            for proc in psutil.process_iter(['pid', 'name']):
                # psutil may report None for a name it could not read.
                name = proc.info['name']
                if name and name.lower() == wanted:
                    self.process_id = proc.info['pid']
                    return True
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Best effort: a process vanishing or being off-limits while we
            # enumerate is reported as "not found".
            pass
        return False

    def open_process(self, process_name: str) -> bool:
        """Open the named process with full access for reading/writing.

        A PID already stored in ``self.process_id`` is reused unless it was
        resolved for a different executable name. Any handle left open by a
        previous call is closed first. Status messages are printed.

        Args:
            process_name: Executable name of the target process.

        Returns:
            True if the process handle was obtained, False otherwise.
        """
        previous_name = self.process_name
        self.process_name = process_name

        # A PID cached for another executable must not be reused.
        if (self.process_id and previous_name and process_name
                and previous_name.lower() != process_name.lower()):
            self.process_id = None

        # Do not leak a handle obtained by an earlier call.
        self.close_process()

        if not self.process_id and not self.find_process():
            print(UI_MESSAGES['error']['process_not_found'].format(process_name=self.process_name))
            print(" Asegúrate de que el juego esté ejecutándose.")
            return False

        # OpenProcess returns 0 on failure; normalise that to None.
        self.process_handle = kernel32.OpenProcess(
            PROCESS_ALL_ACCESS, False, self.process_id
        ) or None

        if not self.process_handle:
            print(UI_MESSAGES['error']['process_not_opened'].format(process_name=self.process_name))
            print(" Ejecuta este programa como administrador.")
            return False

        print(UI_MESSAGES['success']['process_opened'].format(process_name=self.process_name, pid=self.process_id))
        return True

    def read_memory(self, address: int, size: int) -> Optional[bytes]:
        """Read up to ``size`` bytes from the target process.

        Args:
            address: Address in the target process to read from.
            size: Number of bytes to request.

        Returns:
            The bytes actually read, or None if no process is open or
            ``ReadProcessMemory`` reports failure.
        """
        if not self.process_handle:
            return None

        buffer = ctypes.create_string_buffer(size)
        bytes_read = ctypes.c_size_t()

        success = kernel32.ReadProcessMemory(
            self.process_handle,
            ctypes.c_void_p(address),
            buffer,
            size,
            ctypes.byref(bytes_read)
        )

        if not success:
            return None
        return buffer.raw[:bytes_read.value]

    def write_memory(self, address: int, data: bytes) -> bool:
        """Write ``data`` into the target process at ``address``.

        The pages are temporarily switched to PAGE_EXECUTE_READWRITE and the
        previous protection is restored afterwards, but only if that switch
        actually succeeded.

        Args:
            address: Address in the target process to write to.
            data: Bytes to write.

        Returns:
            True only if the write succeeded and every byte was written;
            False otherwise, including when no process is open.
        """
        if not self.process_handle:
            return False

        size = len(data)
        target = ctypes.c_void_p(address)

        # Make the pages writable, remembering the previous protection.
        old_protect = wintypes.DWORD()
        protection_changed = kernel32.VirtualProtectEx(
            self.process_handle,
            target,
            size,
            PAGE_EXECUTE_READWRITE,
            ctypes.byref(old_protect)
        )

        bytes_written = ctypes.c_size_t()
        success = kernel32.WriteProcessMemory(
            self.process_handle,
            target,
            data,
            size,
            ctypes.byref(bytes_written)
        )

        # Restore the original protection only if we really changed it;
        # otherwise old_protect holds no meaningful value.
        if protection_changed:
            ignored = wintypes.DWORD()
            kernel32.VirtualProtectEx(
                self.process_handle,
                target,
                size,
                old_protect.value,
                ctypes.byref(ignored)
            )

        return bool(success) and bytes_written.value == size

    def get_memory_regions(self) -> List[Dict]:
        """List the committed executable memory regions of the process.

        Walks the address space with ``VirtualQueryEx`` from address 0 up to
        ``MEMORY_SEARCH_CONFIG['max_address']``. Only regions in state
        MEM_COMMIT whose protection is exactly PAGE_EXECUTE_READ or
        PAGE_EXECUTE_READWRITE are reported.

        Returns:
            A list of dicts with keys ``'base'``, ``'size'`` and
            ``'protect'``; empty if no process is open.
        """
        regions: List[Dict] = []
        if not self.process_handle:
            return regions

        executable = (PAGE_EXECUTE_READ, PAGE_EXECUTE_READWRITE)
        max_address = MEMORY_SEARCH_CONFIG['max_address']

        # One structure is reused; VirtualQueryEx overwrites it on each call.
        mbi = MEMORY_BASIC_INFORMATION()
        mbi_size = ctypes.sizeof(mbi)
        address = 0

        while address < max_address:
            queried = kernel32.VirtualQueryEx(
                self.process_handle,
                ctypes.c_void_p(address),
                ctypes.byref(mbi),
                mbi_size
            )
            if queried == 0:
                break

            # ctypes exposes a NULL c_void_p field as None.
            base = mbi.BaseAddress or 0
            region_size = mbi.RegionSize

            if mbi.State == MEM_COMMIT and mbi.Protect in executable:
                regions.append({
                    'base': base,
                    'size': region_size,
                    'protect': mbi.Protect
                })

            next_address = base + region_size
            if next_address <= address:
                # Guarantee forward progress even on a degenerate result.
                next_address = address + 0x1000
            address = next_address

        return regions

    def close_process(self):
        """Close the process handle, if one is open."""
        if self.process_handle:
            kernel32.CloseHandle(self.process_handle)
            self.process_handle = None
|