diff --git a/CHANGELOG.md b/CHANGELOG.md
index 095682a5f..3edb6863c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 ### New Features
 - ELF: implement file import and export name extractor #1607 #1608 @Aayush-Goel-04
 - bump pydantic from 1.10.9 to 2.1.1 #1582 @Aayush-Goel-04
+- develop script to highlight the features that are not used during matching #331 @Aayush-Goel-04
 
 ### Breaking Changes
 
diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py
index 45d822a56..688b1733a 100644
--- a/capa/rules/__init__.py
+++ b/capa/rules/__init__.py
@@ -738,6 +738,32 @@ def extract_subscope_rules(self):
 
         yield from self._extract_subscope_rules_rec(self.statement)
 
+    def _extract_all_features_rec(self, statement) -> Set[Feature]:
+        """collect every feature (non-Statement node) found beneath `statement`."""
+        feature_set: Set[Feature] = set()
+
+        for child in statement.get_children():
+            if isinstance(child, Statement):
+                feature_set.update(self._extract_all_features_rec(child))
+            else:
+                feature_set.add(child)
+        return feature_set
+
+    def extract_all_features(self) -> Set[Feature]:
+        """
+        recursively extract all features referenced by this rule.
+
+        returns:
+            set: all features contained within this rule.
+        """
+        if not isinstance(self.statement, Statement):
+            # the logic of a rule may be a single bare feature rather than a statement,
+            # such as in anti-analysis/obfuscation/obfuscated-with-advobfuscator.yml,
+            # whose logic is a lone substring feature.
+            return {self.statement}
+
+        return self._extract_all_features_rec(self.statement)
+
     def evaluate(self, features: FeatureSet, short_circuit=True):
         capa.perf.counters["evaluate.feature"] += 1
         capa.perf.counters["evaluate.feature.rule"] += 1
diff --git a/scripts/detect_duplicate_features.py b/scripts/detect_duplicate_features.py
index dd9b98384..6737d7fa9 100644
--- a/scripts/detect_duplicate_features.py
+++ b/scripts/detect_duplicate_features.py
@@ -8,38 +8,17 @@
 import sys
 import logging
 import argparse
+from typing import Set
 from pathlib import Path
 
 import capa.main
 import capa.rules
-import capa.engine as ceng
+from capa.features.common import Feature
 
 logger = logging.getLogger("detect_duplicate_features")
 
 
-def get_child_features(feature: ceng.Statement) -> list:
-    """
-    Recursively extracts all feature statements from a given rule statement.
-
-    Args:
-        feature (capa.engine.Statement): The feature statement to extract features from.
-
-    Returns:
-        list: A list of all feature statements contained within the given feature statement.
-    """
-    children = []
-
-    if isinstance(feature, (ceng.And, ceng.Or, ceng.Some)):
-        for child in feature.children:
-            children.extend(get_child_features(child))
-    elif isinstance(feature, (ceng.Subscope, ceng.Range, ceng.Not)):
-        children.extend(get_child_features(feature.child))
-    else:
-        children.append(feature)
-    return children
-
-
-def get_features(rule_path: str) -> list:
+def get_features(rule_path: str) -> Set[Feature]:
     """
     Extracts all features from a given rule file.
 
@@ -47,17 +26,15 @@ def get_features(rule_path: str) -> list:
         rule_path (str): The path to the rule file to extract features from.
 
     Returns:
-        list: A list of all feature statements contained within the rule file.
+        set: A set of all feature statements contained within the rule file.
     """
-    feature_list = []
     with Path(rule_path).open("r", encoding="utf-8") as f:
         try:
             new_rule = capa.rules.Rule.from_yaml(f.read())
-            feature_list = get_child_features(new_rule.statement)
+            return new_rule.extract_all_features()
         except Exception as e:
             logger.error("Error: New rule %s %s %s", rule_path, str(type(e)), str(e))
             sys.exit(-1)
-    return feature_list
 
 
 def find_overlapping_rules(new_rule_path, rules_path):
@@ -67,7 +44,6 @@ def find_overlapping_rules(new_rule_path, rules_path):
 
-    # Loads features of new rule in a list.
+    # Loads features of new rule into a set.
     new_rule_features = get_features(new_rule_path)
-
     count = 0
     overlapping_rules = []
 
@@ -75,7 +51,7 @@ def find_overlapping_rules(new_rule_path, rules_path):
     ruleset = capa.main.get_rules(rules_path)
 
     for rule_name, rule in ruleset.rules.items():
-        rule_features = get_child_features(rule.statement)
+        rule_features = rule.extract_all_features()
 
         if not len(rule_features):
             continue
diff --git a/scripts/show-unused-features.py b/scripts/show-unused-features.py
new file mode 100644
index 000000000..dbd6c8c89
--- /dev/null
+++ b/scripts/show-unused-features.py
@@ -0,0 +1,230 @@
+#!/usr/bin/env python3
+"""
+Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
+Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+You may obtain a copy of the License at: [package root]/LICENSE.txt
+Unless required by applicable law or agreed to in writing, software distributed under the License
+ is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and limitations under the License.
+"""
+import os
+import sys
+import typing
+import logging
+import argparse
+from typing import Set, Tuple
+from pathlib import Path
+from collections import Counter
+
+import tabulate
+from termcolor import colored
+
+import capa.main
+import capa.rules
+import capa.helpers
+import capa.features
+import capa.exceptions
+import capa.render.verbose as v
+import capa.features.common
+import capa.features.freeze
+import capa.features.address
+import capa.features.extractors.pefile
+import capa.features.extractors.base_extractor
+from capa.helpers import log_unsupported_runtime_error
+from capa.features.common import Feature
+from capa.features.extractors.base_extractor import FunctionHandle
+
+logger = logging.getLogger("show-unused-features")
+
+
+def format_address(addr: capa.features.address.Address) -> str:
+    """format `addr` via capa.render.verbose.format_address."""
+    return v.format_address(capa.features.freeze.Address.from_capa(addr))
+
+
+def get_rules_feature_set(rules_path) -> Set[Feature]:
+    """collect the features referenced by any rule loaded from `rules_path`."""
+    ruleset = capa.main.get_rules(rules_path)
+    rules_feature_set: Set[Feature] = set()
+    for rule in ruleset.rules.values():
+        rules_feature_set.update(rule.extract_all_features())
+
+    return rules_feature_set
+
+
+def get_file_features(
+    functions: Tuple[FunctionHandle, ...], extractor: capa.features.extractors.base_extractor.FeatureExtractor
+) -> typing.Counter[Feature]:
+    """
+    count how often each feature occurs at function, basic block, and instruction scope.
+
+    library functions are skipped, as are global features.
+    """
+    feature_map: typing.Counter[Feature] = Counter()
+
+    for f in functions:
+        if extractor.is_library_function(f.address):
+            function_name = extractor.get_function_name(f.address)
+            logger.debug("skipping library function %s (%s)", format_address(f.address), function_name)
+            continue
+
+        for feature, _ in extractor.extract_function_features(f):
+            if capa.features.common.is_global_feature(feature):
+                continue
+            feature_map[feature] += 1
+
+        for bb in extractor.get_basic_blocks(f):
+            for feature, _ in extractor.extract_basic_block_features(f, bb):
+                if capa.features.common.is_global_feature(feature):
+                    continue
+                feature_map[feature] += 1
+
+            for insn in extractor.get_instructions(f, bb):
+                for feature, _ in extractor.extract_insn_features(f, bb, insn):
+                    if capa.features.common.is_global_feature(feature):
+                        continue
+                    feature_map[feature] += 1
+    return feature_map
+
+
+def get_colored(s: str) -> str:
+    """color `s` cyan; for strings shaped like `name(value)`, color only `value`."""
+    name, paren, rest = s.partition("(")
+    if paren and rest.endswith(")"):
+        # strip the closing parenthesis only when it really is the final character,
+        # so that no other trailing character gets silently dropped.
+        value = colored(rest[:-1], "cyan")
+        return f"{name}({value})"
+    return colored(s, "cyan")
+
+
+def print_unused_features(feature_map: typing.Counter[Feature], rules_feature_set: Set[Feature]):
+    """print a table of the features in `feature_map` that are not in `rules_feature_set`, rarest first."""
+    unused_features = []
+    for feature, count in reversed(feature_map.most_common()):
+        if feature in rules_feature_set:
+            continue
+        unused_features.append((str(count), get_colored(str(feature))))
+    print("\n")
+    print(tabulate.tabulate(unused_features, headers=["Count", "Feature"], tablefmt="plain"))
+    print("\n")
+
+
+def main(argv=None):
+    """
+    command line entry point: print the sample's features that no rule references.
+
+    returns 0 on success and -1 on failure.
+    """
+    if argv is None:
+        argv = sys.argv[1:]
+
+    parser = argparse.ArgumentParser(description="Show the features that capa doesn't have rules for yet")
+    capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend", "rules"})
+
+    parser.add_argument("-F", "--function", type=str, help="Show features for specific function")
+    args = parser.parse_args(args=argv)
+    capa.main.handle_common_args(args)
+
+    if args.function and args.backend == "pefile":
+        print("pefile backend does not support extracting function features")
+        return -1
+
+    try:
+        taste = capa.helpers.get_file_taste(Path(args.sample))
+    except IOError as e:
+        logger.error("%s", str(e))
+        return -1
+
+    try:
+        sig_paths = capa.main.get_signatures(args.signatures)
+    except IOError as e:
+        logger.error("%s", str(e))
+        return -1
+
+    if (args.format == "freeze") or (
+        args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
+    ):
+        extractor = capa.features.freeze.load(Path(args.sample).read_bytes())
+    else:
+        should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)
+        try:
+            extractor = capa.main.get_extractor(
+                args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace
+            )
+        except capa.exceptions.UnsupportedFormatError:
+            capa.helpers.log_unsupported_format_error()
+            return -1
+        except capa.exceptions.UnsupportedRuntimeError:
+            log_unsupported_runtime_error()
+            return -1
+
+    feature_map: typing.Counter[Feature] = Counter()
+
+    # NOTE(review): ida_main() seeds this counter from extract_file_features() instead - confirm which is intended
+    feature_map.update([feature for feature, _ in extractor.extract_global_features()])
+
+    function_handles: Tuple[FunctionHandle, ...]
+    if isinstance(extractor, capa.features.extractors.pefile.PefileFeatureExtractor):
+        # pefile extractor doesn't extract function features
+        function_handles = ()
+    else:
+        function_handles = tuple(extractor.get_functions())
+
+    if args.function:
+        # `args.function` is a string (see `type=str` above), so compare it to the formatted address
+        # regardless of the input format, rather than to the raw address object.
+        function_handles = tuple(fh for fh in function_handles if format_address(fh.address) == args.function)
+
+        if not function_handles:
+            print(f"{args.function} not a function")
+            return -1
+
+    feature_map.update(get_file_features(function_handles, extractor))
+
+    rules_feature_set = get_rules_feature_set(args.rules)
+
+    print_unused_features(feature_map, rules_feature_set)
+    return 0
+
+
+def ida_main():
+    """entry point when run inside IDA: report the unused features of the current function."""
+    import idc
+
+    import capa.main
+    import capa.features.extractors.ida.extractor
+
+    function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START)
+    print(f"getting features for current function {hex(function)}")
+
+    extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor()
+    feature_map: typing.Counter[Feature] = Counter()
+
+    feature_map.update([feature for feature, _ in extractor.extract_file_features()])
+
+    function_handles = tuple(extractor.get_functions())
+
+    if function:
+        function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles))
+
+        if len(function_handles) == 0:
+            print(f"{hex(function)} not a function")
+            return -1
+
+    feature_map.update(get_file_features(function_handles, extractor))
+
+    rules_path = capa.main.get_default_root() / "rules"
+    rules_feature_set = get_rules_feature_set([rules_path])
+
+    print_unused_features(feature_map, rules_feature_set)
+
+    return 0
+
+
+if __name__ == "__main__":
+    if capa.helpers.is_runtime_ida():
+        ida_main()
+    else:
+        sys.exit(main())
diff --git a/tests/test_scripts.py b/tests/test_scripts.py
index 4baa96a97..7c91bc573 100644
--- a/tests/test_scripts.py
+++ b/tests/test_scripts.py
@@ -45,6 +45,8 @@ def get_rule_path():
         pytest.param("show-capabilities-by-function.py", [get_file_path()]),
         pytest.param("show-features.py", [get_file_path()]),
         pytest.param("show-features.py", ["-F", "0x407970", get_file_path()]),
+        pytest.param("show-unused-features.py", [get_file_path()]),
+        pytest.param("show-unused-features.py", ["-F", "0x407970", get_file_path()]),
         pytest.param("capa_as_library.py", [get_file_path()]),
     ],
 )