Skip to content

Commit

Permalink
Add hot reloading
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitris Zervas <[email protected]>
  • Loading branch information
dzervas committed Jan 12, 2024
1 parent c29e253 commit f085d7d
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 43 deletions.
16 changes: 14 additions & 2 deletions actions.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from html import escape
from typing import Any, Optional
import binaryninja as bn
import frida

from .frida_launcher import FridaLauncher, jinja
from .frida_launcher import FridaLauncher, jinja, FRIDA_RELOADER
from .log import *
from .console import CONSOLE
from .settings import HOOK_TAG_TYPE, HOOK_TAG_TYPE_ICON
Expand All @@ -12,8 +13,9 @@
def show_help(bv: bn.BinaryView):
bv.show_markdown_report("Frinja Help", open(bn.user_plugin_path() + "/frinja/README.md").read())

@alert_on_error
def mark_hooked(bv: bn.BinaryView, func: bn.Function):
global FRIDA_RELOADER

# NOTE: Maybe rely on id instead of name?
if not bv.get_tag_type(HOOK_TAG_TYPE):
bv.create_tag_type(HOOK_TAG_TYPE, HOOK_TAG_TYPE_ICON)
Expand All @@ -23,17 +25,27 @@ def mark_hooked(bv: bn.BinaryView, func: bn.Function):
else:
func.remove_user_function_tags_of_type(HOOK_TAG_TYPE)

try:
FRIDA_RELOADER()
except frida.InvalidOperationError:
FRIDA_RELOADER = lambda: None

# Frida Start
@needs_settings
def frida_start(bv: bn.BinaryView):
global FRIDA_RELOADER

info("Launching hooker script")
# int is immutable so we have to use dict/list
state = { "depth": 0 }
targets = get_functions_by_tag(bv, HOOK_TAG_TYPE)

frida_launcher = FridaLauncher.from_template(bv, "hooker.js.j2", targets=targets)
frida_launcher.on_message_send = [on_frida_start(state)]
frida_launcher.start()

FRIDA_RELOADER = lambda: frida_launcher.replace_script_from_template("hooker.js.j2", targets=get_functions_by_tag(bv, HOOK_TAG_TYPE))

@message_handler
def on_frida_start(msg: Any, data: Optional[bytes], state: dict):
if not isinstance(msg, dict) or "event" not in msg.keys() or msg["event"] not in ("call", "return"):
Expand Down
119 changes: 78 additions & 41 deletions frida_launcher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
from typing import Any, List, Mapping, Optional, Tuple, Union
from typing import List, Optional
import frida
import binaryninja as bn

Expand All @@ -8,10 +8,9 @@
from .log import *
from .helper import PLUGIN_PATH
from jinja2 import Environment, FileSystemLoader, select_autoescape
from pathlib import Path
from PySide6.QtCore import SignalInstance

TEMPLATES_PATH = PLUGIN_PATH / "templates"
FRIDA_RELOADER: Callable[[], None] = lambda: None

jinja = Environment(
loader=FileSystemLoader(TEMPLATES_PATH),
Expand All @@ -21,7 +20,7 @@

class FridaLauncher(bn.BackgroundTaskThread):
bv: bn.BinaryView
script: str
script_source: str

on_log: List[Callable[[str, str], None]]
on_destroyed: List[Callable[[], None]]
Expand All @@ -32,11 +31,20 @@ class FridaLauncher(bn.BackgroundTaskThread):
on_message_send: List[Callable[[frida.core.ScriptPayloadMessage, Optional[bytes]], None]]
on_message_error: List[Callable[[frida.core.ScriptErrorMessage], None]]

session: Optional[frida.core.Session]
script: Optional[frida.core.Script]
evaluate: Optional[Callable[[str], str]]

def __init__(self, bv: bn.BinaryView, script: str):
global FRIDA_RELOADER
super().__init__("Frinja initializing", True)

self.script = script
FRIDA_RELOADER = lambda: None
self.script_source = script
self.bv = bv
self.script = None
self.session = None
self.evaluate = None
SETTINGS.restore(bv)

self.on_log = [CONSOLE.handle_log]
Expand All @@ -52,25 +60,39 @@ def __init__(self, bv: bn.BinaryView, script: str):
def from_template(bv: bn.BinaryView, template_name: str, **kwargs):
template = jinja.get_template(template_name)
script = template.render(settings=SETTINGS, bv=bv, **kwargs)
return FridaLauncher(bv, script)

print("\n".join([f"{i + 1}: {l}" for i, l in enumerate(script.split("\n"))]))
def replace_script_from_template(self, template_name: str, **kwargs):
template = jinja.get_template(template_name)
script = template.render(settings=SETTINGS, bv=self.bv, **kwargs)
return self.replace_script(script)

return FridaLauncher(bv, script)
def replace_script(self, script: str) -> bool:
if self.session is None:
return False

def run(self):
if SETTINGS.device is None:
alert("Please select a device from the settings")
if self.script is None:
info("Loading script")
self.progress = "Loading script"
else:
info("Reloading script")
self.progress = "Reloading script"
self.script.unload()

# Prepare the callback handlers
def on_detached(reason):
for f in self.on_detached:
bn.execute_on_main_thread(lambda: f(reason))
self.cancel()
bn.execute_on_main_thread(lambda: CONSOLE.output.appendHtml("<br/>=== Script reloaded ===<br/>"))

# Print the script (very useful for debugging)
debug("\n".join([f"{n + 1}: {l}" for n, l in enumerate(script.split("\n"))]))

# Create the script with the repl code injected
repl_script = open(TEMPLATES_PATH / "repl.js").read()
self.script = self.session.create_script(repl_script + "\n\n" + script)

# Intialize the callback handlers
def on_destroyed():
for f in self.on_destroyed:
bn.execute_on_main_thread(f)
self.cancel()
# self.cancel()

def on_message(msg: frida.core.ScriptMessage, data: Optional[bytes]):
for f in self.on_message:
Expand All @@ -87,6 +109,38 @@ def on_log(level: str, text: str):
for f in self.on_log:
bn.execute_on_main_thread(lambda: f(level, text))

self.script.set_log_handler(on_log)
self.script.on("destroyed", on_destroyed)
self.script.on("message", on_message)
self.script.load()
self.evaluate = self.script.exports_sync.evaluate # RPC export defined in repl.js

self.progress = "Frinja running..."

def run(self):
if SETTINGS.device is None:
alert("Please select a device from the settings")

# Prepare the callback handlers
def on_detached(reason):
info("Detached from process")
for f in self.on_detached:
bn.execute_on_main_thread(lambda: f(reason))
self.cancel()

# Add the session & script finalizer
def finish_script():
if SETTINGS.exec_action != ExecutionAction.SPAWN:
self.script.unload()
self.session.detach()
else:
try:
SETTINGS.device.kill(pid)
info("Process killed")
except frida.ProcessNotFoundError:
info("Process already finished")
self.on_end.append(finish_script)

# Find (or create) the process
pid = 0
if SETTINGS.exec_action == ExecutionAction.SPAWN:
Expand All @@ -102,47 +156,30 @@ def on_log(level: str, text: str):
info(f"Attaching to {pid}")

# Initialize the frida session
session = SETTINGS.device.attach(pid)
session.on("detached", on_detached)
self.session = SETTINGS.device.attach(pid)
self.session.on("detached", on_detached)

# Load the script
script = session.create_script(self.script + "\n\n" + open(TEMPLATES_PATH / "repl.js").read())
script.set_log_handler(on_log)
script.on("destroyed", on_destroyed)
script.on("message", on_message)
info("Loading script")
script.load()

# Add the session & script finalizer
def finish_script():
if SETTINGS.exec_action != ExecutionAction.SPAWN:
script.unload()
session.detach()
else:
try:
SETTINGS.device.kill(pid)
info("Process killed")
except frida.ProcessNotFoundError:
info("Process already finished")
self.on_end.append(finish_script)
self.replace_script(self.script_source)

# Resume the process and connect to the REPL
if SETTINGS.exec_action == ExecutionAction.SPAWN:
SETTINGS.device.resume(pid)

self.progress = "Frinja running..."
evaluate = script.exports_sync.evaluate # RPC export defined in repl.js

for f in self.on_start:
bn.execute_on_main_thread(lambda: f(evaluate, self.cancel))
bn.execute_on_main_thread(lambda: f(self.evaluate, self.cancel))

while True:
if self.cancelled or self.finished:
break
time.sleep(1)

def _finalizer(self):
global FRIDA_RELOADER
self.progress = "Frinja cleaning up"

FRIDA_RELOADER = lambda: None
bn.execute_on_main_thread(lambda: CONSOLE.output.appendHtml("<br/>=== Script finished ===<br/>"))
for f in self.on_end:
bn.execute_on_main_thread(f)

Expand Down

0 comments on commit f085d7d

Please sign in to comment.