-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathForwardAnalysis.py
51 lines (42 loc) · 1.66 KB
/
ForwardAnalysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from collections import defaultdict
from CFG import op_str
class ForwardAnalysis(object):
def __init__(self, cfg, entry_state=None):
self.cfg = cfg
self.before_states = defaultdict(lambda: self.empty_state())
self.after_states = defaultdict(lambda: self.empty_state())
self.entry_state = entry_state if entry_state is not None else self.empty_state()
self.before_states[self.cfg.start] = self.entry_state
self.analyze()
def analyze(self):
work_list = [self.cfg.start]
while len(work_list) > 0:
op = work_list.pop()
in_state = self.before_states[op]
out_state = self.flow_func(in_state, op)
self.after_states[op] = out_state
for succ_addr in op.succs:
next_op = self.cfg[succ_addr]
next_state = self.before_states[next_op]
merged = self.merge(out_state, next_state)
if merged != next_state:
if next_op not in work_list:
work_list.append(next_op)
self.before_states[next_op] = merged
def __repr__(self):
return '{}(0x{:x})'.format(self.__class__.__name__, self.cfg.start_addr)
def show(self):
for op, state in sorted(self.before_states.items(), key=lambda x:x[0].address):
self.show_state(op, state)
def show_state(self, op, state):
print('{}: {}'.format(op_str(op), state))
def empty_state(self):
raise NotImplementedError
# takes in two analysis states and returns a new merged copy
# you should /not/ mutate either input state
def merge(self, s1, s2):
raise NotImplementedError
# returns the a new state for the flow function of the operand and state
# you should /not/ mutate the input state
def flow_func(self, state, op):
raise NotImplementedError