From bcd57a9af16a1007d5389147eb4ce6f291ab2465 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Thu, 26 Sep 2024 11:21:55 +0000 Subject: [PATCH] detect and use third-party analysis backends when possible (#2380) * introduce script to detect 3P backends ref #2376 * add idalib backend * binary ninja: search for API using XDG desktop entry ref #2376 * binja: search more XDG locations for desktop entry * binary ninja: optimize embedded PE scanning closes #2397 * add script for comparing the performance of analysis backends --- .github/pyinstaller/pyinstaller.spec | 3 + .gitignore | 1 + CHANGELOG.md | 3 + capa/features/extractors/binja/file.py | 54 +-- .../extractors/binja/find_binja_api.py | 160 ++++++++- capa/features/extractors/ida/extractor.py | 5 +- capa/features/extractors/ida/idalib.py | 113 +++++++ capa/helpers.py | 56 ++++ capa/ida/helpers.py | 13 + capa/loader.py | 60 ++-- capa/main.py | 2 + pyproject.toml | 2 + scripts/compare-backends.py | 316 ++++++++++++++++++ scripts/detect-backends.py | 106 ++++++ 14 files changed, 820 insertions(+), 74 deletions(-) create mode 100644 capa/features/extractors/ida/idalib.py create mode 100644 scripts/compare-backends.py create mode 100644 scripts/detect-backends.py diff --git a/.github/pyinstaller/pyinstaller.spec b/.github/pyinstaller/pyinstaller.spec index 021a2b294..e392eb5ae 100644 --- a/.github/pyinstaller/pyinstaller.spec +++ b/.github/pyinstaller/pyinstaller.spec @@ -70,7 +70,10 @@ a = Analysis( "qt5", "pyqtwebengine", "pyasn1", + # don't pull in Binary Ninja/IDA bindings that should + # only be installed locally. "binaryninja", + "ida", ], ) diff --git a/.gitignore b/.gitignore index ce07daf4a..997cef4cc 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,4 @@ Pipfile.lock .github/binja/download_headless.py .github/binja/BinaryNinja-headless.zip justfile +data/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 33ebf93b7..cc93d2d76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ ### New Features +- add IDA v9.0 backend via idalib #2376 @williballenthin +- locate Binary Ninja API using XDG Desktop Entries #2376 @williballenthin + ### Breaking Changes ### New Rules (7) diff --git a/capa/features/extractors/binja/file.py b/capa/features/extractors/binja/file.py index cd340e77d..d5bb5a7c5 100644 --- a/capa/features/extractors/binja/file.py +++ b/capa/features/extractors/binja/file.py @@ -5,8 +5,6 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. - -import struct from typing import Tuple, Iterator from binaryninja import Segment, BinaryView, SymbolType, SymbolBinding @@ -20,56 +18,24 @@ from capa.features.extractors.binja.helpers import read_c_string, unmangle_c_name -def check_segment_for_pe(bv: BinaryView, seg: Segment) -> Iterator[Tuple[int, int]]: - """check segment for embedded PE - - adapted for binja from: - https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19 - """ - mz_xor = [ - ( - capa.features.extractors.helpers.xor_static(b"MZ", i), - capa.features.extractors.helpers.xor_static(b"PE", i), - i, - ) - for i in range(256) - ] - - todo = [] - # If this is the first segment of the binary, skip the first bytes. Otherwise, there will always be a matched - # PE at the start of the binaryview. - start = seg.start - if bv.view_type == "PE" and start == bv.start: +def check_segment_for_pe(bv: BinaryView, seg: Segment) -> Iterator[Tuple[Feature, Address]]: + """check segment for embedded PE""" + start = 0 + if bv.view_type == "PE" and seg.start == bv.start: + # If this is the first segment of the binary, skip the first bytes. + # Otherwise, there will always be a matched PE at the start of the binaryview. start += 1 - for mzx, pex, i in mz_xor: - for off, _ in bv.find_all_data(start, seg.end, mzx): - todo.append((off, mzx, pex, i)) - - while len(todo): - off, mzx, pex, i = todo.pop() - - # The MZ header has one field we will check e_lfanew is at 0x3c - e_lfanew = off + 0x3C - - if seg.end < (e_lfanew + 4): - continue - - newoff = struct.unpack(" Iterator[Tuple[Feature, Address]]: """extract embedded PE features""" for seg in bv.segments: - for ea, _ in check_segment_for_pe(bv, seg): - yield Characteristic("embedded pe"), FileOffsetAddress(ea) + yield from check_segment_for_pe(bv, seg) def extract_file_export_names(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]: diff --git a/capa/features/extractors/binja/find_binja_api.py b/capa/features/extractors/binja/find_binja_api.py index 7412259f2..2a5dc6a93 100644 --- a/capa/features/extractors/binja/find_binja_api.py +++ b/capa/features/extractors/binja/find_binja_api.py @@ -5,31 +5,175 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. +import os +import sys +import logging import subprocess +import importlib.util +from typing import Optional from pathlib import Path +logger = logging.getLogger(__name__) + + # When the script gets executed as a standalone executable (via PyInstaller), `import binaryninja` does not work because # we have excluded the binaryninja module in `pyinstaller.spec`. The trick here is to call the system Python and try # to find out the path of the binaryninja module that has been installed. # Note, including the binaryninja module in the `pyinstaller.spec` would not work, since the binaryninja module tries to # find the binaryninja core e.g., `libbinaryninjacore.dylib`, using a relative path. And this does not work when the # binaryninja module is extracted by the PyInstaller. -code = r""" +CODE = r""" from pathlib import Path from importlib import util spec = util.find_spec('binaryninja') if spec is not None: if len(spec.submodule_search_locations) > 0: - path = Path(spec.submodule_search_locations[0]) - # encode the path with utf8 then convert to hex, make sure it can be read and restored properly - print(str(path.parent).encode('utf8').hex()) + path = Path(spec.submodule_search_locations[0]) + # encode the path with utf8 then convert to hex, make sure it can be read and restored properly + print(str(path.parent).encode('utf8').hex()) """ -def find_binja_path() -> Path: - raw_output = subprocess.check_output(["python", "-c", code]).decode("ascii").strip() - return Path(bytes.fromhex(raw_output).decode("utf8")) +def find_binaryninja_path_via_subprocess() -> Optional[Path]: + raw_output = subprocess.check_output(["python", "-c", CODE]).decode("ascii").strip() + output = bytes.fromhex(raw_output).decode("utf8") + if not output.strip(): + return None + return Path(output) + + +def get_desktop_entry(name: str) -> Optional[Path]: + """ + Find the path for the given XDG Desktop Entry name. + + Like: + + >> get_desktop_entry("com.vector35.binaryninja.desktop") + Path("~/.local/share/applications/com.vector35.binaryninja.desktop") + """ + assert sys.platform in ("linux", "linux2") + assert name.endswith(".desktop") + + data_dirs = os.environ.get("XDG_DATA_DIRS", "/usr/share") + f":{Path.home()}/.local/share" + for data_dir in data_dirs.split(":"): + applications = Path(data_dir) / "applications" + for application in applications.glob("*.desktop"): + if application.name == name: + return application + + return None + + +def get_binaryninja_path(desktop_entry: Path) -> Optional[Path]: + # from: Exec=/home/wballenthin/software/binaryninja/binaryninja %u + # to: /home/wballenthin/software/binaryninja/ + for line in desktop_entry.read_text(encoding="utf-8").splitlines(): + if not line.startswith("Exec="): + continue + + if not line.endswith("binaryninja %u"): + continue + + binaryninja_path = Path(line[len("Exec=") : -len("binaryninja %u")]) + if not binaryninja_path.exists(): + return None + + return binaryninja_path + + return None + + +def validate_binaryninja_path(binaryninja_path: Path) -> bool: + if not binaryninja_path: + return False + + module_path = binaryninja_path / "python" + if not module_path.is_dir(): + return False + + if not (module_path / "binaryninja" / "__init__.py").is_file(): + return False + + return True + + +def find_binaryninja() -> Optional[Path]: + binaryninja_path = find_binaryninja_path_via_subprocess() + if not binaryninja_path or not validate_binaryninja_path(binaryninja_path): + if sys.platform == "linux" or sys.platform == "linux2": + # ok + logger.debug("detected OS: linux") + elif sys.platform == "darwin": + logger.warning("unsupported platform to find Binary Ninja: %s", sys.platform) + return False + elif sys.platform == "win32": + logger.warning("unsupported platform to find Binary Ninja: %s", sys.platform) + return False + else: + logger.warning("unsupported platform to find Binary Ninja: %s", sys.platform) + return False + + desktop_entry = get_desktop_entry("com.vector35.binaryninja.desktop") + if not desktop_entry: + logger.debug("failed to find Binary Ninja application") + return None + logger.debug("found Binary Ninja application: %s", desktop_entry) + + binaryninja_path = get_binaryninja_path(desktop_entry) + if not binaryninja_path: + logger.debug("failed to determine Binary Ninja installation path") + return None + + if not validate_binaryninja_path(binaryninja_path): + logger.debug("failed to validate Binary Ninja installation") + return None + + logger.debug("found Binary Ninja installation: %s", binaryninja_path) + + return binaryninja_path / "python" + + +def is_binaryninja_installed() -> bool: + """Is the binaryninja module ready to import?""" + try: + return importlib.util.find_spec("binaryninja") is not None + except ModuleNotFoundError: + return False + + +def has_binaryninja() -> bool: + if is_binaryninja_installed(): + logger.debug("found installed Binary Ninja API") + return True + + logger.debug("Binary Ninja API not installed, searching...") + + binaryninja_path = find_binaryninja() + if not binaryninja_path: + logger.debug("failed to find Binary Ninja installation") + + logger.debug("found Binary Ninja API: %s", binaryninja_path) + return binaryninja_path is not None + + +def load_binaryninja() -> bool: + try: + import binaryninja + + return True + except ImportError: + binaryninja_path = find_binaryninja() + if not binaryninja_path: + return False + + sys.path.append(binaryninja_path.absolute().as_posix()) + try: + import binaryninja # noqa: F401 unused import + + return True + except ImportError: + return False if __name__ == "__main__": - print(find_binja_path()) + print(find_binaryninja_path_via_subprocess()) diff --git a/capa/features/extractors/ida/extractor.py b/capa/features/extractors/ida/extractor.py index 806ef8e78..a2b4f7913 100644 --- a/capa/features/extractors/ida/extractor.py +++ b/capa/features/extractors/ida/extractor.py @@ -8,7 +8,6 @@ from typing import List, Tuple, Iterator import idaapi -import ida_nalt import capa.ida.helpers import capa.features.extractors.elf @@ -32,7 +31,9 @@ class IdaFeatureExtractor(StaticFeatureExtractor): def __init__(self): super().__init__( hashes=SampleHashes( - md5=ida_nalt.retrieve_input_file_md5(), sha1="(unknown)", sha256=ida_nalt.retrieve_input_file_sha256() + md5=capa.ida.helpers.retrieve_input_file_md5(), + sha1="(unknown)", + sha256=capa.ida.helpers.retrieve_input_file_sha256(), ) ) self.global_features: List[Tuple[Feature, Address]] = [] diff --git a/capa/features/extractors/ida/idalib.py b/capa/features/extractors/ida/idalib.py new file mode 100644 index 000000000..df1e3172e --- /dev/null +++ b/capa/features/extractors/ida/idalib.py @@ -0,0 +1,113 @@ +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import os +import sys +import json +import logging +import importlib.util +from typing import Optional +from pathlib import Path + +logger = logging.getLogger(__name__) + + +def is_idalib_installed() -> bool: + try: + return importlib.util.find_spec("ida") is not None + except ModuleNotFoundError: + return False + + +def get_idalib_user_config_path() -> Optional[Path]: + """Get the path to the user's config file based on platform following IDA's user directories.""" + # derived from `py-activate-idalib.py` from IDA v9.0 Beta 4 + + if sys.platform == "win32": + # On Windows, use the %APPDATA%\Hex-Rays\IDA Pro directory + config_dir = Path(os.getenv("APPDATA")) / "Hex-Rays" / "IDA Pro" + else: + # On macOS and Linux, use ~/.idapro + config_dir = Path.home() / ".idapro" + + # Return the full path to the config file (now in JSON format) + user_config_path = config_dir / "ida-config.json" + if not user_config_path.exists(): + return None + return user_config_path + + +def find_idalib() -> Optional[Path]: + config_path = get_idalib_user_config_path() + if not config_path: + return None + + config = json.loads(config_path.read_text(encoding="utf-8")) + + try: + ida_install_dir = Path(config["Paths"]["ida-install-dir"]) + except KeyError: + return None + + if not ida_install_dir.exists(): + return None + + libname = { + "win32": "idalib.dll", + "linux": "libidalib.so", + "linux2": "libidalib.so", + "darwin": "libidalib.dylib", + }[sys.platform] + + if not (ida_install_dir / "ida.hlp").is_file(): + return None + + if not (ida_install_dir / libname).is_file(): + return None + + idalib_path = ida_install_dir / "idalib" / "python" + if not idalib_path.exists(): + return None + + if not (idalib_path / "ida" / "__init__.py").is_file(): + return None + + return idalib_path + + +def has_idalib() -> bool: + if is_idalib_installed(): + logger.debug("found installed IDA idalib API") + return True + + logger.debug("IDA idalib API not installed, searching...") + + idalib_path = find_idalib() + if not idalib_path: + logger.debug("failed to find IDA idalib installation") + + logger.debug("found IDA idalib API: %s", idalib_path) + return idalib_path is not None + + +def load_idalib() -> bool: + try: + import ida + + return True + except ImportError: + idalib_path = find_idalib() + if not idalib_path: + return False + + sys.path.append(idalib_path.absolute().as_posix()) + try: + import ida # noqa: F401 unused import + + return True + except ImportError: + return False diff --git a/capa/helpers.py b/capa/helpers.py index 237a67f62..f185db9e6 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -5,11 +5,14 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. +import io import os import sys import gzip +import ctypes import inspect import logging +import tempfile import contextlib import importlib.util from typing import Dict, List, Union, BinaryIO, Iterator, NoReturn @@ -81,6 +84,59 @@ def assert_never(value) -> NoReturn: assert False, f"Unhandled value: {value} ({type(value).__name__})" # noqa: B011 +@contextlib.contextmanager +def stdout_redirector(stream): + """ + Redirect stdout at the C runtime level, + which lets us handle native libraries that spam stdout. + + *But*, this only works on Linux! Otherwise will silently still write to stdout. + So, try to upstream the fix when possible. + + Via: https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/ + """ + if sys.platform not in ("linux", "linux2"): + logger.warning("Unable to capture STDOUT on non-Linux (begin)") + yield + logger.warning("Unable to capture STDOUT on non-Linux (end)") + return + + # libc is only on Linux + LIBC = ctypes.CDLL(None) + C_STDOUT = ctypes.c_void_p.in_dll(LIBC, "stdout") + + # The original fd stdout points to. Usually 1 on POSIX systems. + original_stdout_fd = sys.stdout.fileno() + + def _redirect_stdout(to_fd): + """Redirect stdout to the given file descriptor.""" + # Flush the C-level buffer stdout + LIBC.fflush(C_STDOUT) + # Flush and close sys.stdout - also closes the file descriptor (fd) + sys.stdout.close() + # Make original_stdout_fd point to the same file as to_fd + os.dup2(to_fd, original_stdout_fd) + # Create a new sys.stdout that points to the redirected fd + sys.stdout = io.TextIOWrapper(os.fdopen(original_stdout_fd, "wb")) + + # Save a copy of the original stdout fd in saved_stdout_fd + saved_stdout_fd = os.dup(original_stdout_fd) + try: + # Create a temporary file and redirect stdout to it + tfile = tempfile.TemporaryFile(mode="w+b") + _redirect_stdout(tfile.fileno()) + # Yield to caller, then redirect stdout back to the saved fd + yield + _redirect_stdout(saved_stdout_fd) + # Copy contents of temporary file to the given stream + tfile.flush() + tfile.seek(0, io.SEEK_SET) + stream.write(tfile.read()) + finally: + tfile.close() + os.close(saved_stdout_fd) + + def load_json_from_path(json_path: Path): with gzip.open(json_path, "r") as compressed_report: try: diff --git a/capa/ida/helpers.py b/capa/ida/helpers.py index 547099f47..91f29f05e 100644 --- a/capa/ida/helpers.py +++ b/capa/ida/helpers.py @@ -14,6 +14,7 @@ import idc import idaapi import ida_ida +import ida_nalt import idautils import ida_bytes import ida_loader @@ -64,6 +65,12 @@ def is_64bit() -> bool: info: idaapi.idainfo = idaapi.get_inf_structure() return info.is_64bit() + def retrieve_input_file_md5() -> str: + return ida_nalt.retrieve_input_file_md5() + + def retrieve_input_file_sha256() -> str: + return ida_nalt.retrieve_input_file_sha256() + else: def get_filetype() -> "ida_ida.filetype_t": @@ -78,6 +85,12 @@ def is_32bit() -> bool: def is_64bit() -> bool: return idaapi.inf_is_64bit() + def retrieve_input_file_md5() -> str: + return ida_nalt.retrieve_input_file_md5().hex() + + def retrieve_input_file_sha256() -> str: + return ida_nalt.retrieve_input_file_sha256().hex() + def inform_user_ida_ui(message): # this isn't a logger, this is IDA's logging facility diff --git a/capa/loader.py b/capa/loader.py index 818198710..6dfc6be42 100644 --- a/capa/loader.py +++ b/capa/loader.py @@ -5,8 +5,8 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. +import io import os -import sys import logging import datetime import contextlib @@ -69,6 +69,7 @@ BACKEND_VMRAY = "vmray" BACKEND_FREEZE = "freeze" BACKEND_BINEXPORT2 = "binexport2" +BACKEND_IDA = "ida" class CorruptFile(ValueError): @@ -237,24 +238,15 @@ def get_extractor( return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(input_path) elif backend == BACKEND_BINJA: - import capa.helpers - from capa.features.extractors.binja.find_binja_api import find_binja_path - - # When we are running as a standalone executable, we cannot directly import binaryninja - # We need to fist find the binja API installation path and add it into sys.path - if capa.helpers.is_running_standalone(): - bn_api = find_binja_path() - if bn_api.exists(): - sys.path.append(str(bn_api)) - - try: - import binaryninja - from binaryninja import BinaryView - except ImportError: - raise RuntimeError( - "Cannot import binaryninja module. Please install the Binary Ninja Python API first: " - + "https://docs.binary.ninja/dev/batch.html#install-the-api)." - ) + import capa.features.extractors.binja.find_binja_api as finder + + if not finder.has_binaryninja(): + raise RuntimeError("cannot find Binary Ninja API module.") + + if not finder.load_binaryninja(): + raise RuntimeError("failed to load Binary Ninja API module.") + + import binaryninja import capa.features.extractors.binja.extractor @@ -269,7 +261,7 @@ def get_extractor( raise UnsupportedOSError() with console.status("analyzing program...", spinner="dots"): - bv: BinaryView = binaryninja.load(str(input_path)) + bv: binaryninja.BinaryView = binaryninja.load(str(input_path)) if bv is None: raise RuntimeError(f"Binary Ninja cannot open file {input_path}") @@ -321,6 +313,34 @@ def get_extractor( return capa.features.extractors.binexport2.extractor.BinExport2FeatureExtractor(be2, buf) + elif backend == BACKEND_IDA: + import capa.features.extractors.ida.idalib as idalib + + if not idalib.has_idalib(): + raise RuntimeError("cannot find IDA idalib module.") + + if not idalib.load_idalib(): + raise RuntimeError("failed to load IDA idalib module.") + + import ida + import ida_auto + + import capa.features.extractors.ida.extractor + + logger.debug("idalib: opening database...") + # idalib writes to stdout (ugh), so we have to capture that + # so as not to screw up structured output. + with capa.helpers.stdout_redirector(io.BytesIO()): + with console.status("analyzing program...", spinner="dots"): + if ida.open_database(str(input_path), run_auto_analysis=True): + raise RuntimeError("failed to analyze input file") + + logger.debug("idalib: waiting for analysis...") + ida_auto.auto_wait() + logger.debug("idalib: opened database.") + + return capa.features.extractors.ida.extractor.IdaFeatureExtractor() + else: raise ValueError("unexpected backend: " + backend) diff --git a/capa/main.py b/capa/main.py index 8035eafa2..d7b45e03a 100644 --- a/capa/main.py +++ b/capa/main.py @@ -43,6 +43,7 @@ from capa.rules import RuleSet from capa.engine import MatchResults from capa.loader import ( + BACKEND_IDA, BACKEND_VIV, BACKEND_CAPE, BACKEND_BINJA, @@ -283,6 +284,7 @@ def install_common_args(parser, wanted=None): backends = [ (BACKEND_AUTO, "(default) detect appropriate backend automatically"), (BACKEND_VIV, "vivisect"), + (BACKEND_IDA, "IDA via idalib"), (BACKEND_PEFILE, "pefile (file features only)"), (BACKEND_BINJA, "Binary Ninja"), (BACKEND_DOTNET, ".NET"), diff --git a/pyproject.toml b/pyproject.toml index 2ceeed3f8..c60893c49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -183,7 +183,9 @@ known_first_party = [ "binaryninja", "flirt", "ghidra", + "ida", "ida_ida", + "ida_auto", "ida_bytes", "ida_entry", "ida_funcs", diff --git a/scripts/compare-backends.py b/scripts/compare-backends.py new file mode 100644 index 000000000..1c000bade --- /dev/null +++ b/scripts/compare-backends.py @@ -0,0 +1,316 @@ +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import sys +import json +import time +import logging +import argparse +import contextlib +import statistics +import subprocess +import multiprocessing +from typing import Set, Dict, List, Optional +from pathlib import Path +from collections import Counter +from dataclasses import dataclass +from multiprocessing import Pool + +import rich +import rich.box +import rich.table + +import capa.main + +logger = logging.getLogger("capa.compare-backends") + +BACKENDS = ("vivisect", "ida", "binja") + + +@dataclass +class CapaInvocation: + path: Path + backend: str + duration: float + returncode: int + stdout: Optional[str] + stderr: Optional[str] + err: Optional[str] + + +def invoke_capa(file: Path, backend: str) -> CapaInvocation: + stdout = None + stderr = None + err = None + returncode: int + try: + logger.debug("run capa: %s: %s", backend, file.name) + t1 = time.time() + child = subprocess.run( + ["python", "-m", "capa.main", "--json", "--backend=" + backend, str(file)], + capture_output=True, + check=True, + text=True, + encoding="utf-8", + ) + returncode = child.returncode + stdout = child.stdout + stderr = child.stderr + except subprocess.CalledProcessError as e: + returncode = e.returncode + stdout = e.stdout + stderr = e.stderr + + logger.debug("%s:%s: error", backend, file.name) + err = str(e) + else: + pass + finally: + t2 = time.time() + + return CapaInvocation( + path=file, + backend=backend, + duration=t2 - t1, + returncode=returncode, + stdout=stdout, + stderr=stderr, + err=err, + ) + + +def wrapper_invoke_capa(args): + file, backend = args + return invoke_capa(file, backend) + + +def collect(args): + results_path = args.results_path + if not results_path.is_file(): + default_doc = {backend: {} for backend in BACKENDS} # type: ignore + results_path.write_text(json.dumps(default_doc), encoding="utf-8") + + testfiles = Path(__file__).parent.parent / "tests" / "data" + + for file in sorted(p for p in testfiles.glob("*")): + # remove leftover analysis files + # because IDA doesn't cleanup after itself, currently. + if file.suffix in (".til", ".id0", ".id1", ".id2", ".nam", ".viv"): + logger.debug("removing: %s", file) + with contextlib.suppress(IOError): + file.unlink() + + doc = json.loads(results_path.read_text(encoding="utf-8")) + + plan = [] + for file in sorted(p for p in testfiles.glob("*")): + if not file.is_file(): + continue + + if file.is_dir(): + continue + + if file.name.startswith("."): + continue + + if file.suffix not in (".exe_", ".dll_", ".elf_", ""): + continue + + logger.debug("%s", file.name) + key = str(file) + + for backend in BACKENDS: + + if (backend, file.name) in { + ("binja", "0953cc3b77ed2974b09e3a00708f88de931d681e2d0cb64afbaf714610beabe6.exe_") + }: + # this file takes 38GB+ and 20hrs+ + # https://github.com/Vector35/binaryninja-api/issues/5951 + continue + + if key in doc[backend]: + if not args.retry_failures: + continue + + if not doc[backend][key]["err"]: + # didn't previously fail, don't repeat work + continue + + else: + # want to retry this previous failure + pass + + plan.append((file, backend)) + + pool_size = multiprocessing.cpu_count() // 2 + logger.info("work pool size: %d", pool_size) + with Pool(processes=pool_size) as pool: + for i, result in enumerate(pool.imap_unordered(wrapper_invoke_capa, plan)): + doc[result.backend][str(result.path)] = { + "path": str(result.path), + "returncode": result.returncode, + "stdout": result.stdout, + "stderr": result.stderr, + "err": result.err, + "duration": result.duration, + } + + if i % 8 == 0: + logger.info("syncing output database") + results_path.write_text(json.dumps(doc)) + + logger.info( + "%.1f\t%s %s %s", + result.duration, + "(err)" if result.err else " ", + result.backend.ljust(8), + result.path.name, + ) + + results_path.write_text(json.dumps(doc)) + return + + +def report(args): + doc = json.loads(args.results_path.read_text(encoding="utf-8")) + + samples = set() + for backend in BACKENDS: + samples.update(doc[backend].keys()) + + failures_by_backend: Dict[str, Set[str]] = {backend: set() for backend in BACKENDS} + durations_by_backend: Dict[str, List[float]] = {backend: [] for backend in BACKENDS} + + console = rich.get_console() + for key in sorted(samples): + sample = Path(key).name + console.print(sample, style="bold") + + seen_rules: Counter[str] = Counter() + + rules_by_backend: Dict[str, Set[str]] = {backend: set() for backend in BACKENDS} + + for backend in BACKENDS: + if key not in doc[backend]: + continue + + entry = doc[backend][key] + duration = entry["duration"] + + if not entry["err"]: + matches = json.loads(entry["stdout"])["rules"].keys() + seen_rules.update(matches) + rules_by_backend[backend].update(matches) + durations_by_backend[backend].append(duration) + + console.print(f" {backend: >8}: {duration: >6.1f}s {len(matches): >3d} matches") + + else: + failures_by_backend[backend].add(sample) + console.print(f" {backend: >8}: {duration: >6.1f}s (error)") + + if not seen_rules: + console.print() + continue + + t = rich.table.Table(box=rich.box.SIMPLE, header_style="default") + t.add_column("viv") + t.add_column("ida") + t.add_column("bn") + t.add_column("rule") + + for rule, _ in seen_rules.most_common(): + t.add_row( + "x" if rule in rules_by_backend["vivisect"] else " ", + "x" if rule in rules_by_backend["ida"] else " ", + "x" if rule in rules_by_backend["binja"] else " ", + rule, + ) + + console.print(t) + + for backend in BACKENDS: + console.print(f"failures for {backend}:", style="bold") + for failure in sorted(failures_by_backend[backend]): + console.print(f" - {failure}") + + if not failures_by_backend[backend]: + console.print(" (none)", style="green") + console.print() + + console.print("durations:", style="bold") + console.print(" (10-quantiles, in seconds)", style="grey37") + for backend in BACKENDS: + q = statistics.quantiles(durations_by_backend[backend], n=10) + console.print(f" {backend: <8}: ", end="") + for i in range(9): + if i in (4, 8): + style = "bold" + else: + style = "default" + console.print(f"{q[i]: >6.1f}", style=style, end=" ") + console.print() + console.print(" ^-- 10% of samples took less than this ^", style="grey37") + console.print(" 10% of samples took more than this -----------------+", style="grey37") + + console.print() + for backend in BACKENDS: + total = sum(durations_by_backend[backend]) + successes = len(durations_by_backend[backend]) + avg = statistics.mean(durations_by_backend[backend]) + console.print( + f" {backend: <8}: {total: >7.0f} seconds across {successes: >4d} successful runs, {avg: >4.1f} average" + ) + console.print() + + console.print("slowest samples:", style="bold") + for backend in BACKENDS: + console.print(backend) + for duration, path in sorted( + ((d["duration"], Path(d["path"]).name) for d in doc[backend].values()), reverse=True + )[:5]: + console.print(f" - {duration: >6.1f} {path}") + + return + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + + default_samples_path = Path(__file__).resolve().parent.parent / "tests" / "data" + + parser = argparse.ArgumentParser(description="Compare analysis backends.") + capa.main.install_common_args( + parser, + wanted=set(), + ) + + subparsers = parser.add_subparsers() + collect_parser = subparsers.add_parser("collect") + collect_parser.add_argument("results_path", type=Path, help="Path to output JSON file") + collect_parser.add_argument("--samples", type=Path, default=default_samples_path, help="Path to samples") + collect_parser.add_argument("--retry-failures", action="store_true", help="Retry previous failures") + collect_parser.set_defaults(func=collect) + + report_parser = subparsers.add_parser("report") + report_parser.add_argument("results_path", type=Path, help="Path to JSON file") + report_parser.set_defaults(func=report) + + args = parser.parse_args(args=argv) + + try: + capa.main.handle_common_args(args) + except capa.main.ShouldExitError as e: + return e.status_code + + args.func(args) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/detect-backends.py b/scripts/detect-backends.py new file mode 100644 index 000000000..2840058f5 --- /dev/null +++ b/scripts/detect-backends.py @@ -0,0 +1,106 @@ +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import sys +import logging +import argparse +import importlib.util + +import rich +import rich.table + +import capa.main +from capa.features.extractors.ida.idalib import find_idalib, load_idalib, is_idalib_installed +from capa.features.extractors.binja.find_binja_api import find_binaryninja, load_binaryninja, is_binaryninja_installed + +logger = logging.getLogger(__name__) + + +def is_vivisect_installed() -> bool: + try: + return importlib.util.find_spec("vivisect") is not None + except ModuleNotFoundError: + return False + + +def load_vivisect() -> bool: + try: + import vivisect # noqa: F401 unused import + + return True + except ImportError: + return False + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + + parser = argparse.ArgumentParser(description="Detect analysis backends.") + capa.main.install_common_args(parser, wanted=set()) + args = parser.parse_args(args=argv) + + try: + capa.main.handle_common_args(args) + except capa.main.ShouldExitError as e: + return e.status_code + + if args.debug: + logging.getLogger("capa").setLevel(logging.DEBUG) + logging.getLogger("viv_utils").setLevel(logging.DEBUG) + else: + logging.getLogger("capa").setLevel(logging.ERROR) + logging.getLogger("viv_utils").setLevel(logging.ERROR) + + table = rich.table.Table() + table.add_column("backend") + table.add_column("already installed?") + table.add_column("found?") + table.add_column("loads?") + + if True: + row = ["vivisect"] + if is_vivisect_installed(): + row.append("True") + row.append("-") + else: + row.append("False") + row.append("False") + + row.append(str(load_vivisect())) + table.add_row(*row) + + if True: + row = ["Binary Ninja"] + if is_binaryninja_installed(): + row.append("True") + row.append("-") + else: + row.append("False") + row.append(str(find_binaryninja() is not None)) + + row.append(str(load_binaryninja())) + table.add_row(*row) + + if True: + row = ["IDA idalib"] + if is_idalib_installed(): + row.append("True") + row.append("-") + else: + row.append("False") + row.append(str(find_idalib() is not None)) + + row.append(str(load_idalib())) + table.add_row(*row) + + rich.print(table) + + +if __name__ == "__main__": + sys.exit(main())