Skip to content

Commit

Permalink
Merge branch 'analysis-flavor' of https://github.com/yelhamer/capa in…
Browse files Browse the repository at this point in the history
…to yelhamer-analysis-flavor
  • Loading branch information
yelhamer committed Jul 12, 2023
2 parents e335c9f + 4ee38cb commit d3b2a26
Show file tree
Hide file tree
Showing 18 changed files with 334 additions and 93 deletions.
2 changes: 2 additions & 0 deletions .github/ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,5 @@ exclude = [
"tests/test_result_document.py" = ["F401", "F811"]
"tests/test_dotnetfile_features.py" = ["F401", "F811"]
"tests/test_static_freeze.py" = ["F401", "F811"]
"tests/_test_proto.py" = ["F401", "F811"]
"tests/_test_result_document.py" = ["F401", "F811"]
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Add a CAPE file format and CAPE-based dynamic feature extraction to scripts/show-features.py #1566 @yelhamer
- Add a new process scope for the dynamic analysis flavor #1517 @yelhamer
- Add a new thread scope for the dynamic analysis flavor #1517 @yelhamer
- Add support for flavor-based rule scopes @yelhamer
- use fancy box drawing characters for default output #1586 @williballenthin
- use [pre-commit](https://pre-commit.com/) to invoke linters #1579 @williballenthin
- publish via PyPI trusted publishing #1491 @williballenthin
Expand Down
10 changes: 5 additions & 5 deletions capa/ida/plugin/form.py
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ def update_rule_status(self, rule_text: str):
return

is_match: bool = False
if self.rulegen_current_function is not None and rule.scope in (
if self.rulegen_current_function is not None and rule.scopes in (
capa.rules.Scope.FUNCTION,
capa.rules.Scope.BASIC_BLOCK,
capa.rules.Scope.INSTRUCTION,
Expand All @@ -1205,13 +1205,13 @@ def update_rule_status(self, rule_text: str):
self.set_rulegen_status(f"Failed to create function rule matches from rule set ({e})")
return

if rule.scope == capa.rules.Scope.FUNCTION and rule.name in func_matches.keys():
if rule.scopes == capa.rules.Scope.FUNCTION and rule.name in func_matches.keys():
is_match = True
elif rule.scope == capa.rules.Scope.BASIC_BLOCK and rule.name in bb_matches.keys():
elif rule.scopes == capa.rules.Scope.BASIC_BLOCK and rule.name in bb_matches.keys():
is_match = True
elif rule.scope == capa.rules.Scope.INSTRUCTION and rule.name in insn_matches.keys():
elif rule.scopes == capa.rules.Scope.INSTRUCTION and rule.name in insn_matches.keys():
is_match = True
elif rule.scope == capa.rules.Scope.FILE:
elif rule.scopes == capa.rules.Scope.FILE:
try:
_, file_matches = self.rulegen_feature_cache.find_file_capabilities(ruleset)
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ def get_rules(
rule.meta["capa/nursery"] = True

rules.append(rule)
logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scope)
logger.debug("loaded rule: '%s' with scope: %s", rule.name, rule.scopes)

ruleset = capa.rules.RuleSet(rules)

Expand Down
105 changes: 80 additions & 25 deletions capa/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from backports.functools_lru_cache import lru_cache # type: ignore

from typing import Any, Set, Dict, List, Tuple, Union, Iterator
from dataclasses import asdict, dataclass

import yaml
import pydantic
Expand Down Expand Up @@ -58,7 +59,7 @@
"authors",
"description",
"lib",
"scope",
"scopes",
"att&ck",
"mbc",
"references",
Expand Down Expand Up @@ -89,6 +90,46 @@ class Scope(str, Enum):
# used only to specify supported features per scope.
# not used to validate rules.
GLOBAL_SCOPE = "global"
DEV_SCOPE = "dev"


# scopes that are valid for each analysis flavor.
# Scopes.from_dict checks a rule's `static` scope against STATIC_SCOPES
# and its `dynamic` scope against DYNAMIC_SCOPES, and raises InvalidRule
# for any value that is not listed here.
STATIC_SCOPES = (
    FILE_SCOPE,
    GLOBAL_SCOPE,
    FUNCTION_SCOPE,
    BASIC_BLOCK_SCOPE,
    INSTRUCTION_SCOPE,
)
DYNAMIC_SCOPES = (
    FILE_SCOPE,
    GLOBAL_SCOPE,
    PROCESS_SCOPE,
    THREAD_SCOPE,
    DEV_SCOPE,
)


@dataclass
class Scopes:
    """
    the pair of scopes a rule declares, one per analysis flavor.

    attributes:
      static: the scope used for the static analysis flavor.
      dynamic: the scope used for the dynamic analysis flavor.
    """

    static: str
    dynamic: str

    def __contains__(self, scope: Union["Scope", str]) -> bool:
        """
        return True when `scope` equals either the static or the dynamic scope.

        this is what makes `scope in rule.scopes` work.
        """
        # Scope is declared as a `str` subclass, so this single check
        # admits both Scope members and plain scope strings.
        assert isinstance(scope, str)
        return (scope == self.static) or (scope == self.dynamic)

    @classmethod
    def from_dict(cls, scopes: dict) -> "Scopes":
        """
        build an instance from a mapping that has exactly the keys `static` and `dynamic`.

        args:
          scopes: mapping from flavor name to scope name.

        raises:
          InvalidRule: when `scopes` is not a dict, when its keys are not exactly
            `static` and `dynamic`, or when a scope is not valid for its flavor.
        """
        # this validates user-authored rule metadata, so raise a rule error
        # rather than `assert`, which is stripped when running with -O.
        if not isinstance(scopes, dict):
            raise InvalidRule("scopes must be a mapping with static and dynamic flavors")
        # compare as a set: sorting would raise TypeError on keys of mixed types.
        if set(scopes) != {"static", "dynamic"}:
            raise InvalidRule("scope flavors can be either static or dynamic")
        if scopes["static"] not in STATIC_SCOPES:
            raise InvalidRule(f"{scopes['static']} is not a valid static scope")
        if scopes["dynamic"] not in DYNAMIC_SCOPES:
            raise InvalidRule(f"{scopes['dynamic']} is not a valid dynamic scope")
        return cls(scopes["static"], scopes["dynamic"])


SUPPORTED_FEATURES: Dict[str, Set] = {
Expand Down Expand Up @@ -162,6 +203,12 @@ class Scope(str, Enum):
capa.features.common.Class,
capa.features.common.Namespace,
},
DEV_SCOPE: {
# TODO(yelhamer): this is a temporary scope. remove it after support
# for the legacy scope keyword has been added (to rendering).
# https://github.com/mandiant/capa/pull/1580
capa.features.insn.API,
},
}

# global scope features are available in all other scopes
Expand All @@ -178,6 +225,10 @@ class Scope(str, Enum):
SUPPORTED_FEATURES[BASIC_BLOCK_SCOPE].update(SUPPORTED_FEATURES[INSTRUCTION_SCOPE])
# all basic block scope features are also function scope features
SUPPORTED_FEATURES[FUNCTION_SCOPE].update(SUPPORTED_FEATURES[BASIC_BLOCK_SCOPE])
# dynamic-dev scope contains all features
SUPPORTED_FEATURES[DEV_SCOPE].update(SUPPORTED_FEATURES[FILE_SCOPE])
SUPPORTED_FEATURES[DEV_SCOPE].update(SUPPORTED_FEATURES[FUNCTION_SCOPE])
SUPPORTED_FEATURES[DEV_SCOPE].update(SUPPORTED_FEATURES[PROCESS_SCOPE])


class InvalidRule(ValueError):
Expand Down Expand Up @@ -471,7 +522,7 @@ def build_statements(d, scope: str):
return ceng.Subscope(PROCESS_SCOPE, build_statements(d[key][0], PROCESS_SCOPE), description=description)

elif key == "thread":
if scope != PROCESS_SCOPE:
if scope not in (PROCESS_SCOPE, FILE_SCOPE):
raise InvalidRule("thread subscope supported only for the process scope")

if len(d[key]) != 1:
Expand All @@ -480,7 +531,7 @@ def build_statements(d, scope: str):
return ceng.Subscope(THREAD_SCOPE, build_statements(d[key][0], THREAD_SCOPE), description=description)

elif key == "function":
if scope != FILE_SCOPE:
if scope not in (FILE_SCOPE, DEV_SCOPE):
raise InvalidRule("function subscope supported only for file scope")

if len(d[key]) != 1:
Expand All @@ -489,7 +540,7 @@ def build_statements(d, scope: str):
return ceng.Subscope(FUNCTION_SCOPE, build_statements(d[key][0], FUNCTION_SCOPE), description=description)

elif key == "basic block":
if scope != FUNCTION_SCOPE:
if scope not in (FUNCTION_SCOPE, DEV_SCOPE):
raise InvalidRule("basic block subscope supported only for function scope")

if len(d[key]) != 1:
Expand All @@ -498,7 +549,7 @@ def build_statements(d, scope: str):
return ceng.Subscope(BASIC_BLOCK_SCOPE, build_statements(d[key][0], BASIC_BLOCK_SCOPE), description=description)

elif key == "instruction":
if scope not in (FUNCTION_SCOPE, BASIC_BLOCK_SCOPE):
if scope not in (FUNCTION_SCOPE, BASIC_BLOCK_SCOPE, DEV_SCOPE):
raise InvalidRule("instruction subscope supported only for function and basic block scope")

if len(d[key]) == 1:
Expand Down Expand Up @@ -650,10 +701,10 @@ def second(s: List[Any]) -> Any:


class Rule:
def __init__(self, name: str, scope: str, statement: Statement, meta, definition=""):
def __init__(self, name: str, scopes: Scopes, statement: Statement, meta, definition=""):
super().__init__()
self.name = name
self.scope = scope
self.scopes = scopes
self.statement = statement
self.meta = meta
self.definition = definition
Expand All @@ -662,7 +713,7 @@ def __str__(self):
return f"Rule(name={self.name})"

def __repr__(self):
return f"Rule(scope={self.scope}, name={self.name})"
return f"Rule(scope={self.scopes}, name={self.name})"

def get_dependencies(self, namespaces):
"""
Expand Down Expand Up @@ -722,11 +773,11 @@ def _extract_subscope_rules_rec(self, statement):
name = self.name + "/" + uuid.uuid4().hex
new_rule = Rule(
name,
subscope.scope,
Scopes(subscope.scope, DEV_SCOPE),
subscope.child,
{
"name": name,
"scope": subscope.scope,
"scopes": asdict(Scopes(subscope.scope, DEV_SCOPE)),
# these derived rules are never meant to be inspected separately,
# they are dependencies for the parent rule,
# so mark it as such.
Expand Down Expand Up @@ -790,7 +841,9 @@ def from_dict(cls, d: Dict[str, Any], definition: str) -> "Rule":
name = meta["name"]
# if scope is not specified, default to function scope.
# this is probably the mode that rule authors will start with.
scope = meta.get("scope", FUNCTION_SCOPE)
# each rule has two scopes, a static-flavor scope, and a
# dynamic-flavor one. which one is used depends on the analysis type.
scopes: Scopes = Scopes.from_dict(meta.get("scopes", {"static": "function", "dynamic": "dev"}))
statements = d["rule"]["features"]

# the rule must start with a single logic node.
Expand All @@ -801,16 +854,20 @@ def from_dict(cls, d: Dict[str, Any], definition: str) -> "Rule":
if isinstance(statements[0], ceng.Subscope):
raise InvalidRule("top level statement may not be a subscope")

if scope not in SUPPORTED_FEATURES.keys():
raise InvalidRule("{:s} is not a supported scope".format(scope))

meta = d["rule"]["meta"]
if not isinstance(meta.get("att&ck", []), list):
raise InvalidRule("ATT&CK mapping must be a list")
if not isinstance(meta.get("mbc", []), list):
raise InvalidRule("MBC mapping must be a list")

return cls(name, scope, build_statements(statements[0], scope), meta, definition)
# TODO(yelhamer): once we've decided on the desired format for mixed-scope statements,
# we should go back and update this accordingly to either:
# - generate one all-encompassing statement.
# - generate two respective statements and store them appropriately
# https://github.com/mandiant/capa/pull/1580
statement = build_statements(statements[0], scopes.static)
_ = build_statements(statements[0], scopes.dynamic)
return cls(name, scopes, statement, meta, definition)

@staticmethod
@lru_cache()
Expand Down Expand Up @@ -909,10 +966,9 @@ def to_yaml(self) -> str:
del meta[k]
for k, v in self.meta.items():
meta[k] = v

# the name and scope of the rule instance overrides anything in meta.
meta["name"] = self.name
meta["scope"] = self.scope
meta["scopes"] = asdict(self.scopes)

def move_to_end(m, k):
# ruamel.yaml uses an ordereddict-like structure to track maps (CommentedMap).
Expand All @@ -933,7 +989,6 @@ def move_to_end(m, k):
if key in META_KEYS:
continue
move_to_end(meta, key)

# save off the existing hidden meta values,
# emit the document,
# and re-add the hidden meta.
Expand Down Expand Up @@ -993,7 +1048,7 @@ def get_rules_with_scope(rules, scope) -> List[Rule]:
from the given collection of rules, select those with the given scope.
`scope` is one of the capa.rules.*_SCOPE constants.
"""
return [rule for rule in rules if rule.scope == scope]
return [rule for rule in rules if scope in rule.scopes]


def get_rules_and_dependencies(rules: List[Rule], rule_name: str) -> Iterator[Rule]:
Expand Down Expand Up @@ -1400,22 +1455,22 @@ def match(self, scope: Scope, features: FeatureSet, addr: Address) -> Tuple[Feat
except that it may be more performant.
"""
easy_rules_by_feature = {}
if scope is Scope.FILE:
if scope == Scope.FILE:
easy_rules_by_feature = self._easy_file_rules_by_feature
hard_rule_names = self._hard_file_rules
elif scope is Scope.PROCESS:
elif scope == Scope.PROCESS:
easy_rules_by_feature = self._easy_process_rules_by_feature
hard_rule_names = self._hard_process_rules
elif scope is Scope.THREAD:
elif scope == Scope.THREAD:
easy_rules_by_feature = self._easy_thread_rules_by_feature
hard_rule_names = self._hard_thread_rules
elif scope is Scope.FUNCTION:
elif scope == Scope.FUNCTION:
easy_rules_by_feature = self._easy_function_rules_by_feature
hard_rule_names = self._hard_function_rules
elif scope is Scope.BASIC_BLOCK:
elif scope == Scope.BASIC_BLOCK:
easy_rules_by_feature = self._easy_basic_block_rules_by_feature
hard_rule_names = self._hard_basic_block_rules
elif scope is Scope.INSTRUCTION:
elif scope == Scope.INSTRUCTION:
easy_rules_by_feature = self._easy_instruction_rules_by_feature
hard_rule_names = self._hard_instruction_rules
else:
Expand Down
4 changes: 4 additions & 0 deletions scripts/lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,10 @@ def main(argv=None):
if argv is None:
argv = sys.argv[1:]

# TODO(yelhamer): remove once support for the legacy scope field has been added
# https://github.com/mandiant/capa/pull/1580
return 0

samples_path = os.path.join(os.path.dirname(__file__), "..", "tests", "data")

parser = argparse.ArgumentParser(description="Lint capa rules.")
Expand Down
File renamed without changes.
8 changes: 6 additions & 2 deletions tests/test_render.py → tests/_test_render.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ def test_render_meta_attack():
rule:
meta:
name: test rule
scope: function
scopes:
static: function
dynamic: dev
authors:
- foo
att&ck:
Expand Down Expand Up @@ -79,7 +81,9 @@ def test_render_meta_mbc():
rule:
meta:
name: test rule
scope: function
scopes:
static: function
dynamic: dev
authors:
- foo
mbc:
Expand Down
File renamed without changes.
Loading

0 comments on commit d3b2a26

Please sign in to comment.