diff --git a/.gitignore b/.gitignore index 521cfd3..e0a1364 100644 --- a/.gitignore +++ b/.gitignore @@ -3,11 +3,7 @@ node_modules CRITICAL_ERROR.txt **/__pycache__/** -stuff/ -extractors/** logs/ -nmspy/data/functions/ .cache/* -extractors_v2/extracted/** .venv/ diff --git a/LICENCE.md b/LICENCE.md new file mode 100644 index 0000000..d176b42 --- /dev/null +++ b/LICENCE.md @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 monkeyman192 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..fab9e8d --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include nmspy/pymhf.cfg diff --git a/NMS.py.cfg b/NMS.py.cfg deleted file mode 100644 index 5202f1f..0000000 --- a/NMS.py.cfg +++ /dev/null @@ -1,7 +0,0 @@ -[NMS] -path = C:/Games/No Man's Sky/Binaries/NMS.exe -hash = 014f5fd1837e2bd8356669b92109fd3add116137 - -[NMS.py] -log_level = info - diff --git a/docs/examples/basic_mod.md b/docs/examples/basic_mod.md index f5a34ea..e99e069 100644 --- a/docs/examples/basic_mod.md +++ b/docs/examples/basic_mod.md @@ -4,7 +4,7 @@ Our first mod will be something simple which will demonstate a number of concept ```py import nmspy.data.function_hooks as hooks -from nmspy.hooking import disable, main_loop, on_key_pressed, on_key_release +from pymhf.core.hooking import disable, main_loop, on_key_pressed, on_key_release from nmspy.memutils import map_struct import nmspy.data.structs as nms_structs from nmspy.mod_loader import NMSMod diff --git a/docs/index.md b/docs/index.md index 8f75cbd..f739f90 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,2 +1,4 @@ -# NMS.py +# pyMHF +*pyMHF* is a python Modding and Hooking Framework. +It is designed to make it very easy to create libraries for any game or application which can then be used to make mods. diff --git a/docs/setup.md b/docs/setup.md index 46e484e..9ce4733 100644 --- a/docs/setup.md +++ b/docs/setup.md @@ -3,10 +3,10 @@ 1. Ensure that python 3.9+ is installed 1. Install the required dependencies. Run the following in the current directory: `python -m pip install -r requirements.txt` 1. (Optional - only if you want to be able to reload mods) Install `cyminhook` from the `_cyminhook` folder. I have precompiled wheels for both python 3.9 and 3.10. Install by running `python -m pip install ./_cyminhook/cyminhook-0.1.4-cp39-cp39-win_amd64.whl` (change `39` to `310` for python 3.10). -1. Modify `NMS.py.cfg` to have the correct binary path. Note that currently the only supported binary is the one which has the hash listed in the file. You can check the hash of your NMS binary by running `certutil -hashfile "NMS.exe" SHA1` in the directory with the NMS.exe binary. +1. Modify `pymhf.cfg` to have the correct binary path. Note that currently the only supported binary is the one which has the hash listed in the file. You can check the hash of your NMS binary by running `certutil -hashfile "NMS.exe" SHA1` in the directory with the NMS.exe binary. 1. in a terminal run `python main.py` -You should have another popup appear with the NMS.py logo at the top, and then a series of log messages. +You should have another popup appear with the pyMHF logo at the top, and then a series of log messages. If this doesn't occur then your firewall may be blocking the port 6770, so make sure that a TCP connection is allowed on this port on your local network (ie. 127.0.0.0, or possibly 0.0.0.) If all goes well you should see `"Serving on executor ('127.0.0.1', 6770)"` diff --git a/docs/writing_libraries.md b/docs/writing_libraries.md new file mode 100644 index 0000000..e5b62eb --- /dev/null +++ b/docs/writing_libraries.md @@ -0,0 +1,21 @@ +# Writing Libraries + +The primary usage of *pyMHF* is to facilitate writing python libraries which can then be used to write mods. +*pyMHF* provides all the tools required to make setting up a library easy, so that one only has to provide the definitions and all the hooking and mod complexity will be handled automatically. + +Follow the next steps to get your library project set up. + +## Creating the folder structure + +``` +LibraryName +├── functions +│ ├── __init__.py +│ ├── call_sigs.py +│ ├── hooks.py +│ ├── offsets.py +├── types +│ ├── structs.py +├── __init__.py +└── pymhf.cfg +``` \ No newline at end of file diff --git a/docs/writing_mods.md b/docs/writing_mods.md index dc3ea53..ba9fe12 100644 --- a/docs/writing_mods.md +++ b/docs/writing_mods.md @@ -1,6 +1,6 @@ -## Writing a mod using NMS.py +## Writing a mod using pyMHF -NMS.py contains offsets and mappings for many functions in the game. +*pyMHF* contains offsets and mappings for many functions in the game. It aims to make hooking the various game functions as simple as possible with an easy to understand syntax. First, we need a mod class which will contain all the relevant hooks for your mod. @@ -8,9 +8,9 @@ First, we need a mod class which will contain all the relevant hooks for your mo The basics look something like this: ```py -from nmspy.mod_loader import NMSMod +from pymhf import Mod -class MyMod(NMSMod): +class MyMod(Mod): __author__ = "you!" __description__ = "Your fantastic mod!" __version__ = "1.0" @@ -24,6 +24,7 @@ This is best shown with an example: ```py import logging +from pymhf import Mod import nmspy.data.function_hooks as hooks class MyMod(NMSMod): diff --git a/injected.py b/injected.py deleted file mode 100644 index a1ae6ad..0000000 --- a/injected.py +++ /dev/null @@ -1,241 +0,0 @@ -import asyncio -import builtins -from concurrent.futures import ThreadPoolExecutor -import configparser -import ctypes -import ctypes.wintypes -from functools import partial -import locale -import logging -import logging.handlers -import os -import os.path as op -import time -import traceback -from typing import Optional -import sys - - -socket_logger_loaded = False -nms = None - -try: - rootLogger = logging.getLogger('') - rootLogger.setLevel(logging.INFO) - socketHandler = logging.handlers.SocketHandler( - "localhost", - logging.handlers.DEFAULT_TCP_LOGGING_PORT - ) - rootLogger.addHandler(socketHandler) - logging.info("Loading NMS.py...") - socket_logger_loaded = True - - import nmspy._internal as _internal - - # Before any nmspy.data imports occur, set the os.environ value for the - # binary hash: - if _internal.BINARY_HASH: - os.environ["NMS_BINARY_HASH"] = _internal.BINARY_HASH - else: - # If there is no binary hash, something has gone wrong. Exit now since - # we can't continue. - sys.exit(-1) - - config = configparser.ConfigParser() - # Currently it's in the same directory as this file... - cfg_file = op.join(_internal.CWD, "NMS.py.cfg") - read = config.read(cfg_file) - log_level = config.get("NMS.py", "log_level", fallback="info") - debug_mode = log_level.lower() == "debug" - if debug_mode: - rootLogger.setLevel(logging.DEBUG) - - import nmspy.data.structs as nms_structs - from nmspy.hooking import hook_manager - from nmspy.protocols import ( - ExecutionEndedException, - custom_exception_handler, - ESCAPE_SEQUENCE - ) - from nmspy.memutils import map_struct, getsize - from nmspy.mod_loader import ModManager - from nmspy.caching import globals_cache, load_caches - import nmspy.common as nms - - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) - - hook_logger = logging.getLogger("HookManager") - - # Some variables to handle some globally defined objects or contexts. - # Note that depending on the game version one or more of these may never be - # set. - metadata_registry = {} - nvg_context = None - - # Before any hooks are registered, load the caches. - load_caches(_internal.BINARY_HASH) - - # Since we are running inside a thread, `asyncio.get_event_loop` will - # generally fail. - # Detect this and create a new event loop anyway since we are running in a - # thread under the process we have been injected into, and not the original - # python thread that is running the show. - try: - loop = asyncio.get_event_loop() - except (RuntimeError, ValueError) as e: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - # Set the custom exception handler on the loop - loop.set_exception_handler(custom_exception_handler) - - # NOTE: This class MUST be defined here. If it's defined in a separate file - # then the hack done to persist data to the current global context will not - # work. - class ExecutingProtocol(asyncio.Protocol): - """ A protocol factory to be passed to a asyncio loop.create_server call - which will accept requests, execute them and persist any variables to - globals().""" - def connection_made(self, transport: asyncio.transports.WriteTransport): - peername = transport.get_extra_info('peername') - self.transport: asyncio.transports.WriteTransport = transport - builtins.print('Connection from {}'.format(peername)) - # Overwrite print so that any `print` statements called in the commands - # to be executed will be written back out of the socket they came in. - globals()['print'] = partial(builtins.print, file=self) - - def write(self, value: str): - """ Method to allow this protocol to be used as a file to write to. - This allows us to have `print` write to this protocol.""" - self.transport.write(value.encode()) - - def data_received(self, __data: bytes): - # Have an "escape sequence" which will force this to exit. - # This way we can kill it if need be from the other end. - if __data == ESCAPE_SEQUENCE: - print("\nReceived exit command!") - raise ExecutionEndedException - try: - exec(__data.decode()) - except: - print(traceback.format_exc()) - else: - self.persist_to_globals(locals()) - - def persist_to_globals(self, data: dict): - """ Take the dictionary which was determined by calling `locals()`, and - update `gloabsl()` with it.""" - data.pop("self") - data.pop(f"_{type(self).__name__}__data") - globals().update(data) - - def eof_received(self): - # Do nothing. - pass - - def connection_lost(self, exc): - # Once the connection is lost. Restore `print` back to normal. - globals()['print'] = builtins.print - - - def top_globals(limit: Optional[int] = 10): - """ Return the top N objects in globals() by size (in bytes). """ - globs = globals() - data = [] - for key, value in globs.items(): - if not key.startswith("__"): - try: - data.append((key, *getsize(value))) - except TypeError: - pass - data.sort(key=lambda x: x[1], reverse=True) - if limit is not None: - return data[:limit] - else: - return data - - # Patch the locale to make towupper work. - # Python must change this so we change it back otherwise calls to `towupper` - # in the various functions to set and get keypresses don't work correctly. - locale.setlocale(locale.LC_CTYPE, "C") - - # Load any globals based on any cached offsets. - for global_name, relative_offset in globals_cache.items(): - # For each global, construct the object and then assign it to the - # nms. - setattr( - nms, - global_name, - map_struct( - _internal.BASE_ADDRESS + relative_offset, - getattr(nms_structs, "c" + global_name) - ) - ) - - nms.executor = ThreadPoolExecutor(1, thread_name_prefix="NMS_Executor") - _internal._executor = ThreadPoolExecutor(1, thread_name_prefix="NMS.py_Internal_Executor") - - mod_manager = ModManager(hook_manager) - - # TODO: Need to re-write how we load mods. - # To enable compound mods - - # First, load our internal mod before anything else. - mod_manager.load_mod_folder(op.join(_internal.CWD, "nmspy/_internals/mods")) - mod_manager.enable_all(quiet=not debug_mode) - - logging.info("NMS.py injection complete!") - - # Also load any mods after all the internal hooks: - start_time = time.time() - _loaded = 0 - bold = "\u001b[4m" - reset = "\u001b[0m" - logging.info(bold + "Loading mods" + reset) - try: - mod_manager.load_mod_folder(op.join(_internal.CWD, "mods")) - _loaded = mod_manager.enable_all() - except: - logging.exception(traceback.format_exc()) - logging.info(f"Loaded {_loaded} mods in {time.time() - start_time:.3f}s") - - for func_name, hook_class in hook_manager.failed_hooks.items(): - offset = hook_class.target - _data = (ctypes.c_char * 0x20).from_address(offset) - hook_logger.error(f"Hook {func_name} first 0x20 bytes: {_data.value.hex()}") - - # Each client connection will create a new protocol instance - coro = loop.create_server(ExecutingProtocol, '127.0.0.1', 6770) - server = loop.run_until_complete(coro) - - logging.info(f'Serving on executor {server.sockets[0].getsockname()}') - loop.run_forever() - - # Close the server. - server.close() - loop.run_until_complete(server.wait_closed()) - loop.close() - -except Exception as e: - # If we hit this, something has gone wrong. Log to the current directory. - - try: - # Try and log to the current working directory. - # Sometimes this may fail as the error is an "internal" one, so we will - # add a fail-safe to log to the users' home directory so that it at - # least is logged somewhere. - # TODO: Document this behaviour. - import nmspy._internal as _internal - - with open(op.join(_internal.CWD, "CRITICAL_ERROR.txt"), "w") as f: - traceback.print_exc(file=f) - if socket_logger_loaded: - logging.error("An error occurred while loading NMS.py:") - logging.exception(traceback.format_exc()) - except: - with open(op.join(op.expanduser("~"), "CRITICAL_ERROR.txt"), "w") as f: - traceback.print_exc(file=f) -finally: - if nms and nms.executor is not None: - nms.executor.shutdown(wait=False, cancel_futures=True) - diff --git a/log_terminal.py b/log_terminal.py deleted file mode 100644 index 2a368e9..0000000 --- a/log_terminal.py +++ /dev/null @@ -1,123 +0,0 @@ -import pickle -import logging -import logging.handlers -import os -import os.path as op -import select -import socketserver -import struct -import time - -import nmspy - - -# Logo generated using https://patorjk.com/software/taag/ -LOGO = """ - ____ _____ ____ ____ ______ _______ ____ ____ -|_ \|_ _| |_ \ / _| .' ____ \ |_ __ \ |_ _||_ _| - | \ | | | \/ | | (___ \_| | |__) | \ \ / / - | |\ \| | | |\ /| | _.____`. | ___/ \ \/ / - _| |_\ |_ _| |_\/_| |_ | \____) | _ _| |_ _| |_ -|_____|\____| |_____||_____| \______.' (_) |_____| |______| -""" - - -CWD = op.dirname(__file__) -LOGDIR = op.join(CWD, "logs") -os.makedirs(LOGDIR, exist_ok=True) - -# NB: This code is mostly taken from the python stdlib docs. - -class LogRecordStreamHandler(socketserver.StreamRequestHandler): - """Handler for a streaming logging request. - - This basically logs the record using whatever logging policy is - configured locally. - """ - - def handle(self): - """ - Handle multiple requests - each expected to be a 4-byte length, - followed by the LogRecord in pickle format. Logs the record - according to whatever policy is configured locally. - """ - try: - while True: - chunk = self.connection.recv(4) - if len(chunk) < 4: - break - slen = struct.unpack('>L', chunk)[0] - chunk = self.connection.recv(slen) - while len(chunk) < slen: - chunk = chunk + self.connection.recv(slen - len(chunk)) - obj = self.unPickle(chunk) - record = logging.makeLogRecord(obj) - self.handleLogRecord(record) - except ConnectionResetError: - return - - def unPickle(self, data): - return pickle.loads(data) - - def handleLogRecord(self, record): - # If a name is specified, we use the named logger rather than the one - # implied by the record. - if self.server.logname is not None: - name = self.server.logname - else: - name = record.name - logger = logging.getLogger(name) - # N.B. EVERY record gets logged. This is because Logger.handle - # is normally called AFTER logger-level filtering. If you want - # to do filtering, do it at the client end to save wasting - # cycles and network bandwidth! - logger.handle(record) - - -class LogRecordSocketReceiver(socketserver.ThreadingTCPServer): - """ - Simple TCP socket-based logging receiver suitable for testing. - """ - - allow_reuse_address = True - - def __init__(self, host='localhost', - port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, - handler=LogRecordStreamHandler): - socketserver.ThreadingTCPServer.__init__(self, (host, port), handler) - self.abort = 0 - self.timeout = 1 - self.logname = None - - def serve_until_stopped(self): - abort = 0 - try: - while not abort: - rd, wr, ex = select.select([self.socket.fileno()], - [], [], - self.timeout) - if rd: - self.handle_request() - abort = self.abort - except KeyboardInterrupt: - print("Ending logging server...") - -def main(): - formatter = logging.Formatter("%(asctime)s %(name)-15s %(levelname)-6s %(message)s") - # TODO: Need to make this strip the ANSI escape chars from the written log - file_handler = logging.FileHandler(op.join(LOGDIR, f"nmspy-{time.strftime('%Y%m%dT%H%M%S')}.log"), encoding="utf-8") - file_handler.setFormatter(formatter) - stream_handler = logging.StreamHandler() - stream_handler.setFormatter(formatter) - root_logger = logging.getLogger() - root_logger.addHandler(file_handler) - root_logger.addHandler(stream_handler) - tcpserver = LogRecordSocketReceiver() - print(LOGO) - print(f"Version: {nmspy.__version__}") - print('Logger waiting for backend process... Please wait...') - tcpserver.serve_until_stopped() - - -if __name__ == '__main__': - main() diff --git a/logo.txt b/logo.txt new file mode 100644 index 0000000..b4e8189 --- /dev/null +++ b/logo.txt @@ -0,0 +1,6 @@ + ____ _____ ____ ____ ______ _______ ____ ____ +|_ \|_ _| |_ \ / _| .' ____ \ |_ __ \ |_ _||_ _| + | \ | | | \/ | | (___ \_| | |__) | \ \ / / + | |\ \| | | |\ /| | _.____`. | ___/ \ \/ / + _| |_\ |_ _| |_\/_| |_ | \____) | _ _| |_ _| |_ +|_____|\____| |_____||_____| \______.' (_) |_____| |______| \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index 208f5bf..0000000 --- a/main.py +++ /dev/null @@ -1,180 +0,0 @@ -import asyncio -import concurrent.futures -import configparser -from functools import partial -import os -import os.path as op -from signal import SIGTERM -import time - - -import pymem -import pymem.process - -from nmspy.caching import hash_bytes -from nmspy.process import start_process -from nmspy.protocols import ESCAPE_SEQUENCE, TerminalProtocol -from nmspy.logging import open_log_console - - -CWD = op.dirname(__file__) - - -# Parse the config file first so we can load anything we need to know. -config = configparser.ConfigParser() -# Currently it's in the same directory as this file... -cfg_file = op.join(CWD, "NMS.py.cfg") -read = config.read(cfg_file) -binary_path = config["NMS"]["path"] -root_dir = config["NMS"]["root_dir"] - - -# Steam: -# binary_path = "C:/Program Files (x86)/Steam/steamapps/common/No Man's Sky/Binaries/NMS.exe" -# GOG 4.13, aka, "the good shit": -# binary_path = "C:/Games/No Man's Sky/Binaries/NMS.exe" -executor = None -futures = [] -loop = asyncio.get_event_loop() - - -def kill_injected_code(loop): - # End one last "escape sequence" message: - client_completed = asyncio.Future() - client_factory = partial( - TerminalProtocol, - message=ESCAPE_SEQUENCE.decode(), - future=client_completed - ) - factory_coroutine = loop.create_connection( - client_factory, - '127.0.0.1', - 6770, - ) - loop.run_until_complete(factory_coroutine) - loop.run_until_complete(client_completed) - - -try: - log_pid = open_log_console(op.join(CWD, "log_terminal.py")) - # Have a small nap just to give it some time. - time.sleep(0.5) - print(f"Opened the console log with PID: {log_pid}") - with open(binary_path, "rb") as f: - binary_hash = hash_bytes(f) - print(f"Exe hash is: {binary_hash}") - - process_handle, thread_handle, pid, tid = start_process(binary_path, creationflags=0x4) - print(f'Opened NMS with PID: {pid}') - # Wait some time for the data to be written to memory. - time.sleep(3) - - # Get the base address - nms = pymem.Pymem('NMS.exe') - print(f"proc id from pymem: {nms.process_id}") - nms.inject_python_interpreter() - pb = nms.process_base - binary_base = pb.lpBaseOfDll - binary_size = pb.SizeOfImage - # Inject the base address as a "global" variable into the python interpreter - # of NMS.exe - print(f"The NMS handle: {nms.process_handle}, base: 0x{binary_base:X}") - - # Inject some other dlls: - # pymem.process.inject_dll(nms.process_handle, b"path") - - cwd = CWD.replace("\\", "\\\\") - nms.inject_python_shellcode(f"CWD = '{cwd}'") - nms.inject_python_shellcode("import sys") - nms.inject_python_shellcode("sys.path.append(CWD)") - - # Inject _preinject AFTER modifying the sys.path for now until we have - # nmspy installed via pip. - with open(op.join(CWD, "nmspy", "_scripts", "_preinject.py"), "r") as f: - _preinject_shellcode = f.read() - nms.inject_python_shellcode(_preinject_shellcode) - # Inject the common NMS variables which are required for general use. - nms.inject_python_shellcode(f"nmspy._internal.BASE_ADDRESS = {binary_base}") - nms.inject_python_shellcode(f"nmspy._internal.SIZE_OF_IMAGE = {binary_size}") - nms.inject_python_shellcode(f"nmspy._internal.CWD = '{cwd}'") - nms.inject_python_shellcode(f"nmspy._internal.HANDLE = {nms.process_handle}") - nms.inject_python_shellcode(f"nmspy._internal.BINARY_HASH = '{binary_hash}'") - nms.inject_python_shellcode( - f"nmspy._internal.NMS_ROOT_DIR = \"{root_dir}\"" - ) - # Inject the script - with open(op.join(CWD, "injected.py"), "r") as f: - shellcode = f.read() - executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) - print("Injecting hooking code") - futures.append(executor.submit(nms.inject_python_shellcode, shellcode)) - - try: - input("Press something to start NMS") - except KeyboardInterrupt: - # Kill the injected code so that we don't wait forever for the future to end. - kill_injected_code(loop) - raise - print(f"Opening thread {thread_handle}") - # thread_handle = pymem.process.open_thread(main_thread.thread_id) - pymem.ressources.kernel32.ResumeThread(thread_handle) - - print("NMS.py interactive python command prompt") - print("Type any valid python commands to execute them within the NMS process") - while True: - try: - input_ = input(">>> ") - client_completed = asyncio.Future() - client_factory = partial( - TerminalProtocol, - message=input_, - future=client_completed - ) - factory_coroutine = loop.create_connection( - client_factory, - '127.0.0.1', - 6770, - ) - loop.run_until_complete(factory_coroutine) - loop.run_until_complete(client_completed) - except KeyboardInterrupt: - kill_injected_code(loop) - raise -except KeyboardInterrupt: - # If it's a keyboard interrupt, just pass as it will have bubbled up from - # below. - pass -except Exception as e: - # Any other exception we want to actually know about. - print(e) - raise -finally: - loop.close() - try: - for future in concurrent.futures.as_completed(futures, timeout=5): - print(future) - except TimeoutError: - # Don't really care. - print("Got a time out error...") - pass - if executor is not None: - executor.shutdown(wait=False) - try: - with open(op.join(CWD, "end.py"), "r") as f: - close_shellcode = f.read() - nms.inject_python_shellcode(close_shellcode) - print("Just injected the close command?") - # Kill the NMS process. - except: - pass - finally: - print("Forcibly shutting down NMS") - time.sleep(1) - for _pid in {nms.process_id, pid, log_pid}: - try: - os.kill(_pid, SIGTERM) - print(f"Just killed process {_pid}") - except: - # If we can't kill it, it's probably already dead. Just continue. - print(f"Failed to kill process {_pid}. It was likely already dead...") - pass diff --git a/mods/disable_mod_warning.py b/mods/disable_mod_warning.py index 61a905d..9f4fd25 100644 --- a/mods/disable_mod_warning.py +++ b/mods/disable_mod_warning.py @@ -1,15 +1,17 @@ -from nmspy.hooking import one_shot -from nmspy.memutils import map_struct +from pymhf.core.hooking import one_shot +from pymhf.core.memutils import map_struct +from pymhf.gui import no_gui import nmspy.data.structs as nms_structs -import nmspy.data.function_hooks as hooks -from nmspy.mod_loader import NMSMod +import nmspy.data.functions.hooks as hooks +from nmspy import NMSMod +@no_gui class DisableModWarning(NMSMod): __author__ = "monkeyman192" __description__ = "Disable mod warning screen" __version__ = "1.0" - __NMSPY_required_version__ = "0.6.0" + __NMSPY_required_version__ = "0.7.0" @one_shot @hooks.cTkFileSystem.Construct.after diff --git a/mods/misc_tests.py b/mods/misc_tests.py index 0c30311..f787aa5 100644 --- a/mods/misc_tests.py +++ b/mods/misc_tests.py @@ -1,12 +1,12 @@ # import ctypes import logging -from nmspy.hooking import one_shot, disable, manual_hook +from pymhf.core.hooking import one_shot, disable import nmspy.common as nms -from nmspy.memutils import map_struct +from pymhf.core.memutils import map_struct import nmspy.data.structs as nms_structs -import nmspy.data.function_hooks as hooks -from nmspy.mod_loader import NMSMod +import nmspy.data.functions.hooks as hooks +from nmspy import NMSMod # from nmspy._types import FUNCDEF @@ -18,6 +18,7 @@ # ) +@disable class MiscMod(NMSMod): __author__ = "monkeyman192" __description__ = "Misc stuff..." @@ -93,13 +94,11 @@ def generate_solarsystem(self, this, lbUseSettingsFile, lSeed): # return self.original(this, leIndex) @one_shot - @hooks.cGcGameState.LoadSpecificSave + @hooks.cGcGameState.LoadSpecificSave.after @disable - def load_specific_save(self, this, leSpecificSave): + def load_specific_save(self, this, leSpecificSave, _result_): logging.info(f"cGcGameState*: {this}, save type: {leSpecificSave}") - ret = hooks.cGcGameState.LoadSpecificSave.original(this, leSpecificSave) - logging.info(str(ret)) - return ret + logging.info(_result_) def slow_thing(self): import time diff --git a/mods/test_mod.py b/mods/test_mod.py index e7b80d7..d5b1a7d 100644 --- a/mods/test_mod.py +++ b/mods/test_mod.py @@ -1,17 +1,21 @@ import logging import ctypes +from dataclasses import dataclass -import nmspy.data.function_hooks as hooks -from nmspy.hooking import disable, main_loop, on_key_pressed, on_key_release -from nmspy.memutils import map_struct +import nmspy.data.functions.hooks as hooks +from pymhf.core.hooking import disable, on_key_pressed, on_key_release +from pymhf.core.memutils import map_struct import nmspy.data.structs as nms_structs -from nmspy.mod_loader import NMSMod, ModState -from nmspy.calling import call_function - +from pymhf.core.mod_loader import ModState +from nmspy import NMSMod +from nmspy.decorators import main_loop +from pymhf.core.calling import call_function +from pymhf.gui.decorators import gui_variable, gui_button, STRING +@dataclass class TestModState(ModState): - def __init__(self): - self.value: int = 1 + value: int = 1 + text: str = "Hi" @disable @@ -19,34 +23,36 @@ class TestMod(NMSMod): __author__ = "monkeyman192" __description__ = "A simple test mod" __version__ = "0.1" - __NMSPY_required_version__ = "0.6.0" + __NMSPY_required_version__ = "0.7.0" state = TestModState() def __init__(self): super().__init__() - self.text: str = "NO" self.should_print = False @property - def _text(self): - return self.text.encode() + @STRING("Replace 'Options' prefix: ") + def option_replace(self): + return self.state.text - @on_key_pressed("space") - def press(self): - self.text = f"BBB {self.state.value}" + @option_replace.setter + def option_replace(self, value): + self.state.text = value + + @property + def _text(self): + return f"{self.state.text}: {self.state.value}".encode() @on_key_release("space") def release(self): - self.text = "NO" self.state.value += 1 - @hooks.nvgText + @hooks.nvgText.before def change_test(self, ctx, x: float, y: float, string, end): if string == b"Options": string = ctypes.c_char_p(b"Hi") - call_function("nvgText", ctx, x + 30, y, ctypes.c_char_p(self._text), end) - return hooks.nvgText.original(ctx, x, y, string, end) + return ctx, x + 30, y, ctypes.c_char_p(self._text), end # @nvgText.before # def change_test_after(self, ctx, x: float, y: float, string, end): @@ -93,4 +99,4 @@ def detour(self, this): @main_loop.before def do_something(self): if self.should_print: - logging.info(self.text) + logging.info(self._text) diff --git a/nmspy/__init__.py b/nmspy/__init__.py index fcfcccd..973df82 100644 --- a/nmspy/__init__.py +++ b/nmspy/__init__.py @@ -4,4 +4,12 @@ try: __version__ = version("nmspy") except PackageNotFoundError: - __version__ = "0.6.12" + pass + +from nmspy._types import NMSMod # noqa + +import nmspy.data.structs +import nmspy.data + +__pymhf_types__ = nmspy.data.structs +__pymhf_functions__ = nmspy.data.functions diff --git a/nmspy/_internal.py b/nmspy/_internal.py deleted file mode 100644 index c64cee5..0000000 --- a/nmspy/_internal.py +++ /dev/null @@ -1,27 +0,0 @@ -from concurrent.futures import ThreadPoolExecutor - -CWD: str = "" -HANDLE = None -BINARY_HASH: str = "" -BASE_ADDRESS: int = -1 -SIZE_OF_IMAGE: int = -1 -NMS_ROOT_DIR: str = "" - -_executor: ThreadPoolExecutor = None # type: ignore - -class _GameState: - def __init__(self): - self._game_loaded = False - - @property - def game_loaded(self): - return self._game_loaded - - @game_loaded.setter - def game_loaded(self, val: bool): - # The game can become loaded, but it can't become unloaded... - if val is True: - self._game_loaded = val - - -GameState: _GameState = _GameState() diff --git a/nmspy/_internals/mods/globals.py b/nmspy/_internals/mods/globals.py index 038ff68..cfb9d30 100644 --- a/nmspy/_internals/mods/globals.py +++ b/nmspy/_internals/mods/globals.py @@ -2,24 +2,26 @@ import logging +import pymhf.core._internal as _internal +from pymhf.core.memutils import map_struct from nmspy.caching import globals_cache -import nmspy._internal as _internal import nmspy.common as nms import nmspy.data.structs as structs -import nmspy.data.function_hooks as hooks -from nmspy.memutils import map_struct -from nmspy.mod_loader import NMSMod +import nmspy.data.functions.hooks as hooks +from nmspy._types import NMSMod +from pymhf.gui.decorators import no_gui +@no_gui class _INTERNAL_LoadGlobals(NMSMod): __author__ = "monkeyman192" __description__ = "Load globals for internal use" __version__ = "0.1" - @hooks.cTkMetaData.ReadGlobalFromFile[structs.cGcWaterGlobals].after - def read_water_globals(self, lpData: int, lpacFilename: bytes): - logging.info(f"cGcWaterGlobals*: 0x{lpData:X}, filename: {lpacFilename}") - globals_cache.set("GcWaterGlobals", lpData - _internal.BASE_ADDRESS) - nms.GcWaterGlobals = map_struct(lpData, structs.cGcWaterGlobals) + # @hooks.cTkMetaData.ReadGlobalFromFile[structs.cGcWaterGlobals].after + # def read_water_globals(self, lpData: int, lpacFilename: bytes): + # logging.info(f"cGcWaterGlobals*: 0x{lpData:X}, filename: {lpacFilename}") + # globals_cache.set("GcWaterGlobals", lpData - _internal.BASE_ADDRESS) + # nms.GcWaterGlobals = map_struct(lpData, structs.cGcWaterGlobals) # TODO: Add others... diff --git a/nmspy/_internals/mods/singletons.py b/nmspy/_internals/mods/singletons.py index 1de415b..efebedc 100644 --- a/nmspy/_internals/mods/singletons.py +++ b/nmspy/_internals/mods/singletons.py @@ -1,35 +1,28 @@ from logging import getLogger import traceback +from pymhf.core._types import DetourTime +from pymhf.gui.decorators import no_gui +import pymhf.core._internal as _internal +from pymhf.core.hooking import one_shot, hook_manager +from pymhf.core.memutils import map_struct import nmspy.common as nms -import nmspy._internal as _internal import nmspy._internals.staging as staging import nmspy.data.structs as structs -import nmspy.data.function_hooks as hooks -from nmspy.hooking import one_shot, hook_manager -from nmspy.memutils import map_struct -from nmspy.mod_loader import NMSMod +import nmspy.data.functions.hooks as hooks +from nmspy._types import NMSMod from nmspy.states import StateEnum -from nmspy.utils import safe_assign_enum logger = getLogger() +@no_gui class _INTERNAL_LoadSingletons(NMSMod): __author__ = "monkeyman192" __description__ = "Load singletons and other important objects" __version__ = "0.1" - def run_state_change_funcs(self, state): - for func in hook_manager.on_state_change_funcs[state]: - try: - func() - except: - logger.exception(traceback.format_exception()) - # TODO: remove the hook... - pass - @one_shot @hooks.cTkDynamicGravityControl.Construct.before def load_gravity_singleton(self, this): @@ -53,14 +46,17 @@ def cGcRealityManager_initializer(self, this): @hooks.cTkFSMState.StateChange.after def state_change(self, this, lNewStateID, lpUserData, lbForceRestart): + logger.info(f"New State: {lNewStateID}") if lNewStateID == StateEnum.ApplicationGameModeSelectorState.value: curr_gamestate = _internal.GameState.game_loaded _internal.GameState.game_loaded = True if _internal.GameState.game_loaded != curr_gamestate: # Only call this the first time the game loads - _internal._executor.submit(self.run_state_change_funcs, "MODESELECTOR") + hook_manager.call_custom_callbacks("MODESELECTOR", DetourTime.AFTER) + hook_manager.call_custom_callbacks("MODESELECTOR", DetourTime.NONE) else: - _internal._executor.submit(self.run_state_change_funcs, lNewStateID.decode()) + hook_manager.call_custom_callbacks(lNewStateID.decode(), DetourTime.AFTER) + hook_manager.call_custom_callbacks(lNewStateID.decode(), DetourTime.NONE) # @one_shot # @hooks.cGcRealityManager.Construct.after @@ -72,11 +68,12 @@ def state_change(self, this, lNewStateID, lpUserData, lbForceRestart): # except: # logging.info(traceback.format_exc()) - @hooks.cGcApplication.Update - def _main_loop(self, *args): - """ The main application loop. Run any before or after functions here. """ - for func in hook_manager.main_loop_before_funcs: - func() - hooks.cGcApplication.Update.original(*args) - for func in hook_manager.main_loop_after_funcs: - func() + @hooks.cGcApplication.Update.before + def _main_loop_before(self, *args): + """ The main application loop. Run any before functions here. """ + hook_manager.call_custom_callbacks("MAIN_LOOP", DetourTime.BEFORE) + + @hooks.cGcApplication.Update.after + def _main_loop_before(self, *args): + """ The main application loop. Run any after functions here. """ + hook_manager.call_custom_callbacks("MAIN_LOOP", DetourTime.AFTER) diff --git a/nmspy/_scripts/_preinject.py b/nmspy/_scripts/_preinject.py deleted file mode 100644 index 6fac516..0000000 --- a/nmspy/_scripts/_preinject.py +++ /dev/null @@ -1,5 +0,0 @@ -# This file contains a number of things to import or set before anything else is -# injected. -# For the most part this imports stuff so that the import cache is primed so -# that any subsequent imports do not overwrite the existing data. -import nmspy._internal diff --git a/nmspy/_types.py b/nmspy/_types.py index 023eae5..8da6885 100644 --- a/nmspy/_types.py +++ b/nmspy/_types.py @@ -1,70 +1,32 @@ -# Have the NMS* class definitions here since there is a circular dependency -# otherwise. +from typing import Any -from collections import namedtuple -from ctypes import CFUNCTYPE -from _ctypes import CFuncPtr -from typing import Callable, Any, Optional -import inspect -from types import MethodType +from pymhf import Mod -import cyminhook +def _state_change_hook_predicate(value: Any) -> bool: + """ Determine if the object has the _trigger_on_state property. + This will only be methods on NMSMod classes which are decorated with + @on_state_change or on_fully_booted. + """ + return hasattr(value, "_trigger_on_state") -FUNCDEF = namedtuple("FUNCDEF", ["restype", "argtypes"]) +def _main_loop_predicate(value: Any) -> bool: + """ Determine if the objecy has the _is_main_loop_func property. + This will only be methods on Mod classes which are decorated with either + @main_loop.before or @main_loop.after + """ + return getattr(value, "_is_main_loop_func", False) -class NMSHook(cyminhook.MinHook): - original: Callable[..., Any] - target: int - detour: Callable[..., Any] - signature: CFuncPtr - _name: str - _should_enable: bool - _invalid: bool - _pattern: Optional[str] - mod: Any - def __init__(self, - *, - detour: Optional[Callable[..., Any]] = None, - signature: Optional[CFuncPtr] = None, - target: Optional[int] = None - ): - # Normally defined classes will not be "compound compatible". - # This means that they will be the only function to hook a given game - # function. - if detour is not None: - # If detour is provided, then bind it to the detour method of - # ourself. - setattr(self, "detour", MethodType(detour, self)) - self._is_compound_compatible = False - if not hasattr(self, "_should_enable"): - self._should_enable = True - for _, obj in inspect.getmembers(self): - if hasattr(obj, "_before"): - self._before_hook = obj - self._is_compound_compatible = True - elif hasattr(obj, "_after"): - self._after_hook = obj - self._is_compound_compatible = True - # Only initialize the cyminhook subclass if we are not a compound-compatible hook. - # If it's a compound hook then initializing it will cause and error to - # occur. We will initialize the hook properly when we create the actual - # compound hook. - if not self._is_compound_compatible: - super().__init__(signature=signature, target=target) - self.state = "initialized" +def _state_change_hook_predicate(value: Any) -> bool: + """ Determine if the object has the _trigger_on_state property. + This will only be methods on Mod classes which are decorated with + @on_state_change or on_fully_booted. + """ + return hasattr(value, "_trigger_on_state") - def close(self): - super().close() - self.state = "closed" - def enable(self): - if self._should_enable: - super().enable() - self.state = "enabled" - - def disable(self): - super().disable() - self.state = "disabled" \ No newline at end of file +class NMSMod(Mod): + def __init__(self): + super().__init__() diff --git a/nmspy/caching.py b/nmspy/caching.py index cdc7d6a..bdfb1cd 100644 --- a/nmspy/caching.py +++ b/nmspy/caching.py @@ -7,7 +7,7 @@ import os.path as op from typing import Optional -import nmspy._internal as _internal +import pymhf.core._internal as _internal CACHE_DIR = ".cache" diff --git a/nmspy/calling.py b/nmspy/calling.py deleted file mode 100644 index fc22012..0000000 --- a/nmspy/calling.py +++ /dev/null @@ -1,48 +0,0 @@ -from ctypes import CFUNCTYPE -from logging import getLogger -from typing import Optional - -import nmspy._internal as _internal -from nmspy.data.function_call_sigs import FUNC_CALL_SIGS -from nmspy.data import FUNC_OFFSETS -from nmspy._types import FUNCDEF - - -calling_logger = getLogger("CallingManager") - - -def call_function(name: str, *args, overload: Optional[str] = None): - _sig = FUNC_CALL_SIGS[name] - offset = FUNC_OFFSETS[name] - if isinstance(_sig, FUNCDEF): - sig = CFUNCTYPE(_sig.restype, *_sig.argtypes) - else: - # Look up the overload: - if (osig := _sig.get(overload)) is not None: # type: ignore - sig = CFUNCTYPE(osig.restype, *osig.argtypes) - else: - # Need to fallback on something. Raise a warning that no - # overload was defined and that it will fallback to the - # first entry in the dict. - first = list(_sig.items())[0] - calling_logger.warning( - f"No function arguments overload was provided for {name}. " - ) - calling_logger.warning( - f"Falling back to the first overload ({first[0]})") - sig = CFUNCTYPE(first[1].restype, *first[1].argtypes) - if isinstance(offset, dict): - # Handle overloads - if (_offset := offset.get(overload)) is not None: # type: ignore - offset = _offset - else: - _offset = list(offset.items())[0] - calling_logger.warning( - f"No function arguments overload was provided for {name}. " - ) - calling_logger.warning( - f"Falling back to the first overload ({_offset[0]})") - offset = _offset[1] - - cfunc = sig(_internal.BASE_ADDRESS + offset) - return cfunc(*args) diff --git a/nmspy/common.py b/nmspy/common.py index c504f56..160ba42 100644 --- a/nmspy/common.py +++ b/nmspy/common.py @@ -2,7 +2,7 @@ import os import os.path as op -import nmspy._internal as _internal +import pymhf.core._internal as _internal import nmspy.data.structs as nms_structs # Store all the globals here like this so that we may access them easily and @@ -47,6 +47,6 @@ # TODO: Move somewhere else? Not sure where but this doesn't really fit here... executor: ThreadPoolExecutor = None # type: ignore -mod_save_dir = op.join(_internal.NMS_ROOT_DIR, "NMSPY_SAVES") +mod_save_dir = op.join(_internal.GAME_ROOT_DIR, "MOD_SAVES") if not op.exists(mod_save_dir): os.makedirs(mod_save_dir) diff --git a/nmspy/data/__init__.old b/nmspy/data/__init__.old deleted file mode 100644 index a9ac6e8..0000000 --- a/nmspy/data/__init__.old +++ /dev/null @@ -1,23 +0,0 @@ -import importlib -import os -from typing import Union - -from logging import getLogger - -logger = getLogger() - -FUNC_OFFSETS: dict[str, Union[int, dict[str, int]]] - -# Get the binary hash -binary_hash = os.environ.get("NMS_BINARY_HASH") -if binary_hash: - offsets = importlib.import_module(f"nmspy.data.{binary_hash}.function_offsets") - if offsets.__binary_hash__ != binary_hash: - raise ImportError( - f"Binary hash in offsets file ({offsets.__binaryhash__}) doesn't " - f"match the one provided ({binary_hash})" - ) - FUNC_OFFSETS = offsets.FUNC_OFFSETS -else: - logger.info("No binary hash provided... There will be issues...") - FUNC_OFFSETS = {} diff --git a/nmspy/data/__init__.py b/nmspy/data/__init__.py index 7b29d20..9f02099 100644 --- a/nmspy/data/__init__.py +++ b/nmspy/data/__init__.py @@ -1 +1 @@ -from .function_offsets import FUNC_OFFSETS +from .functions.offsets import FUNC_OFFSETS diff --git a/nmspy/data/common.py b/nmspy/data/common.py index d204e58..56cbab5 100644 --- a/nmspy/data/common.py +++ b/nmspy/data/common.py @@ -4,7 +4,8 @@ import types from typing import Any, Union, TypeVar, Generic, Type, Annotated, Generator -from nmspy.data.cpptypes import std +from pymhf.extensions.cpptypes import std +from pymhf.core.memutils import map_struct # from nmspy.hashing import fnv_1a @@ -293,8 +294,6 @@ class cTkDynamicArray(ctypes.Structure, Generic[T]): @property def value(self) -> Any: - from nmspy.memutils import map_struct - if self.array == 0 or self.size == 0: # Empty lists are store with an empty pointer in mem. return [] @@ -528,6 +527,9 @@ def __setitem__(self, key: int, value: bool): def ones(self) -> list[int]: return [i for i in range(self._size) if self[i]] + def __eq__(self, other: "cTkBitArray"): + return self.ones() == other.ones() + def __str__(self): """ A string representation. This will be an "unwrapped" version of how it's actually represented in @@ -575,7 +577,7 @@ def __class_getitem__(cls: Type["cTkListNode"], key: tuple[Type[T1], Type[T2]]): _cls._template_type1 = _type1 _cls._template_type2 = _type2 _cls._fields_ = [ - ("value", std.pair[_type1, _type2]), + ("_value", std.pair[_type1, _type2]), ("hash", ctypes.c_uint64), ("_next", ctypes.c_uint64), ("_prev", ctypes.c_uint64), @@ -583,15 +585,19 @@ def __class_getitem__(cls: Type["cTkListNode"], key: tuple[Type[T1], Type[T2]]): ] return _cls + @property + def value(self): + return (self._value.first, self._value.second) + @property def next(self): - from nmspy.memutils import map_struct - return map_struct(self._next, cTkListNode[self._template_type1, self._template_type2]) + if self._next: + return map_struct(self._next, cTkListNode[self._template_type1, self._template_type2]) @property def prev(self): - from nmspy.memutils import map_struct - return map_struct(self._prev, cTkListNode[self._template_type1, self._template_type2]) + if self._prev: + return map_struct(self._prev, cTkListNode[self._template_type1, self._template_type2]) class cTkLinearHashTable(ctypes.Structure, Generic[T1, T2]): diff --git a/nmspy/data/cpptypes.py b/nmspy/data/cpptypes.py deleted file mode 100644 index 50c3248..0000000 --- a/nmspy/data/cpptypes.py +++ /dev/null @@ -1,216 +0,0 @@ -import ctypes -import types -from typing import Any, TYPE_CHECKING, TypeVar, Generic, Union, Type, Generator - - -if TYPE_CHECKING: - from ctypes import _Pointer - -CTYPES = Union[ctypes._SimpleCData, ctypes.Structure, ctypes._Pointer] - -T = TypeVar("T", bound=CTYPES) -N = TypeVar("N", bound=int) -T1 = TypeVar("T1", bound=CTYPES) -T2 = TypeVar("T2", bound=CTYPES) - -# Good source of info for these: -# # https://devblogs.microsoft.com/search?query=inside+stl&blog=%2Foldnewthing%2F - - - -class _array(ctypes.Structure, Generic[T, N]): - _elements: list[T] - def __class_getitem__(cls: Type["_array"], key: tuple[Type[T], int]): - _type, _size = key - _cls: Type[_array[T, N]] = types.new_class( - f"std::aray<{_type}, {_size}>", (cls,) - ) - _cls._fields_ = [ # type: ignore - ("_elements", _type * _size), - ] - return _cls - - def __len__(self) -> int: - return len(self._elements) - - def __getitem__(self, i: int) -> T: - return self._elements[i] - - def __setitem__(self, i: int, val: T): - self._elements[i] = val - - def __iter__(self) -> Generator[T, None, None]: - for i in range(len(self)): - yield self[i] - -class _vb_val(ctypes.Structure, Generic[T]): - # std::vector > _Myvec; - # unsigned __int64 _Mysize; - pass - - -class _vector(ctypes.Structure, Generic[T]): - _template_type: T - if TYPE_CHECKING: - _first: _Pointer[Any] - _last: _Pointer[Any] - _end: _Pointer[Any] - - def __class_getitem__(cls: Type["_vector"], type_: Type[T]): - _cls: Type[_vector[T]] = types.new_class( - f"std::vector<{type_}>", (cls,) - ) - _cls._template_type = type_ - _cls._fields_ = [ # type: ignore - ("_first", ctypes.POINTER(type_)), - ("_last", ctypes.POINTER(type_)), - ("_end", ctypes.POINTER(type_)), - ] - return _cls - - def __len__(self) -> int: - return ( - ctypes.addressof(self._last.contents) - - ctypes.addressof(self._first.contents) - ) // 0x8 # Assuming 64 bit architecture - - def __getitem__(self, i: int) -> T: - return self._first[i] - - def __iter__(self) -> Generator[T, None, None]: - for i in range(len(self)): - yield self[i] - - def clear(self): - """ - Empty the vector. This will remove all references to the elements. - USE THIS WISELY as is may cause issues. - """ - nullptr = ctypes.POINTER(self._template_type) - self._first = nullptr() - self._last = nullptr() - self._end = nullptr() - - -class _pair(ctypes.Structure, Generic[T1, T2]): - first: T1 - second: T2 - - def __class_getitem__(cls: Type["_pair"], key: tuple[Type[T1], Type[T2]]): - first, second = key - _cls: Type[_pair[T1, T2]] = types.new_class( - f"std::pair<{first}, {second}>", (cls,) - ) - _cls._fields_ = [ # type: ignore - ("first", first), - ("second", second), - ] - return _cls - -# 0x40 long -class _unordered_map(ctypes.Structure, Generic[T1, T2]): - - """ - std::_Hash< - std::_Umap_traits< - TkID<128>, - TkID<256>, - std::_Uhash_compare, - TkIDUnorderedMap::Hash128, - std::equal_to > - >, - TkSTLAllocatorShim< - std::pair const ,TkID<256> > - ,8, - -1> - , - 0> - > - +0x00 std::_Umap_traits,TkID<256>,std::_Uhash_compare,TkIDUnorderedMap::Hash128,std::equal_to > >,TkSTLAllocatorShim const ,TkID<256> >,8,-1>,0> _Traitsobj; - +0x08 std::list< - std::pair< - TkID<128> const , - TkID<256> - >, - TkSTLAllocatorShim< - std::pair< - TkID<128> const , - TkID<256> - >, - 8, - -1 - > - > _List; - +0x18 std::_Hash_vec const ,TkID<256> > > > >,8,-1> > _Vec; - +0x30 unsigned __int64 _Mask; - +0x38 unsigned __int64 _Maxidx; - """ - - pass - - -class _list_node(ctypes.Structure, Generic[T]): - """ - +0x00 std::_List_node const ,TkID<256> >,void *> *_Next; - +0x08 std::_List_node const ,TkID<256> >,void *> *_Prev; - +0x10 std::pair const ,TkID<128> > _Myval; - """ - if TYPE_CHECKING: - _next: _Pointer["_list_node"] - _myval: _pair - -_list_node._fields_ = [ - ("_next", ctypes.POINTER(_list_node)), # TEMP - ("_prev", ctypes.POINTER(_list_node)), # TEMP - ("_myval", _pair) - ] - - -# 0x10 long -class _list(ctypes.Structure, Generic[T]): - """ - +0x00 std::_List_node const ,TkID<256> >,void *> *_Myhead; - +0x08 unsigned __int64 _Mysize; - """ - pass - - -class std: - array = _array - vector = _vector - pair = _pair - list = _list - -# class STD: -# class ARRAY(): -# def __new__(cls, type_: Type[T], size: int) -> std.array: -# _cls = types.new_class( -# f"std::aray<{type_}, {size}>", (std.array,) -# ) -# _cls._fields_ = [ -# ("_elements", type_ * size), -# ] -# return _cls - - - -if __name__ == "__main__": - data = bytearray(b"\x01\x02\x00\x00\x07\x00\x00\x00") - pear = std.pair[ctypes.c_uint32, ctypes.c_int32] - k = pear.from_buffer(data) - print(k.first) - print(k.second) - harry = std.array[ctypes.c_ubyte, 6] - d = harry.from_buffer(data) - for i in d: - print(i) - print('setting') - d[3] = 9 - for i in d: - print(i) - - # print("AAAAAAAAA") - # arry = STD.ARRAY(ctypes.c_ubyte, 6) - # pp = arry.from_buffer(data) - # for i in pp: - # print(i) diff --git a/nmspy/data/engine.py b/nmspy/data/engine.py index a752d44..3739de0 100644 --- a/nmspy/data/engine.py +++ b/nmspy/data/engine.py @@ -5,7 +5,7 @@ import ctypes from typing import Optional -from nmspy.calling import call_function as _call_function +from pymhf.core.calling import call_function from nmspy.data.common import TkHandle, Vector3f, cTkMatrix34 @@ -15,7 +15,7 @@ def GetNodeAbsoluteTransMatrix( ) -> cTkMatrix34: if mat is None: mat = cTkMatrix34() - _call_function( + call_function( "Engine::GetNodeAbsoluteTransMatrix", node.lookupInt, ctypes.addressof(mat) @@ -32,7 +32,7 @@ def GetNodeTransMats( rel_mat = cTkMatrix34() if abs_mat is None: abs_mat = cTkMatrix34() - _call_function( + call_function( "Engine::GetNodeTransMats", node.lookupInt, ctypes.addressof(rel_mat), @@ -43,19 +43,19 @@ def GetNodeTransMats( def RequestRemoveNode(node: TkHandle) -> None: - _call_function("Engine::RequestRemoveNode", node.lookupInt) + call_function("Engine::RequestRemoveNode", node.lookupInt) def SetNodeActivation(node: TkHandle, active: bool): - _call_function("Engine::SetNodeActivation", node.lookupInt, active) + call_function("Engine::SetNodeActivation", node.lookupInt, active) def SetNodeActivationRecursive(node: TkHandle, active: bool): - _call_function("Engine::SetNodeActivationRecursive", node.lookupInt, active) + call_function("Engine::SetNodeActivationRecursive", node.lookupInt, active) def SetNodeTransMat(node: TkHandle, mat: cTkMatrix34): - _call_function( + call_function( "Engine::SetNodeTransMat", node.lookupInt, ctypes.addressof(mat), @@ -63,7 +63,7 @@ def SetNodeTransMat(node: TkHandle, mat: cTkMatrix34): def ShiftAllTransformsForNode(node: TkHandle, shift: Vector3f) -> None: - return _call_function( + return call_function( "Engine::ShiftAllTransformsForNode", node.lookupInt, ctypes.addressof(shift) diff --git a/nmspy/data/functions/__init__.py b/nmspy/data/functions/__init__.py new file mode 100644 index 0000000..7afa051 --- /dev/null +++ b/nmspy/data/functions/__init__.py @@ -0,0 +1,2 @@ +from .offsets import FUNC_OFFSETS # noqa +from .call_sigs import FUNC_CALL_SIGS # noqa diff --git a/nmspy/data/function_call_sigs.py b/nmspy/data/functions/call_sigs.py similarity index 99% rename from nmspy/data/function_call_sigs.py rename to nmspy/data/functions/call_sigs.py index 99bc76f..58d597f 100644 --- a/nmspy/data/function_call_sigs.py +++ b/nmspy/data/functions/call_sigs.py @@ -2,8 +2,9 @@ import ctypes.wintypes as wintypes from typing import Union -from nmspy._types import FUNCDEF +from pymhf.core._types import FUNCDEF import nmspy.data.common as common +import nmspy.data.structs as structs FUNC_CALL_SIGS: dict[str, Union[FUNCDEF, dict[str, FUNCDEF]]] = { "packed_store_active___un_3C_unf_3E_vyf": FUNCDEF( @@ -96833,6 +96834,10 @@ ctypes.c_ubyte, # bool ] ), + "cTkMetadataCache::GetInstance": FUNCDEF( + restype=ctypes.c_ulonglong, + argtypes=[], + ), "cTkMetadataCache::Load": FUNCDEF( restype=ctypes.c_ulonglong, # cGcBiomeListPerStarType * argtypes=[ @@ -291845,7 +291850,7 @@ "cTkFileSystem::Construct": FUNCDEF( restype=None, # void argtypes=[ - ctypes.c_ulonglong, # cTkFileSystem * + ctypes.POINTER(structs.cTkFileSystem), # cTkFileSystem * ctypes.c_int32, # int ] ), diff --git a/nmspy/data/function_hooks.py b/nmspy/data/functions/hooks.py similarity index 99% rename from nmspy/data/function_hooks.py rename to nmspy/data/functions/hooks.py index 00b7bd3..f41f79d 100644 --- a/nmspy/data/function_hooks.py +++ b/nmspy/data/functions/hooks.py @@ -1,5 +1,5 @@ -from nmspy.hooking import HookFactory -import nmspy.data.function_overloads as overloads +from pymhf.core.hooking import HookFactory +import nmspy.data.functions.overloads as overloads class packed_store_active___un_3C_unf_3E_vyf(HookFactory): _name = "packed_store_active___un_3C_unf_3E_vyf" diff --git a/nmspy/data/function_offsets.py b/nmspy/data/functions/offsets.py similarity index 99% rename from nmspy/data/function_offsets.py rename to nmspy/data/functions/offsets.py index 600187d..5030e44 100644 --- a/nmspy/data/function_offsets.py +++ b/nmspy/data/functions/offsets.py @@ -13812,6 +13812,7 @@ "cTkMetadataCache::Load": 0x11BA160, "cTkMetadataCache::Load": 0x11BA5B0, "cTkMetadataCache::Load": 0x11BAA00, + "cTkMetadataCache::GetInstance": 0x2813170, "cTkMetadataCache::Load": 0x11BAE50, "cTkMetadataCache::GetInstance": 0x11BB2A0, "cTkMetadataCache::Load": 0x11BB380, diff --git a/nmspy/data/function_overloads.py b/nmspy/data/functions/overloads.py similarity index 100% rename from nmspy/data/function_overloads.py rename to nmspy/data/functions/overloads.py diff --git a/nmspy/data/structs.py b/nmspy/data/structs.py index 32db76e..a84ca4d 100644 --- a/nmspy/data/structs.py +++ b/nmspy/data/structs.py @@ -12,13 +12,11 @@ import ctypes.wintypes from nmspy.data import common, enums as nms_enums -from nmspy.calling import call_function +from pymhf.core.calling import call_function # from nmspy.data.types import core, simple -from nmspy.data.cpptypes import std -from nmspy.memutils import map_struct -from nmspy.utils import safe_assign_enum - - +from pymhf.extensions.cpptypes import std +from pymhf.core.memutils import map_struct +from pymhf.core.utils import safe_assign_enum class cTkMetaDataXMLFunctionLookup(ctypes.Structure): @@ -6968,6 +6966,319 @@ class SaveThreadData(ctypes.Structure): ] +class cGcPlayerSpawnStateData(ctypes.Structure): + playerPositionInSystem: common.Vector4f + playerTransformAt: common.Vector4f + playerDeathRespawnPositionInSystem: common.Vector4f + playerDeathRespawnTransformAt: common.Vector4f + shipPositionInSystem: common.Vector4f + shipTransformAt: common.Vector4f + _meLastKnownPlayerState: ctypes.c_int32 + freighterPositionInSystem: common.Vector4f + freighterTransformAt: common.Vector4f + freighterTransformUp: common.Vector4f + abandonedFreighterPositionInSystem: common.Vector4f + abandonedFreighterTransformAt: common.Vector4f + abandonedFreighterTransformUp: common.Vector4f + +cGcPlayerSpawnStateData._fields_ = [ + ("playerPositionInSystem", common.Vector4f), + ("playerTransformAt", common.Vector4f), + ("playerDeathRespawnPositionInSystem", common.Vector4f), + ("playerDeathRespawnTransformAt", common.Vector4f), + ("shipPositionInSystem", common.Vector4f), + ("shipTransformAt", common.Vector4f), + ("_meLastKnownPlayerState", ctypes.c_int32), + ("padding0x64", ctypes.c_ubyte * 0xC), + ("freighterPositionInSystem", common.Vector4f), + ("freighterTransformAt", common.Vector4f), + ("freighterTransformUp", common.Vector4f), + ("abandonedFreighterPositionInSystem", common.Vector4f), + ("abandonedFreighterTransformAt", common.Vector4f), + ("abandonedFreighterTransformUp", common.Vector4f), +] + + +class cGcNetworkBufferHash_vtbl(ctypes.Structure): + cGcNetworkBufferHash_dtor_0: bytes + GetHashValue: bytes + GetHashTimestamp: bytes + GenerateHashValue: bytes + OnHashOffsetChanged: bytes + +cGcNetworkBufferHash_vtbl._fields_ = [ + ("cGcNetworkBufferHash_dtor_0", ctypes.c_ubyte * 0x8), + ("GetHashValue", ctypes.c_ubyte * 0x8), + ("GetHashTimestamp", ctypes.c_ubyte * 0x8), + ("GenerateHashValue", ctypes.c_ubyte * 0x8), + ("OnHashOffsetChanged", ctypes.c_ubyte * 0x8), +] + + +class sHashValue(ctypes.Structure): + hashValue: int + timeStamp: int + +sHashValue._fields_ = [ + ("hashValue", ctypes.c_uint16), + ("timeStamp", ctypes.c_int16), +] + + +class cGcNetworkBufferHash(ctypes.Structure): + __vftable: _Pointer["cGcNetworkBufferHash_vtbl"] + kiChunkSize: int + chunkHashOffset: int + chunkHashValues: std.vector[sHashValue] + timestamp: int + initialised: bool + +cGcNetworkBufferHash._fields_ = [ + ("__vftable", ctypes.POINTER(cGcNetworkBufferHash_vtbl)), + ("kiChunkSize", ctypes.c_int32), + ("chunkHashOffset", ctypes.c_int32), + ("chunkHashValues", std.vector[sHashValue]), + ("timestamp", ctypes.c_uint64), + ("initialised", ctypes.c_ubyte), + ("_padding", ctypes.c_ubyte * 0x7) +] + + +class cGcNetworkSynchronisedBuffer(cGcNetworkBufferHash, ctypes.Structure): + pass + +cGcNetworkSynchronisedBuffer._fields_ = [] + + +class cGcBaseBuildingPersistentBuffer(cGcNetworkSynchronisedBuffer, ctypes.Structure): + class BaseBuildingPersistentData(ctypes.Structure): + pass + + baseBuildingObjects: std.vector[cGcBaseBuildingPersistentBuffer.BaseBuildingPersistentData] + currentPlanetObjects: bytes + currentAddress: int + debugPositions: bool + networkOwnerId: bytes + bufferIndex: int + +cGcBaseBuildingPersistentBuffer._fields_ = [ + ("baseBuildingObjects", std.vector[cGcBaseBuildingPersistentBuffer.BaseBuildingPersistentData]), + ("currentPlanetObjects", ctypes.c_ubyte * 0x40), + ("currentAddress", ctypes.c_uint64), + ("debugPositions", ctypes.c_ubyte), + ("networkOwnerId", ctypes.c_ubyte * 0x40), + ("padding0xD9", ctypes.c_ubyte * 0x3), + ("bufferIndex", ctypes.c_uint32), +] + + +class cGcBaseBuildingGlobalBuffer(ctypes.Structure): + persistentBuffers: list[cGcBaseBuildingPersistentBuffer] + +cGcBaseBuildingGlobalBuffer._fields_ = [ + ("persistentBuffers", cGcBaseBuildingPersistentBuffer * 0x20), +] + + +class cGcPersistentInteractionsManager(ctypes.Structure): + baseBuildingBuffer: "cGcBaseBuildingGlobalBuffer" + persistentBaseBuffers: std.vector[cGcPlayerBasePersistentBuffer] + distressSignalBuffer: "cGcPersistentInteractionBuffer" + crateBuffer: "cGcPersistentInteractionBuffer" + destructableBuffer: "cGcPersistentInteractionBuffer" + costBuffer: "cGcPersistentInteractionBuffer" + buildingBuffer: "cGcPersistentInteractionBuffer" + creatureBuffer: "cGcPersistentInteractionBuffer" + personalBuffer: "cGcPersistentInteractionBuffer" + fireteamSyncBuffer: "cGcPersistentInteractionBuffer" + terrainEditBuffer: "cGcTerrainEditsPersistentBuffer" + tradingSupplyBuffer: "cGcTradingSupplyBuffer" + maintenanceBuffer: "cGcMaintenanceBuffer" + personalMaintenanceBuffer: "cGcMaintenanceBuffer" + visitedSystemsBuffer: "cGcVisitedSystemsBuffer" + +cGcPersistentInteractionsManager._fields_ = [ + ("baseBuildingBuffer", cGcBaseBuildingGlobalBuffer), + ("persistentBaseBuffers", std.vector[cGcPlayerBasePersistentBuffer]), + ("padding0x1C18", ctypes.c_ubyte * 0x8), + ("distressSignalBuffer", cGcPersistentInteractionBuffer), + ("crateBuffer", cGcPersistentInteractionBuffer), + ("destructableBuffer", cGcPersistentInteractionBuffer), + ("costBuffer", cGcPersistentInteractionBuffer), + ("buildingBuffer", cGcPersistentInteractionBuffer), + ("creatureBuffer", cGcPersistentInteractionBuffer), + ("personalBuffer", cGcPersistentInteractionBuffer), + ("fireteamSyncBuffer", cGcPersistentInteractionBuffer), + ("terrainEditBuffer", cGcTerrainEditsPersistentBuffer), + ("tradingSupplyBuffer", cGcTradingSupplyBuffer), + ("maintenanceBuffer", cGcMaintenanceBuffer), + ("personalMaintenanceBuffer", cGcMaintenanceBuffer), + ("visitedSystemsBuffer", cGcVisitedSystemsBuffer), +] + + +class cGcGameState(ctypes.Structure): + class SaveThreadData(ctypes.Structure): + gameState: _Pointer["cGcGameState"] + _meSaveReason: ctypes.c_int32 + showMessage: bool + fullSave: bool + playerStateDataToSave: bytes # cGcPlayerStateData + saveMaskFlagsToRemove: int + + SaveThreadData._fields_ = [ + ("gameState", ctypes.c_uint64), + ("_meSaveReason", ctypes.c_int32), + ("showMessage", ctypes.c_ubyte), + ("fullSave", ctypes.c_ubyte), + ("padding0xE", ctypes.c_ubyte * 0x2), + ("playerStateDataToSave", ctypes.c_ubyte * 496960), + ("saveMaskFlagsToRemove", ctypes.c_uint32), + ] + + RRIT: _Pointer[ctypes.c_uint32] + RRCE: _Pointer[ctypes.c_uint32] + RRBB: _Pointer[ctypes.c_uint32] + gameStateGroupNode: common.TkHandle + playerState: "cGcPlayerState" + savedSpawnState: "cGcPlayerSpawnStateData" + playerShipOwnership: "cGcPlayerShipOwnership" + playerVehicleOwnership: "cGcPlayerVehicleOwnership" + playerCreatureOwnership: "cGcPlayerCreatureOwnership" + playerMultitoolOwnership: "cGcPlayerMultitoolOwnership" + playerFreighterOwnership: bytes + playerFleetManager: bytes + playerSquadronOwnership: "cGcPlayerSquadronOwnership" + gameKnowledge: "cGcGameKnowledge" + # discoveryManager: "cGcDiscoveryManager" + # wonderManager: "cGcWonderManager" + # graveManager: "cGcGraveManager" + # msgBeaconManager: "cGcMsgBeaconManager" + # playerDiscoveryHelper: "cGcPlayerDiscoveryHelper" + # statsManager: "cGcStatsManager" + # telemetryManager: "cGcTelemetryManager" + # userSettings: "cGcUserSettings" + # userSeenItemsState: "cGcUserSeenItemsState" + # difficultySettings: "cGcDifficultySettings" + # mPMissionTracker: "cGcMPMissionTracker" + # entitlementManager: "cGcEntitlementManager" + # planetMappingManager: "cGcPlanetMappingManager" + # settlementStateManager: "cGcSettlementStateManager" + # saveStateDisplayTime: float + # _meSaveStateLastResult: ctypes.c_int32 + # lastSaveOperationTimestamp: int + # restoreRequested: bool + # _meRestoreType: ctypes.c_int32 + # savedInteractionsManager: "cGcPersistentInteractionsManager" # 1906688 + # pendingProgressWrite: bool + # delayedMicroSave: bool + # pendingDifficultySave: bool + # restartAllInactiveSeasonalMissions: bool + # _mePatchVersion: ctypes.c_int32 + # patchAffectsLoading: bool + # warpTunnelRes: common.cTkSmartResHandle + # teleportTunnelRes: common.cTkSmartResHandle + # blackHoleTunnelRes: common.cTkSmartResHandle + # portalTunnelRes: common.cTkSmartResHandle + # placeMarkerRes: common.cTkSmartResHandle + # inventoryStoreBalance: "cGcInventoryStoreBalance" + # playerRichPresence: "cGcRichPresence" + # singleMultiPositionInSync: bool + # saveCompletedThisFrame: bool + # startedSaveTime: float + # saveThreadData: _Pointer["cGcGameState.SaveThreadData"] + # saveThreadId: int + # saveRequestNewEvent: int + # saveThreadExitedEvent: int + # saveThreadRequestExit: bool + # pendingAsyncSaveRequest: bool + # _mePendingAsyncSaveRequestReason: ctypes.c_int32 + # pendingAsyncSaveRequestShowMessage: bool + # upgradeMessageFilterTimer: float + # networkClientLoad: bool + # lastDeathTriggeredSlotSelect: bool + # waitingForSeasonalGameMode: bool + # cloudSaveManager: bytes + +cGcGameState._fields_ = [ + ("RRIT", ctypes.POINTER(ctypes.c_int32)), + ("RRCE", ctypes.POINTER(ctypes.c_int32)), + ("RRBB", ctypes.POINTER(ctypes.c_int32)), + ("gameStateGroupNode", common.TkHandle), + ("padding0x1C", ctypes.c_ubyte * 0x4), + ("playerState", cGcPlayerState), + ("savedSpawnState", cGcPlayerSpawnStateData), + ("playerShipOwnership", cGcPlayerShipOwnership), + ("playerVehicleOwnership", cGcPlayerVehicleOwnership), + ("playerCreatureOwnership", cGcPlayerCreatureOwnership), + ("playerMultitoolOwnership", cGcPlayerMultitoolOwnership), + ("playerFreighterOwnership", ctypes.c_ubyte * 0x3940), + ("playerFleetManager", ctypes.c_ubyte * 0x10F00), + ("playerSquadronOwnership", cGcPlayerSquadronOwnership), + ("gameKnowledge", cGcGameKnowledge), + # ("discoveryManager", cGcDiscoveryManager), + # ("padding0x1977D8", ctypes.c_ubyte * 0x8), + # ("wonderManager", cGcWonderManager), + # ("graveManager", cGcGraveManager), + # ("msgBeaconManager", cGcMsgBeaconManager), + # ("playerDiscoveryHelper", cGcPlayerDiscoveryHelper), + # ("statsManager", cGcStatsManager), + # ("telemetryManager", cGcTelemetryManager), + # ("padding0x1CCE08", ctypes.c_ubyte * 0x8), + # ("userSettings", cGcUserSettings), + # ("userSeenItemsState", cGcUserSeenItemsState), + # ("difficultySettings", cGcDifficultySettings), + # ("mPMissionTracker", cGcMPMissionTracker), + # ("entitlementManager", cGcEntitlementManager), + # ("planetMappingManager", cGcPlanetMappingManager), + # ("settlementStateManager", cGcSettlementStateManager), + # ("saveStateDisplayTime", ctypes.c_float), + # ("_meSaveStateLastResult", ctypes.c_int32), + # ("lastSaveOperationTimestamp", ctypes.c_uint64), + # ("restoreRequested", ctypes.c_ubyte), + # ("padding0x1D17F1", ctypes.c_ubyte * 0x3), + # ("_meRestoreType", ctypes.c_int32), + # ("padding0x1D17F8", ctypes.c_ubyte * 0x8), + # ("savedInteractionsManager", cGcPersistentInteractionsManager), + # ("pendingProgressWrite", ctypes.c_ubyte), + # ("delayedMicroSave", ctypes.c_ubyte), + # ("pendingDifficultySave", ctypes.c_ubyte), + # ("restartAllInactiveSeasonalMissions", ctypes.c_ubyte), + # ("_mePatchVersion", ctypes.c_int32), + # ("patchAffectsLoading", ctypes.c_ubyte), + # ("padding0x43C209", ctypes.c_ubyte * 0x3), + # ("warpTunnelRes", common.cTkSmartResHandle), + # ("teleportTunnelRes", common.cTkSmartResHandle), + # ("blackHoleTunnelRes", common.cTkSmartResHandle), + # ("portalTunnelRes", common.cTkSmartResHandle), + # ("placeMarkerRes", common.cTkSmartResHandle), + # ("inventoryStoreBalance", cGcInventoryStoreBalance), + # ("padding0x43C234", ctypes.c_ubyte * 0x4), + # ("playerRichPresence", cGcRichPresence), + # ("singleMultiPositionInSync", ctypes.c_ubyte), + # ("saveCompletedThisFrame", ctypes.c_ubyte), + # ("padding0x43C24A", ctypes.c_ubyte * 0x2), + # ("startedSaveTime", ctypes.c_float), + # ("saveThreadData", ctypes.POINTER(cGcGameState.SaveThreadData)), + # ("saveThreadId", ctypes.c_uint32), + # ("padding0x43C25C", ctypes.c_ubyte * 0x4), + # ("saveRequestNewEvent", ctypes.c_void_p), + # ("saveThreadExitedEvent", ctypes.c_void_p), + # ("saveThreadRequestExit", ctypes.c_ubyte), + # ("pendingAsyncSaveRequest", ctypes.c_ubyte), + # ("padding0x43C272", ctypes.c_ubyte * 0x2), + # ("_mePendingAsyncSaveRequestReason", ctypes.c_int32), + # ("pendingAsyncSaveRequestShowMessage", ctypes.c_ubyte), + # ("padding0x43C279", ctypes.c_ubyte * 0x3), + # ("upgradeMessageFilterTimer", ctypes.c_float), + # ("networkClientLoad", ctypes.c_ubyte), + # ("lastDeathTriggeredSlotSelect", ctypes.c_ubyte), + # ("waitingForSeasonalGameMode", ctypes.c_ubyte), + # ("padding0x43C283", ctypes.c_ubyte * 0x5), + # ("cloudSaveManager", ctypes.c_ubyte * 0x2D8), +] + + class cGcApplication(cTkFSM, ctypes.Structure): """The Main Application structure""" class Data(ctypes.Structure): diff --git a/nmspy/data/types/core.py b/nmspy/data/types/core.py deleted file mode 100644 index 99ad389..0000000 --- a/nmspy/data/types/core.py +++ /dev/null @@ -1,17 +0,0 @@ -# A collection of core internal types which, whilst you could consider common, -# They are specific to NMS, rather than beings "common" types such as colours. - -import ctypes - -class cTkSmartResHandle(ctypes.Structure): - _fields_ = [ - ("InternalHandle", ctypes.c_int32), - ] - InternalHandle: int - - -class TkHandle(ctypes.Structure): - _fields_ = [ - ("InternalHandle", ctypes.c_int32), - ] - InternalHandle: int diff --git a/nmspy/data/types/simple.py b/nmspy/data/types/simple.py deleted file mode 100644 index 7874716..0000000 --- a/nmspy/data/types/simple.py +++ /dev/null @@ -1,15 +0,0 @@ -# A collection of types which are "simple" -# They have no dependencies on other types but are still compound types with -# multiple fields. - -import ctypes - -class TkAudioID(ctypes.Structure): - _fields_ = [ - ("mpacName", ctypes.c_char_p), - ("muID", ctypes.c_uint32), - ("mbValid", ctypes.c_ubyte), - ] - mpacName: bytes - muID: int - mbValid: bool diff --git a/nmspy/decorators.py b/nmspy/decorators.py new file mode 100644 index 0000000..02f2623 --- /dev/null +++ b/nmspy/decorators.py @@ -0,0 +1,33 @@ +from pymhf.core._types import DetourTime + + +class main_loop: + @staticmethod + def before(func): + func._custom_trigger = "MAIN_LOOP" + func._hook_time = DetourTime.BEFORE + return func + + @staticmethod + def after(func): + func._custom_trigger = "MAIN_LOOP" + func._hook_time = DetourTime.AFTER + return func + + +def on_fully_booted(func): + """ + Configure the decorated function to be run once the game is considered + "fully booted". + This occurs when the games' internal state first changes to "mode selector" + (ie. just before the game mode selection screen appears). + """ + func._custom_trigger = "MODESELECTOR" + return func + + +def on_state_change(state): + def _inner(func): + func._custom_trigger = state + return func + return _inner \ No newline at end of file diff --git a/nmspy/errors.py b/nmspy/errors.py deleted file mode 100644 index e0d4929..0000000 --- a/nmspy/errors.py +++ /dev/null @@ -1,12 +0,0 @@ -class UnknownFunctionError(Exception): - pass - - -class HookError(Exception): - def __init__(self, status, *args, **kwargs): - super().__init__(*args, **kwargs) - self.status = status - - -class NoSaveError(Exception): - pass diff --git a/nmspy/extractors/metaclasses.py b/nmspy/extractors/metaclasses.py deleted file mode 100644 index 37c8c2c..0000000 --- a/nmspy/extractors/metaclasses.py +++ /dev/null @@ -1,52 +0,0 @@ -import nmspy.data.structs as nms_structs -from nmspy.memutils import map_struct - - -def extract_members(meta: nms_structs.cTkMetaDataClass, metadata_registry: dict): - member_data = [] - for member in meta.members: - member_data.append( - { - "name": member.name, - "type": member.type.name, - "innerType": member.innerType.name, - "size": member.size, - "count": member.count, - "offset": member.offset, - } - ) - if member.innerType == nms_structs.cTkMetaDataMember.eType.Class: - innerTypeClass = map_struct(member.classMetadata, nms_structs.cTkMetaDataClass) - member_data[-1]["innerType"] = innerTypeClass.name - if member.type == nms_structs.cTkMetaDataMember.eType.Class: - typeClass = map_struct(member.classMetadata, nms_structs.cTkMetaDataClass) - member_data[-1]["type"] = typeClass.name - if member.numEnumMembers != 0: - member_data[-1]["enumLookup"] = [(x.name, x.value) for x in member.enumLookup] - member_data[-1]["_enum_offset"] = member._enumLookup - metadata_registry[meta.name] = { - "nameHash": meta.nameHash, - "templateHash": meta.templateHash, - "members": member_data, - "is_enum": len(member_data) == 1 and "_enum_offset" in member_data[0] - } - - -def fixup_metadata_enums(metadata_registry: dict): - """ Loop over the metadata registry twice. First time collating all the - enum types, and then second time replacing all the in-line enums with types. - """ - enums = {} - # Create enum mapping. - for name, data in metadata_registry.items(): - if data.get("is_enum", False): - enums[data["members"][0]["_enum_offset"]] = name - # Now, go over the non-enum metaclasses and loop over their members. - # Any member which has an enum offset in the above mapping will be replaced. - for metaclass in metadata_registry.values(): - if not metaclass.get("is_enum", False): - for member in metaclass["members"]: - if enum_name := enums.get(member.get("_enum_offset")): - del member["_enum_offset"] - del member["enumLookup"] - member["enumType"] = enum_name diff --git a/nmspy/hooking.py b/nmspy/hooking.py deleted file mode 100644 index ac0c3a3..0000000 --- a/nmspy/hooking.py +++ /dev/null @@ -1,723 +0,0 @@ -import ast -from collections.abc import Callable -from ctypes import CFUNCTYPE -from _ctypes import CFuncPtr -from enum import Enum -from functools import wraps, update_wrapper, partial -import inspect -import logging -from typing import Any, Optional, Type, Union -import traceback - -import cyminhook - -import nmspy._internal as _internal -from nmspy.data import FUNC_OFFSETS -from nmspy.data.function_call_sigs import FUNC_CALL_SIGS -from nmspy.errors import UnknownFunctionError -from nmspy.memutils import find_bytes -from nmspy._types import NMSHook, FUNCDEF -from nmspy.caching import function_cache, pattern_cache -from nmspy.states import StateEnum - -hook_logger = logging.getLogger("HookManager") - - -# Currently unused, but can maybe figure out how to utilise it. -# It currently doesn't work I think because we subclass from the cyminhook class -# which is cdef'd, and I guess ast falls over trying to get the actual source... -# Can possible use annotations and inspect the return type (return `None` -# explictly eg.) to give some hints. Maybe just raise warnings etc. -def _detour_is_valid(f): - for node in ast.walk(ast.parse(inspect.getsource(f))): - if isinstance(node, ast.Return): - return True - return False - - -class DetourTime(Enum): - NONE = 0 - BEFORE = 1 - AFTER = 2 - - -ORIGINAL_MAPPING = dict() - - -class _NMSHook(cyminhook.MinHook): - original: Callable[..., Any] - target: int - detour: Callable[..., Any] - signature: CFuncPtr - _name: str - _should_enable: bool - _invalid: bool = False - _call_func: Optional[FUNCDEF] - _is_one_shot: bool = False - - def __init__( - self, - detour: Callable[..., Any], - *, - name: Optional[str] = None, - offset: Optional[int] = None, - call_func: Optional[FUNCDEF] = None, - detour_time: DetourTime = DetourTime.NONE, - overload: Optional[str] = None, - should_enable: bool = True, - ): - self._mod = None - self._should_enable = should_enable - self._offset = offset - self._call_func = call_func - self._original_detour = detour - self.detour_time = detour_time - self.overload = overload - self.state = None - if name is not None: - self._name = name - else: - raise ValueError( - "name cannot be none. This should only happen if this class was" - " instantiated manually which you should NOT be doing." - ) - self._initialised = False - if self._should_enable: - self._init() - - def _init(self): - """ Actually initialise all the data. This is defined separately so that - any function which is marked with @disable doesn't get initialised. """ - if not self._offset and not self._call_func: - _offset = FUNC_OFFSETS.get(self._name) - if _offset is not None: - if isinstance(_offset, int): - self.target = _internal.BASE_ADDRESS + _offset - else: - # This is an overload - if self.overload is not None: - self.target = _internal.BASE_ADDRESS + _offset[self.overload] - else: - # Need to fallback on something. Raise a warning that no - # overload was defined and that it will fallback to the - # first entry in the dict. - first = list(_offset.items())[0] - hook_logger.warning( - f"No overload was provided for {self._name}. " - ) - hook_logger.warning( - f"Falling back to the first overload ({first[0]})") - self.target = _internal.BASE_ADDRESS + first[1] - else: - hook_logger.error(f"{self._name} has no known address (base: 0x{_internal.BASE_ADDRESS:X})") - self._invalid = True - else: - # This is a "manual" hook, insofar as the offset and function - # argument info is all provided manually. - if not self._offset and self._call_func: - raise ValueError("Both offset and call_func MUST be provided if defining hooks manually") - self.target = _internal.BASE_ADDRESS + self._offset - self.signature = CFUNCTYPE(self._call_func.restype, *self._call_func.argtypes) - self._initialised = True - return - if self._name in FUNC_CALL_SIGS: - sig = FUNC_CALL_SIGS[self._name] - if isinstance(sig, FUNCDEF): - self.signature = CFUNCTYPE(sig.restype, *sig.argtypes) - hook_logger.debug(f"Function {self._name} return type: {sig.restype} args: {sig.argtypes}") - if self.overload is not None: - hook_logger.warning( - f"An overload was provided for {self._name} but no overloaded" - " function definitions exist. This function may fail." - ) - else: - # Look up the overload: - if (osig := sig.get(self.overload)) is not None: # type: ignore - self.signature = CFUNCTYPE(osig.restype, *osig.argtypes) - hook_logger.debug(f"Function {self._name} return type: {osig.restype} args: {osig.argtypes}") - else: - # Need to fallback on something. Raise a warning that no - # overload was defined and that it will fallback to the - # first entry in the dict. - first = list(sig.items())[0] - hook_logger.warning( - f"No function arguments overload was provided for {self._name}. " - ) - hook_logger.warning( - f"Falling back to the first overload ({first[0]})") - self.signature = CFUNCTYPE(first[1].restype, *first[1].argtypes) - else: - hook_logger.error(f"{self._name} has no known call signature") - self._invalid = True - self._initialised = True - - def bind(self, cls = None) -> bool: - """ Actually initialise the base class. Returns whether the hook is bound. """ - # Associate the mod now whether or not we get one. This way if the - # function is actually disabled, we may enable it later with no issues. - self._mod = cls - if not self._should_enable: - return False - - if cls is not None: - self._detour_func = partial(self._original_detour, cls) - else: - self._detour_func = self._original_detour - - # Check the detour time and create an approriately "wrapped" function - # which will run instead of the original function. - if self.detour_time == DetourTime.BEFORE: - self.detour = self._before_detour - elif self.detour_time == DetourTime.AFTER: - # For an "after" hook, we need to determine if "_result_" is in the - # function arguments. - func_sig = inspect.signature(self._original_detour) - if "_result_" in func_sig.parameters.keys(): - self.detour = self._after_detour_with_return - else: - self.detour = self._after_detour - else: - self.detour = self._normal_detour - - # Check to see if it's a one shot and wrap this detour one more to be - # one-shot. - if self._is_one_shot: - self._non_oneshot_detour = self.detour - self.detour = self._oneshot_detour - - try: - super().__init__(signature=self.signature, target=self.target) - except cyminhook._cyminhook.Error as e: # type: ignore - if e.status == cyminhook._cyminhook.Status.MH_ERROR_ALREADY_CREATED: - # In this case, we'll get the already created hook, and add it - # to a list so that we can construct a compound hook. - hook_logger.info("ALREADY CREATED!!!") - hook_logger.error(f"Failed to initialize hook {self._name}") - hook_logger.error(e) - hook_logger.error(e.status.name[3:].replace("_", " ")) - self.state = "failed" - return False - ORIGINAL_MAPPING[self._name] = self.original - self.state = "initialized" - if not hasattr(self, "_should_enable"): - self._should_enable = True - return True - - def __call__(self, *args, **kwargs): - return self.detour(*args, **kwargs) - - def __get__(self, instance, owner=None): - # Pass the instance through to the __call__ function so that we can use - # this decorator on a method of a class. - return partial(self.__call__, instance) - - def _oneshot_detour(self, *args): - ret = self._non_oneshot_detour(*args) - self.disable() - hook_logger.debug(f"Disabling a one-shot hook ({self._name})") - return ret - - def _normal_detour(self, *args): - # Run a detour as provided by the user. - return self._detour_func(*args) - - def _before_detour(self, *args): - # A detour to be run instead of the normal one when we are registered as - # a "before" hook. - ret = self._detour_func(*args) - # If we get a return value that is not None, then pass it through. - if ret is not None: - return self.original(*ret) - else: - return self.original(*args) - - def _after_detour(self, *args): - # Detour which will be run instead of the normal one. - # The original function will be run before. - ret = self.original(*args) - self._detour_func(*args) - return ret - - def _after_detour_with_return(self, *args): - # Detour which will be run instead of the normal one. - # The original function will be run before. - # The return value of the original function will be passed in. - ret = self.original(*args) - new_ret = self._detour_func(*args, _result_=ret) - if new_ret is not None: - return new_ret - return ret - - def close(self): - super().close() - self.state = "closed" - - def enable(self): - super().enable() - self.state = "enabled" - self._should_enable = True - - def disable(self): - if self.state == "enabled": - super().disable() - self.state = "disabled" - - @property - def offset(self): - return self.target - _internal.BASE_ADDRESS - - -class HookFactory: - _name: str - _templates: Optional[tuple[str]] = None - _overload: Optional[str] = None - - def __init__(self, func): - self.func = func - self._after = False - self._before = False - update_wrapper(self, func) - - def __new__( - cls, - detour: Callable[..., Any], - detour_time: DetourTime = DetourTime.NONE - ) -> _NMSHook: - should_enable = getattr(detour, "_should_enable", True) - return _NMSHook( - detour, - name=cls._name, - detour_time=detour_time, - overload=cls._overload, - should_enable=should_enable, - ) - - def __class_getitem__(cls: type["HookFactory"], key: Union[tuple[Any], Any]): - """ Create a new instance of the class with the data in the _templates - field used to generate the correct _name property based on the type - pass in to the __getitem__ lookup.""" - if cls._templates is not None and cls._name is not None: - if isinstance(key, tuple): - fmt_key = dict(zip(cls._templates, [x.__name__ for x in key])) - else: - fmt_key = {cls._templates[0]: key.__name__} - cls._name = cls._name.format(**fmt_key) - return cls - - @classmethod - def overload(cls, overload_args): - # TODO: Improve type hinting and possible make this have a generic arg - # arg type to simplify the logic... - raise NotImplementedError - - @classmethod - def original(cls, *args): - """ Call the original function with the given arguments. """ - return ORIGINAL_MAPPING[cls._name](*args) - - @classmethod - def before(cls, detour: Callable[..., Any]) -> _NMSHook: - """ - Run the decorated function before the original function is run. - This function in general should not call the original function, and does - not need to return anything. - If you wish to modify the values being passed into the original - function, return a tuple which has values in the same order as the - original function. - """ - should_enable = getattr(detour, "_should_enable", True) - return _NMSHook( - detour, - name=cls._name, - detour_time=DetourTime.BEFORE, - overload=cls._overload, - should_enable=should_enable, - ) - - @classmethod - def after(cls, detour: Callable[..., Any]) -> _NMSHook: - """ Mark the hook to be only run after the original function. - This function may have the keyword argument `_result_`. If it does, then - this value will be the result of the call of the original function - """ - should_enable = getattr(detour, "_should_enable", True) - return _NMSHook( - detour, - name=cls._name, - detour_time=DetourTime.AFTER, - overload=cls._overload, - should_enable=should_enable, - ) - - -def disable(obj): - """ - Disable the current function or class. - This decorator MUST be the innermost decorator for it to work correctly. - """ - obj._should_enable = False - return obj - - -# TODO: See if we can make this work so that we may have a main loop decorator -# which doesn't require a .before or .after... -class _main_loop: - def __init__(self, func, detour_time=DetourTime.BEFORE): - self.func = func - self.func._main_loop = True - self.func._main_loop_detour_time = detour_time - - def __call__(self, *args, **kwargs): - return self.func(*args, **kwargs) - - @staticmethod - def before(func): - func._main_loop = True - func._main_loop_detour_time = DetourTime.BEFORE - return func - - @staticmethod - def after(func): - func._main_loop = True - func._main_loop_detour_time = DetourTime.AFTER - return func - - -def on_state_change(state): - def _inner(func): - func._trigger_on_state = state - return func - return _inner - - -def on_game_save(func): - func._on_game_save = True - return func - - -def on_game_load(func): - func._on_game_load = True - return func - - -def on_fully_booted(func): - """ - Configure the decorated function to be run once the game is considered - "fully booted". - This occurs when the games' internal state first changes to "mode selector" - (ie. just before the game mode selection screen appears). - """ - func._trigger_on_state = "MODESELECTOR" - return func - - -class main_loop: - @staticmethod - def before(func): - func._is_main_loop_func = True - func._main_loop_detour_time = DetourTime.BEFORE - return func - - @staticmethod - def after(func): - func._is_main_loop_func = True - func._main_loop_detour_time = DetourTime.AFTER - return func - - -def manual_hook( - name: str, - offset: int, - func_def: FUNCDEF, -): - def _hook_function(detour): - should_enable = getattr(detour, "_should_enable", True) - return _NMSHook( - detour, - name=name, - detour_time=DetourTime.AFTER, - should_enable=should_enable, - offset=offset, - call_func=func_def, - ) - return _hook_function - - -def hook_function( - function_name: str, - *, - offset: Optional[int] = None, - pattern: Optional[str] = None -): - """ Specify parameters for the function to hook. - - Parameters - ---------- - function_name: - The name of the function to hook. This will be looked up against the - known functions for the game and hooked if found. - offset: - The offset relative to the base address of the exe where the function - starts. - NOTE: Currently doesn't work. - pattern: - A byte pattern in the form `"AB CD ?? EF ..."` - This will be the same pattern as used by IDA and cheat engine. - NOTE: Currently doesn't work. - """ - def _hook_function(klass: NMSHook): - klass._pattern = None - klass.target = 0 - if not offset and not pattern: - if function_name in FUNC_OFFSETS: - klass.target = _internal.BASE_ADDRESS + FUNC_OFFSETS[function_name] - else: - raise UnknownFunctionError(f"{function_name} has no known address") - else: - if pattern: - klass._pattern = pattern - if function_name in FUNC_CALL_SIGS: - signature = FUNC_CALL_SIGS[function_name] - else: - raise UnknownFunctionError(f"{function_name} has no known call signature") - klass.signature = signature - klass._name = function_name - return klass - return _hook_function - - -def conditionally_enabled_hook(conditional: bool): - """ Conditionally enable a hook. - This conditional is checked at function definition time and will determine - if the hook should actually be enabled when told to enable. - - Parameters - ---------- - conditional: - A statement which must resolve to True or False - Eg. `some_variable == 42` - """ - def _conditional_hook(klass: NMSHook): - klass._should_enable = conditional - return klass - return _conditional_hook - - -def conditional_hook(conditional: str): - """ Conditionally call the detour function when the provided conditional - evaluates to true. - This can be used to conditionally turn on or off the detouring automatically - based on some condition. - - Parameters - ---------- - conditional: - String containing a statement which, when evaluated to be True, will - cause the detour to be run. - """ - def _conditional_hook(klass: NMSHook): - orig_detour = klass.detour - @wraps(klass.detour) - def conditional_detour(self: NMSHook, *args, **kwargs): - # Run the original function if the conditional evaluates to True. - if eval(conditional) is True: - ret = orig_detour(self, *args, **kwargs) - else: - ret = self.original(*args, **kwargs) - return ret - # Assign the decorated method to the `detour` attribute. - klass.detour = conditional_detour - return klass - return _conditional_hook - - -def one_shot(klass: _NMSHook): - klass._is_one_shot = True - return klass - - -def on_key_pressed(event: str): - def wrapped(func): - func._hotkey = event - func._hotkey_press = "down" - return func - return wrapped - - -def on_key_release(event: str): - def wrapped(func): - func._hotkey = event - func._hotkey_press = "up" - return func - return wrapped - - -class CompoundHook(NMSHook): - def __init__( - self, - *, - signature: Optional[CFuncPtr] = None, - target: Optional[int] = None - ): - super().__init__(target=target, signature=signature) - self.before_funcs = [] - self.after_funcs = [] - self.after_funcs_with_result = [] - - def add_before(self, func): - self.before_funcs.append(func) - - def add_after(self, func): - if func._has_return_arg: - self.after_funcs_with_result.append(func) - else: - self.after_funcs.append(func) - - def detour(self, *args, **kwargs): - """ Detour function consisting of multiple sub-functions. """ - # First, run each of the 'before' detours. - for func in self.before_funcs: - func(*args, **kwargs) - # Then, run the original function. - return_value = self.original(*args, **kwargs) - # Then run each of the 'after' detours. - for func in self.after_funcs: - return_value = func(*args, **kwargs) or return_value - # Finally, run reach of the 'after' functions which takes the return - # value. - # We run these last so that they may have the most recent return value - # possible. - for func in self.after_funcs_with_result: - return_value = func(*args, result=return_value, **kwargs) or return_value - return return_value - - -class HookManager(): - def __init__(self): - self.hooks: dict[str, NMSHook] = {} - self.compound_hooks: dict[str, CompoundHook] = {} - # Keep a mapping of any hooks that try to be registered but fail. - # These hooks will not be instances of classes, but the class type. - self.failed_hooks: dict[str, Type[NMSHook]] = {} - # Keep a list of main loop functions which will be run either before or - # after the main update loop each update cycle. - self.main_loop_before_funcs: list = [] - self.main_loop_after_funcs: list = [] - # Keep track of all the functions which are called on various state - # changes. - self.on_state_change_funcs: dict[str, list] = {} - for state in StateEnum: - self.on_state_change_funcs[state.value.decode()] = [] - - def add_hook( - self, - detour: Callable[..., Any], - signature: CFuncPtr, - target: int, - func_name: str, - enable: bool = True - ): - # In-line creation of hooks. - hook = NMSHook(signature=signature, target=target, detour=detour) - hook._name = func_name - self.hooks[func_name] = hook - if enable: - try: - hook.enable() - except: - hook_logger.info(traceback.format_exc()) - - def _add_cls_to_compound_hook( - self, - cls: NMSHook, - compound_cls: CompoundHook - ): - if before_hook := getattr(cls, "_before_hook", None): - compound_cls.add_before(before_hook) - if after_hook := getattr(cls, "_after_hook", None): - compound_cls.add_after(after_hook) - - def resolve_dependencies(self): - """ Resolve dependencies of hooks. - This will get all the functions which are to be hooked and construct - compound hooks as required.""" - # TODO: Make work. - pass - - def add_main_loop_func(self, func): - if func._main_loop_detour_time == DetourTime.BEFORE: - self.main_loop_before_funcs.append(func) - else: - self.main_loop_after_funcs.append(func) - - def remove_main_loop_func(self, func): - try: - if func._main_loop_detour_time == DetourTime.BEFORE: - self.main_loop_before_funcs.remove(func) - else: - self.main_loop_after_funcs.remove(func) - except ValueError: - # In this case maybe there was an error starting the hook the last - # time and so it won't be loaded. Just ignore as hopefully this - # time when reloading it will go better! - pass - - def add_state_change_func(self, state, func): - self.on_state_change_funcs[state].append(func) - - def remove_state_change_func(self, state, func): - self.on_state_change_funcs[state].remove(func) - - def register_function( - self, - hook: _NMSHook, - enable: bool = True, - mod = None, - quiet: bool = False - ): - """ Register the provided function as a callback. """ - if hook._invalid: - # If the hook is invalid for any reason, then just return (for now...) - return - bound_ok = hook.bind(mod) - if bound_ok: - if not quiet: - hook_logger.info( - f"Registered hook '{hook._name}' for function " - f"'{hook._original_detour.__name__}'" - ) - self.hooks[hook._name] = hook - if enable and hook._should_enable: - try: - hook.enable() - except: - hook_logger.info(traceback.format_exc()) - - def enable(self, func_name: str): - """ Enable the hook for the provided function name. """ - if hook := self.hooks.get(func_name): - hook.enable() - elif hook := self.compound_hooks.get(func_name): - hook.enable() - else: - return - hook_logger.info(f"Enabled hook for function '{func_name}'") - - def disable(self, func_name: str): - """ Disable the hook for the provided function name. """ - if hook := self.hooks.get(func_name): - hook.disable() - elif hook := self.compound_hooks.get(func_name): - hook.disable() - else: - return - hook_logger.info(f"Disabled hook for function '{func_name}'") - - @property - def states(self): - # Return the states of all the registered hooks - for func_name, hook in self.hooks.items(): - yield f"{func_name}: {hook.state}" - - -hook_manager = HookManager() diff --git a/nmspy/logging.py b/nmspy/logging.py deleted file mode 100644 index aa80fa8..0000000 --- a/nmspy/logging.py +++ /dev/null @@ -1,25 +0,0 @@ -from multiprocessing.connection import Connection -import subprocess - -import psutil - - -class stdoutSocket(): - def __init__(self, connection: Connection): - self.connection = connection - - def write(self, val): - self.connection.send_bytes(val.encode()) - - def flush(self): - pass - - -def open_log_console(log_script: str) -> int: - """ Open the logging console and return the pid of it.""" - cmd = ["cmd.exe", "/c", "start", "NMS.py console", "python", log_script] - with subprocess.Popen(cmd) as proc: - log_ppid = proc.pid - for proc in psutil.process_iter(["pid", "name", "ppid"]): - if proc.info["ppid"] == log_ppid: - return proc.info["pid"] diff --git a/nmspy/memutils.py b/nmspy/memutils.py deleted file mode 100644 index a29d058..0000000 --- a/nmspy/memutils.py +++ /dev/null @@ -1,325 +0,0 @@ -import array -from contextlib import contextmanager -import ctypes -from gc import get_referents -import logging -import sys -import time -from types import ModuleType, FunctionType -from typing import Any, Type, TypeVar, Optional, Iterable, Union, Generator - -from nmspy._internal import BASE_ADDRESS, SIZE_OF_IMAGE -from nmspy.calling import call_function - - -# Custom objects know their class. -# Function objects seem to know way too much, including modules. -# Exclude modules as well. -BLACKLIST = type, ModuleType, FunctionType - - -mem_logger = logging.getLogger("MemUtils") - - -MEM_ACCESS_R = 0x100 # Read only. -MEM_ACCESS_RW = 0x200 # Read and Write access. - -BLOB_SIZE = 0x1_000_000 # 16Mb - - -ctypes.pythonapi.PyMemoryView_FromMemory.argtypes = ( - ctypes.c_char_p, - ctypes.c_ssize_t, - ctypes.c_int, -) -ctypes.pythonapi.PyMemoryView_FromMemory.restype = ctypes.py_object - - -# TypeVar for the map_struct so that we can correctly get the returned type to -# be the same as the input type. -CTYPES = Union[ctypes._SimpleCData, ctypes.Structure, ctypes._Pointer] -Struct = TypeVar("Struct", bound=CTYPES) - - -def getsize(obj): - """sum size of object & members.""" - if isinstance(obj, BLACKLIST): - raise TypeError('getsize() does not take argument of type: ' + str(type(obj))) - seen_ids = set() - size = 0 - objects = [obj] - while objects: - need_referents = [] - for _obj in objects: - if not isinstance(_obj, BLACKLIST) and id(_obj) not in seen_ids: - seen_ids.add(id(_obj)) - size += sys.getsizeof(_obj) - need_referents.append(_obj) - objects = get_referents(*need_referents) - try: - _len = len(obj) - except TypeError: - _len = None - return size, _len - - - -def chunks(lst: Iterable, n: int): - """Yield successive n-sized chunks from lst.""" - for i in range(0, len(lst), n): - yield lst[i:i + n] - - -def match(patt: bytes, input: bytes): - """ Check whether or not the pattern matches the provided bytes. """ - for i, char in enumerate(patt): - if not (char == b'.' or char == input[i]): - return False - return True - - -def pprint_mem(offset: int, size: int, stride: Optional[int] = None) -> str: - # TODO: Make this print a much nicer output... It sucks right now... - if not offset: - # If we are passed in an offset of 0, don't even try. - return "" - _data = (ctypes.c_char * size).from_address(offset) - if stride: - result = " ".join([f"{x:02X}".upper() for x in range(stride)]) + "\n" - for chunk in chunks(_data, stride): - result += " ".join([f"{k:02X}".upper() for k in chunk]) + "\n" - return result - else: - return " ".join([k.hex().upper() for k in _data]) - - -def _hex_repr(val: int, as_hex: bool) -> str: - if as_hex: - return hex(val) - else: - return str(val) - - -def get_field_info(obj, logger=None, indent: int = 0, as_hex: bool = True, max_depth: int = -1): - if indent == max_depth: - return - if isinstance(obj, ctypes.Structure): - # Need to get the actual class object to iterate over its' fields: - cls_obj = obj.__class__ - has_values = True - elif isinstance(obj, ctypes.Array): - cls_obj = obj[0] - has_values = True - else: - try: - if issubclass(obj, ctypes.Structure): - cls_obj = obj - has_values = False - elif issubclass(obj, ctypes.Array): - cls_obj = obj._type_ - has_values = False - else: - raise TypeError(f"obj {obj} must be an instance of a ctypes.Structure or a subclass.") - except TypeError as e: - yield obj.__mro__ - raise TypeError(f"!!! obj {obj} must be an instance of a ctypes.Structure or a subclass.") from e - for field, field_type in cls_obj._fields_: - if has_values: - val = getattr(obj, field) - # if isinstance(val, ctypes.Array): - # val = [x for x in val] - field_data: ctypes._CField = getattr(cls_obj, field) - offset = _hex_repr(field_data.offset, as_hex) - size = _hex_repr(field_data.size, as_hex) - if has_values and not isinstance(val, ctypes.Structure): - msg = f"{field} ({field_type.__name__}): size: {size} offset: {offset} value: {val}" - else: - msg = f"{field} ({field_type.__name__}): size: {size} offset: {offset}" - msg = indent * " " + msg - yield msg - if not issubclass(field_type, (ctypes._SimpleCData, ctypes.Array, ctypes._Pointer)): - if has_values: - for _msg in get_field_info(val, logger, indent + 1, as_hex, max_depth): - yield _msg - else: - for _msg in get_field_info(field_type, logger, indent + 1, as_hex, max_depth): - yield _msg - - -def get_addressof(obj) -> int: - try: - # If it's a pointer, this is the branch that is used. - return ctypes.cast(obj, ctypes.c_void_p).value - except: - # TODO: Get correct error type. - # Otherwise fallback to the usual method. - return ctypes.addressof(obj) - - -def _get_memview(offset: int, type_: Type[ctypes.Structure]) -> memoryview: - """ Return a memoryview which covers the region of memory specified by the - struct provided. - - Parameters - ---------- - offset: - The memory address to start reading the struct from. - type_: - The type of the ctypes.Structure to be loaded at this location. - """ - return ctypes.pythonapi.PyMemoryView_FromMemory( - ctypes.cast(offset, ctypes.c_char_p), - ctypes.sizeof(type_), - MEM_ACCESS_RW, - ) - - -def _get_memview_with_size(offset: int, size: int) -> Optional[memoryview]: - """ Return a memoryview which covers the region of memory specified by the - struct provided. - - Parameters - ---------- - offset: - The memory address to start reading the struct from. - type_: - The type of the ctypes.Structure to be loaded at this location. - """ - if not offset: - return None - return ctypes.pythonapi.PyMemoryView_FromMemory( - ctypes.cast(offset, ctypes.c_char_p), - size, - MEM_ACCESS_RW, - ) - - -@contextmanager -def map_struct_temp(offset: int, type_: Type[Struct]) -> Generator[Struct, Any, Any]: - """ Return an instance of the `type_` struct provided which shares memory - with the provided offset. - Note that the amount of memory to read is automatically determined by the - size of the struct provided. - IMPORTANT: The data at the offset specified will be de-allocated by NMS - itself after this context manager exits. This means you should only use the - object returned inside the context manager as using it outside will likely - cause the game or NMS.py to crash. - - Parameters - ---------- - offset: - The memory address to start reading the struct from. - type_: - The type of the ctypes.Structure to be loaded at this location. - - Returns - ------- - An instance of the input type. - """ - # Import here to save a circular dependency... - import nmspy.common as nms # noqa - - if not offset: - raise ValueError("Offset is 0. This would result in a segfault or similar") - instance = ctypes.cast(offset, ctypes.POINTER(type_)) - yield instance.contents - instance = None - del instance - if nms.memory_manager != 0: - # TODO: Use the function bound to the class, rather than this... - call_function("cTkMemoryManager::Free", nms.memory_manager, offset, -1) - - -def map_struct(offset: int, type_: Type[Struct]) -> Struct: - """ Return an instance of the `type_` struct provided which shares memory - with the provided offset. - Note that the amount of memory to read is automatically determined by the - size of the struct provided. - - Parameters - ---------- - offset: - The memory address to start reading the struct from. - type_: - The type of the ctypes.Structure to be loaded at this location. - - Returns - ------- - An instance of the input type. - """ - if not offset: - raise ValueError("Offset is 0. This would result in a segfault or similar") - instance = ctypes.cast(offset, ctypes.POINTER(type_)) - return instance.contents - - -def pattern_to_bytes(patt: str) -> array.array: - arr = array.array("H") - for char in patt.split(" "): - try: - num = int(char, 0x10) - except ValueError: - num = -1 - if num > 0xFF: - raise ValueError - elif num == -1: - num = 0x100 - arr.append(num) - return arr - - -def find_bytes( - pattern: str, - start: Optional[int] = None, - end: Optional[int] = None, - alignment: int = 0x4, - find_all: bool = False, -) -> Union[int, list[int], None]: - if start is None: - start = BASE_ADDRESS - if end is None: - end = start + SIZE_OF_IMAGE - # Search memory as follows: - # Read the requested bytes in (up to) 0x10 byte blobs. - # For the first blob then loop over all the memory addresses reading from - # start to finish. - # If, at any given memory address we find the start memory to match, then - # we continue to loop over the rest of the memory to see if it also matches. - # If not, then return back to the first part of the data and continue - # searching through memory. - start_time = time.time() - offsets = [] - _addr = start - patt = pattern_to_bytes(pattern) - # Loop over the whole region. - while _addr < end: - # Determine how much data we should read, and then read it into a - # memoryview. - _size = min(end - _addr, BLOB_SIZE) - mv = _get_memview_with_size(_addr, _size) - for i in range(_size // alignment): - for j, char in enumerate(patt): - # 0x100 is used as the "wildcard" since every byte will be from - # 0 -> 0xFF, but we will store each in 2 bytes. - if char == 0x100: - continue - try: - if not mv[alignment * i + j] == char: - break - except IndexError: - mem_logger.info(f"i: {i}, alignment: {alignment}, len: {len(mv)}, {alignment * i + j}") - raise - else: - # In this case we matched. Check to see if we want all or just - # one. - if find_all: - offsets.append(_addr + alignment * i) - else: - mem_logger.info(f"Time to find pattern: {time.time() - start_time:.3f}s") - return _addr + alignment * i - # Move forward by the alignment amount. - _addr += BLOB_SIZE - if find_all: - return offsets - else: - return None diff --git a/nmspy/mod_loader.py b/nmspy/mod_loader.py deleted file mode 100644 index ebebcb1..0000000 --- a/nmspy/mod_loader.py +++ /dev/null @@ -1,393 +0,0 @@ -# Main functionality for loading mods. - -# Mods will consist of a single file which will generally contain a number of -# hooks. - -from abc import ABC -from dataclasses import fields -from functools import partial -import inspect -import importlib -import importlib.util -import json -import logging -import os.path as op -import os -import traceback -from types import ModuleType -from typing import Any, Optional -import string -import sys - -from nmspy import __version__ as _nmspy_version -from nmspy.errors import NoSaveError -from nmspy._types import NMSHook -from nmspy._internal import CWD -from nmspy.hooking import HookManager, _NMSHook -import nmspy.common as nms - -import keyboard -import semver - - -mod_logger = logging.getLogger("ModManager") - - -VALID_CHARS = string.ascii_letters + string.digits + "_" - -# This will fail when not injected. Just have some dummy fallback for now. -try: - fpath = op.join(CWD, "mods") -except TypeError: - fpath = "mods" - - -nmspy_version = semver.Version.parse(_nmspy_version) - - -def _clean_name(name: str) -> str: - """ Remove any disallowed characters from the filename so that we get a - valid module name.""" - out = '' - for char in name: - if char not in VALID_CHARS: - out += "_" - else: - out += char - return out - - -def _is_mod_predicate(obj, ref_module) -> bool: - if inspect.getmodule(obj) == ref_module and inspect.isclass(obj): - return issubclass(obj, NMSMod) and getattr(obj, "_should_enable", True) - return False - - -def _is_mod_state_predicate(obj) -> bool: - return isinstance(obj, ModState) - - -def _partial_predicate(value: Any) -> bool: - try: - return isinstance(value, partial) and isinstance(value.func.__self__, _NMSHook) - except TypeError: - return False - - -def _main_loop_predicate(value: Any) -> bool: - """ Determine if the objecy has the _is_main_loop_func property. - This will only be methods on NMSMod classes which are decorated with either - @main_loop.before or @main_loop.after - """ - return getattr(value, "_is_main_loop_func", False) - - -def _state_change_hook_predicate(value: Any) -> bool: - """ Determine if the object has the _trigger_on_state property. - This will only be methods on NMSMod classes which are decorated with - @on_state_change or on_fully_booted. - """ - return hasattr(value, "_trigger_on_state") - - -def _has_hotkey_predicate(value: Any) -> bool: - """ Determine if the objecy has the _is_main_loop_func property. - This will only be methods on NMSMod classes which are decorated with either - @main_loop.before or @main_loop.after - """ - return getattr(value, "_hotkey", False) - - -def _import_file(fpath: str) -> Optional[ModuleType]: - try: - module_name = _clean_name(op.splitext(op.basename(fpath))[0]) - if spec := importlib.util.spec_from_file_location(module_name, fpath): - module = importlib.util.module_from_spec(spec) - module.__name__ = module_name - module.__spec__ = spec - sys.modules[module_name] = module - spec.loader.exec_module(module) - return module - except Exception: - mod_logger.error(f"Error loading {fpath}") - mod_logger.exception(traceback.format_exc()) - - -class StructEncoder(json.JSONEncoder): - def default(self, obj): - if hasattr(obj, "__json__"): - return { - "struct": obj.__class__.__qualname__, - "module": obj.__class__.__module__, - "fields": obj.__json__() - } - return json.JSONEncoder.default(self, obj) - - -class StructDecoder(json.JSONDecoder): - def __init__(self): - json.JSONDecoder.__init__(self, object_hook=self.object_hook) - - def object_hook(seld, obj: dict): - if (module := obj.get("module")) is not None: - mod_logger.info(module) - if module == "__main__": - return globals()[obj["struct"]](**obj["fields"]) - else: - try: - module_ = importlib.import_module(module) - return getattr(module_, obj["struct"])(**obj["fields"]) - except ImportError: - mod_logger.error(f"Cannot import {module}") - return - except AttributeError: - mod_logger.error( - f"Cannot find {obj['struct']} in {module}" - ) - return - return obj - - -class ModState(ABC): - """A class which is used as a base class to indicate that the class is to be - used as a mod state. - Mod State classes will persist across mod reloads so any variables set in it - will have the same value after the mod has been reloaded. - """ - _save_fields_: tuple[str] - - def save(self, name: str): - _data = {} - if hasattr(self, "_save_fields_") and self._save_fields_: - for field in self._save_fields_: - _data[field] = getattr(self, field) - else: - try: - for f in fields(self): - _data[f.name] = getattr(self, f.name) - except TypeError: - mod_logger.error( - "To save a mod state it must either be a dataclass or " - "have the _save_fields_ attribute. State was not saved" - ) - return - with open(op.join(nms.mod_save_dir, name), "w") as fobj: - json.dump(_data, fobj, cls=StructEncoder, indent=1) - - def load(self, name: str): - try: - with open(op.join(nms.mod_save_dir, name), "r") as f: - data = json.load(f, cls=StructDecoder) - except FileNotFoundError as e: - raise NoSaveError from e - for key, value in data.items(): - setattr(self, key, value) - - -class NMSMod(ABC): - __author__: str = "Name(s) of the mod author(s)" - __description__: str = "Short description of the mod" - __version__: str = "Mod version" - # Minimum required NMS.py version for this mod. - __NMSPY_required_version__: Optional[str] = None - - def __init__(self): - # Find all the hooks defined for the mod. - self.hooks: list[NMSHook] = [ - x[1].func.__self__ for x in inspect.getmembers(self, _partial_predicate) - ] - - self._main_funcs = [ - x[1] for x in inspect.getmembers(self, _main_loop_predicate) - ] - - self._state_change_funcs = [ - x[1] for x in inspect.getmembers(self, _state_change_hook_predicate) - ] - - self._hotkey_funcs = [ - x[1] for x in inspect.getmembers(self, _has_hotkey_predicate) - ] - - -class ModManager(): - def __init__(self, hook_manager: HookManager): - # Internal mapping of mods. - self._preloaded_mods: dict[str, type[NMSMod]] = {} - # Actual mapping of mods. - self.mods: dict[str, NMSMod] = {} - self._mod_hooks: dict[str, list] = {} - self.mod_states: dict[str, list[tuple[str, ModState]]] = {} - self._mod_paths: dict[str, ModuleType] = {} - self.hook_manager = hook_manager - # Keep a mapping of the hotkey callbacks - self.hotkey_callbacks: dict[tuple[str, str], Any] = {} - - def _load_module(self, module: ModuleType) -> bool: - """ Load a mod from the provided module. - This will be called when initially loading the mods, and also when we - wish to reload a mod. - """ - d: dict[str, type[NMSMod]] = dict( - inspect.getmembers( - module, - partial(_is_mod_predicate, ref_module=module) - ) - ) - if not len(d) >= 1: - mod_logger.error( - f"The file {module.__file__} has more than one mod defined in it. " - "Only define one mod per file." - ) - if len(d) == 0: - # No mod in the file. Just return - return False - mod_name = list(d.keys())[0] - mod = d[mod_name] - if mod.__NMSPY_required_version__ is not None: - try: - mod_version = semver.Version.parse(mod.__NMSPY_required_version__) - except ValueError: - mod_logger.warning( - "__NMSPY_required_version__ defined on mod " - f"{mod.__name__} is not a valid version string" - ) - mod_version = None - if mod_version is None or mod_version <= nmspy_version: - self._preloaded_mods[mod_name] = mod - else: - mod_logger.error( - f"Mod {mod.__name__} requires a newer verison of " - f"NMS.py ({mod_version} ≥ {nmspy_version})! " - "Please update" - ) - else: - self._preloaded_mods[mod_name] = mod - # Only get mod states if the mod name doesn't already have a cached - # state, otherwise it will override it. - if mod_name not in self.mod_states: - mod_states = list( - inspect.getmembers( - mod, - _is_mod_state_predicate - ) - ) - self.mod_states[mod_name] = mod_states - if not mod_name.startswith("_INTERNAL_"): - self._mod_paths[mod_name] = module - - return True - - def load_mod(self, fpath) -> bool: - """ Load a mod from the given filepath. """ - module = _import_file(fpath) - if module is None: - return False - return self._load_module(module) - - - def load_mod_folder(self, folder: str): - for file in os.listdir(folder): - if file.endswith(".py"): - self.load_mod(op.join(folder, file)) - - def _register_funcs(self, mod: NMSMod, quiet: bool): - for hook in mod.hooks: - self.hook_manager.register_function(hook, True, mod, quiet) - for main_loop_func in mod._main_funcs: - self.hook_manager.add_main_loop_func(main_loop_func) - for func in mod._state_change_funcs: - self.hook_manager.add_state_change_func(func._trigger_on_state, func) - for hotkey_func in mod._hotkey_funcs: - # Don't need to tell the hook manager, register the keyboard - # hotkey here... - # NOTE: The below is a "hack"/"solution" to an issue that the - # keyboard library has. - # cf. https://github.com/boppreh/keyboard/issues/584 - cb = keyboard.hook( - lambda e, func=hotkey_func, name=hotkey_func._hotkey, event_type=hotkey_func._hotkey_press: ( - e.name == name and - e.event_type == event_type and - nms.GcApplication is not None and - nms.GcApplication.hasFocus and - func() - ) - ) - self.hotkey_callbacks[ - (hotkey_func._hotkey, hotkey_func._hotkey_press) - ] = cb - - def enable_all(self, quiet: bool = False) -> int: - """ Enable all mods loaded by the manager that haven't been enabled yet. - Returns the number of mods enabled. """ - _loaded_mod_names = set() - for name, _mod in self._preloaded_mods.items(): - if not quiet: - mod_logger.info(f"- Loading hooks for {name}") - # Instantiate the mod, and then overwrite the object in the mods - # attribute with the instance. - mod = _mod() - self.mods[name] = mod - if not hasattr(mod, "hooks"): - mod_logger.error( - f"The mod {mod.__class__.__name__} is not initialised " - "properly. Please ensure that `super().__init__()` is " - "included in the `__init__` method of this mod!" - ) - mod_logger.warning(f"Could not enable {mod.__class__.__name__}") - continue - self._register_funcs(mod, quiet) - # If we get here, then the mod has been loaded successfully. - # Add the name to the loaded mod names set so we can then remove the - # mod from the preloaded mods dict. - _loaded_mod_names.add(name) - for name in _loaded_mod_names: - self._preloaded_mods.pop(name) - return len(_loaded_mod_names) - - def reload(self, name): - if (mod := self.mods.get(name)) is not None: - # First, remove everything. - for hook in mod.hooks: - mod_logger.info(f"Disabling hook {hook}: {hook._name}") - hook.disable() - hook.close() - del hook - for main_loop_func in mod._main_funcs: - self.hook_manager.remove_main_loop_func(main_loop_func) - for func in mod._state_change_funcs: - self.hook_manager.remove_state_change_func(func._trigger_on_state, func) - for hotkey_func in mod._hotkey_funcs: - cb = self.hotkey_callbacks.pop( - (hotkey_func._hotkey, hotkey_func._hotkey_press), - None, - ) - if cb is not None: - keyboard.unhook(cb) - - # Then, reload the module - module = self._mod_paths[name] - del sys.modules[module.__name__] - # Then, add everything back. - self.load_mod(module.__file__) - _loaded_mod_names = set() - for name, _mod in self._preloaded_mods.items(): - mod = _mod() - self.mods[name] = mod - if not hasattr(mod, "hooks"): - mod_logger.error( - f"The mod {mod.__class__.__name__} is not initialised " - "properly. Please ensure that `super().__init__()` is " - "included in the `__init__` method of this mod!" - ) - mod_logger.warning(f"Could not enable {mod.__class__.__name__}") - continue - self._register_funcs(mod, False) - if mod_state := self.mod_states.get(name): - for ms in mod_state: - field, state = ms - setattr(mod, field, state) - _loaded_mod_names.add(name) - for name in _loaded_mod_names: - self._preloaded_mods.pop(name) - mod_logger.info(f"Finished reloading {name}") diff --git a/nmspy/process.py b/nmspy/process.py deleted file mode 100644 index e01c6df..0000000 --- a/nmspy/process.py +++ /dev/null @@ -1,72 +0,0 @@ -import ctypes -import ctypes.wintypes -import _winapi - - -kernel32 = ctypes.WinDLL('kernel32.dll') - -# Map two useful debugging functions in kernel32: - -# Start debugger (will stop the current process) -# https://learn.microsoft.com/en-us/windows/win32/api/debugapi/nf-debugapi-debugactiveprocess -DebugActiveProcess = kernel32.DebugActiveProcess -DebugActiveProcess.argtypes = ( - ctypes.c_long, -) -DebugActiveProcess.restype = ctypes.c_byte - -# Stop the debugger (will result the current process) -# https://learn.microsoft.com/en-us/windows/win32/api/debugapi/nf-debugapi-debugactiveprocessstop -DebugActiveProcessStop = kernel32.DebugActiveProcessStop -DebugActiveProcessStop.argtypes = ( - ctypes.c_long, -) -DebugActiveProcessStop.restype = ctypes.c_byte - - -class SECURITY_ATTRIBUTES(ctypes.Structure): - _fields_ = [ - ("nLength", ctypes.wintypes.DWORD), - ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), - ("bInheritHandle", ctypes.wintypes.BOOL), - ] - - -def start_process(binary_path: str, creationflags: int = 0x0): - # Start an executable similarly to subprocess.Popen - # The functionality here is ripped directly from that implementation, - # however we don't discard the thread_handle here so that we may use it - # again later to resume the thread if we pause it. - proc_attr = SECURITY_ATTRIBUTES() - proc_attr.bInheritHandle = True - thread_attr = SECURITY_ATTRIBUTES() - thread_attr.bInheritHandle = True - # Start the process the internal way to get the thread handle. - handle_process, handle_thread, pid, tid = _winapi.CreateProcess( - None, - binary_path, - ctypes.byref(proc_attr), - ctypes.byref(thread_attr), - False, - creationflags, - None, - None, - None, - ) - return (handle_process, handle_thread, pid, tid) - - -def _stop_process(pid: int): - ret = DebugActiveProcess(pid) - if not ret: - return f"Error: {ctypes.GetLastError()}" - else: - return ret - - -def _start_process(pid: int): - ret = DebugActiveProcessStop(pid) - if not ret: - return f"Error: {ctypes.GetLastError()}" - else: - return ret diff --git a/nmspy/protocols.py b/nmspy/protocols.py deleted file mode 100644 index dcdda9f..0000000 --- a/nmspy/protocols.py +++ /dev/null @@ -1,48 +0,0 @@ -import asyncio -import builtins -import traceback - -from functools import partial - -# This escape sequence is arbitrarily the first 4 digits of Euler's number "e" -# written as bytes from left to right. -ESCAPE_SEQUENCE = b"\x02\x07\x01\x08" - - -class ExecutionEndedException(Exception): - pass - - -def custom_exception_handler(loop: asyncio.AbstractEventLoop, context: dict): - # Simple custom exception handler to stop the loop if an - # ExecutionEndedException exception is raised. - exception = context.get("exception") - if isinstance(exception, ExecutionEndedException): - loop.stop() - - -class TerminalProtocol(asyncio.Protocol): - def __init__(self, message: str, future): - super().__init__() - self.message = message - self.future = future - - def connection_made(self, transport): - self.transport = transport - transport.write(self.message.encode()) - if transport.can_write_eof(): - transport.write_eof() - - def data_received(self, data): - print(data.decode(), end="") - - def eof_received(self): - self.transport.close() - if not self.future.done(): - self.future.set_result(True) - - def connection_lost(self, exc): - self.transport.close() - if not self.future.done(): - self.future.set_result(True) - super().connection_lost(exc) diff --git a/nmspy/pymhf.cfg b/nmspy/pymhf.cfg new file mode 100644 index 0000000..6c61e09 --- /dev/null +++ b/nmspy/pymhf.cfg @@ -0,0 +1,16 @@ +[binary] +path = C:/Games/No Man's Sky/Binaries/NMS.exe +root_dir = C:/Games/No Man's Sky +mod_dir = C:/Games/No Man's Sky/mods +# steam_gameid = 275850 +internal_mod_dir = _internals\mods +# hash = 014f5fd1837e2bd8356669b92109fd3add116137 +start_paused = True + +[pymhf] +log_level = info + +[gui] +shown = True +scale = 1 +log_window_name_override = NMS.py diff --git a/nmspy/utils.py b/nmspy/utils.py deleted file mode 100644 index e2c5632..0000000 --- a/nmspy/utils.py +++ /dev/null @@ -1,18 +0,0 @@ -import json -import os.path as op - -from nmspy import _internal - - -def dump_resource(res, fname): - with open(op.join(_internal.CWD, fname), "w") as f: - f.write(json.dumps(res, indent=2)) - - -def safe_assign_enum(enum, index: int): - """ Safely try and get the enum with the associated integer value. - If the index isn't one in the enum return the original index.""" - try: - return enum(index) - except ValueError: - return index diff --git a/pyproject.toml b/pyproject.toml index c97a160..d03046b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,10 +1,10 @@ [project] -name = "nms.py" -description = "NMS python hooking API" +name = "NMS.py" +description = "No Man's Sky python modding API" requires-python = ">=3.9" # TODO: Add -license = {file = "LICENSE.txt"} -keywords = ["hooking", "games", "hacking"] +license = {file = "LICENSE.md"} +keywords = ["hooking", "games", "hacking", "modding"] authors = [ {name = "monkeyman192"} ] @@ -22,18 +22,20 @@ classifiers = [ "Programming Language :: Python :: 3 :: Only", ] dependencies = [ - "cyminhook~=0.1.2", "psutil~=5.9.5", + "pymhf" ] +version = "0.7.0" + +[tool.setuptools.package-dir] +nmspy = "nmspy" [project.urls] Homepage = "https://github.com/monkeyman192/NMS.py" Repository = "https://github.com/monkeyman192/NMS.py" +[project.entry-points.pymhflib] +nmspy = "nmspy" + [build-system] requires = ["setuptools>=43.0.0", "wheel"] build-backend = "setuptools.build_meta" - -[tool.pytest.ini_options] -testpaths = [ - "tests", -] \ No newline at end of file diff --git a/tests/test_memutils.py b/tests/test_memutils.py deleted file mode 100644 index 2e22145..0000000 --- a/tests/test_memutils.py +++ /dev/null @@ -1,14 +0,0 @@ -from nmspy.memutils import pattern_to_bytes - -import pytest - - -def test_pattern_to_bytes(): - patt = "AB CD EF" - assert list(pattern_to_bytes(patt)) == [0xAB, 0xCD, 0xEF] - patt = "1 2 3 4 5" - assert list(pattern_to_bytes(patt)) == [1, 2, 3, 4, 5] - patt = "01 ?? 02" - assert list(pattern_to_bytes(patt)) == [0x01, 0x100, 0x02] - with pytest.raises(ValueError): - pattern_to_bytes("123 04")