memory_manager.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. import ctypes
  2. from ctypes import wintypes
  3. import psutil
  4. from typing import Optional, List, Dict
  5. import sys
  6. import os
  7. # Agregar el directorio padre al path para importar config
  8. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  9. from config import MEMORY_SEARCH_CONFIG, UI_MESSAGES
  10. # Constantes de Windows API
  11. PROCESS_ALL_ACCESS = 0x1F0FFF
  12. MEM_COMMIT = 0x1000
  13. MEM_RESERVE = 0x2000
  14. PAGE_EXECUTE_READWRITE = 0x40
  15. PAGE_READWRITE = 0x04
  16. PAGE_EXECUTE_READ = 0x20
  17. # Definir MEMORY_BASIC_INFORMATION
  18. class MEMORY_BASIC_INFORMATION(ctypes.Structure):
  19. _fields_ = [
  20. ("BaseAddress", ctypes.c_void_p),
  21. ("AllocationBase", ctypes.c_void_p),
  22. ("AllocationProtect", wintypes.DWORD),
  23. ("RegionSize", ctypes.c_size_t),
  24. ("State", wintypes.DWORD),
  25. ("Protect", wintypes.DWORD),
  26. ("Type", wintypes.DWORD)
  27. ]
  28. # Cargar kernel32
  29. kernel32 = ctypes.windll.kernel32
  30. class MemoryManager:
  31. """Gestor de memoria para operaciones de lectura/escritura en procesos"""
  32. def __init__(self):
  33. self.process_name = None
  34. self.process_handle = None
  35. self.process_id = None
  36. def find_process(self) -> bool:
  37. """Encuentra el proceso por nombre"""
  38. try:
  39. for proc in psutil.process_iter(['pid', 'name']):
  40. if proc.info['name'].lower() == self.process_name.lower():
  41. self.process_id = proc.info['pid']
  42. return True
  43. except (psutil.NoSuchProcess, psutil.AccessDenied):
  44. pass
  45. return False
  46. def open_process(self, process_name: str) -> bool:
  47. """Abre el proceso para lectura/escritura"""
  48. self.process_name = process_name
  49. if not self.process_id:
  50. if not self.find_process():
  51. print(UI_MESSAGES['error']['process_not_found'].format(process_name=self.process_name))
  52. print(" Asegúrate de que el juego esté ejecutándose.")
  53. return False
  54. self.process_handle = kernel32.OpenProcess(
  55. PROCESS_ALL_ACCESS, False, self.process_id
  56. )
  57. if not self.process_handle:
  58. print(UI_MESSAGES['error']['process_not_opened'].format(process_name=self.process_name))
  59. print(" Ejecuta este programa como administrador.")
  60. return False
  61. print(UI_MESSAGES['success']['process_opened'].format(process_name=self.process_name, pid=self.process_id))
  62. return True
  63. def read_memory(self, address: int, size: int) -> Optional[bytes]:
  64. """Lee memoria del proceso"""
  65. buffer = ctypes.create_string_buffer(size)
  66. bytes_read = ctypes.c_size_t()
  67. success = kernel32.ReadProcessMemory(
  68. self.process_handle,
  69. ctypes.c_void_p(address),
  70. buffer,
  71. size,
  72. ctypes.byref(bytes_read)
  73. )
  74. if success:
  75. return buffer.raw[:bytes_read.value]
  76. return None
  77. def write_memory(self, address: int, data: bytes) -> bool:
  78. """Escribe memoria en el proceso"""
  79. # Cambiar protección de memoria
  80. old_protect = wintypes.DWORD()
  81. kernel32.VirtualProtectEx(
  82. self.process_handle,
  83. ctypes.c_void_p(address),
  84. len(data),
  85. PAGE_EXECUTE_READWRITE,
  86. ctypes.byref(old_protect)
  87. )
  88. bytes_written = ctypes.c_size_t()
  89. success = kernel32.WriteProcessMemory(
  90. self.process_handle,
  91. ctypes.c_void_p(address),
  92. data,
  93. len(data),
  94. ctypes.byref(bytes_written)
  95. )
  96. # Restaurar protección original
  97. kernel32.VirtualProtectEx(
  98. self.process_handle,
  99. ctypes.c_void_p(address),
  100. len(data),
  101. old_protect.value,
  102. ctypes.byref(old_protect)
  103. )
  104. return success and bytes_written.value == len(data)
  105. def get_memory_regions(self) -> List[Dict]:
  106. """Obtiene las regiones de memoria ejecutables del proceso"""
  107. regions = []
  108. address = 0
  109. while address < MEMORY_SEARCH_CONFIG['max_address']:
  110. mbi = MEMORY_BASIC_INFORMATION()
  111. size = kernel32.VirtualQueryEx(
  112. self.process_handle,
  113. ctypes.c_void_p(address),
  114. ctypes.byref(mbi),
  115. ctypes.sizeof(mbi)
  116. )
  117. if size == 0:
  118. break
  119. if mbi.BaseAddress is None or mbi.RegionSize is None:
  120. address += 0x1000
  121. continue
  122. # Solo regiones ejecutables
  123. if (mbi.State == MEM_COMMIT and
  124. mbi.Protect in [PAGE_EXECUTE_READ, PAGE_EXECUTE_READWRITE]):
  125. regions.append({
  126. 'base': mbi.BaseAddress,
  127. 'size': mbi.RegionSize,
  128. 'protect': mbi.Protect
  129. })
  130. address = mbi.BaseAddress + mbi.RegionSize
  131. return regions
  132. def close_process(self):
  133. """Cierra el handle del proceso"""
  134. if self.process_handle:
  135. kernel32.CloseHandle(self.process_handle)
  136. self.process_handle = None