#!/usr/bin/env python
"""
NMFU - the "no memory for you" "parser" generator.
designed to create what a compsci major would yell at me for calling a dfa to parse files/protocols character by character while using as little
RAM as possible.
Copyright (C) 2020-2023 Matthew Mirvish
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
"""
__version__ = "0.5.7"
import abc
import enum
import string
import itertools
import queue
import io
import textwrap
import os
import sys
import weakref
try:
import lark
except ImportError: # pragma: no cover
print("... failed to import lark; must be in setup.py ...", file=sys.stderr)
# Mock it
class lark:
Token = None
Tree = None
@staticmethod
def Lark(*args, **kwargs):
return None
LarkError = RuntimeError
from collections import defaultdict, Counter
from typing import List, Optional, Iterable, Dict, Union, Set, Tuple
try: # pragma: no cover
import graphviz
debug_enabled = True
import lark.tree
except ImportError: # pragma: no cover
debug_enabled = False
grammar = r"""
start: top_decl* parser_decl top_decl*
?top_decl: out_decl
| macro_decl
| hook_decl
| code_decl
out_decl: "out" out_type IDENTIFIER ";"
| "out" out_type IDENTIFIER "=" atom ";"
code_decl: RESULT_CODE IDENTIFIER ("," IDENTIFIER)* ";"
out_type: "bool" -> bool_type
| "int" ("{" int_attr ("," int_attr)* "}")? -> int_type
| "enum" "{" IDENTIFIER ("," IDENTIFIER)+ "}" -> enum_type
| "str" "[" RADIX_NUMBER "]" -> str_type
| "unterminated" "str" "[" RADIX_NUMBER "]" -> unterm_str_type
| "raw" "{" IDENTIFIER "}" -> raw_type
int_attr: SIGNED -> signed_attr
| "size" NUMBER -> width_attr
hook_decl: "hook" IDENTIFIER ";"
macro_decl: "macro" IDENTIFIER macro_args "{" statement* "}"
macro_args: "(" macro_arg ("," macro_arg)* ")"
| "(" ")" -> macro_arg_empty
macro_arg: "macro" IDENTIFIER -> macro_macro_arg
| "out" IDENTIFIER -> macro_out_arg
| "match" IDENTIFIER -> macro_match_expr_arg
| "expr" IDENTIFIER -> macro_int_expr_arg
| "hook" IDENTIFIER -> macro_hook_arg
| "loop" IDENTIFIER -> macro_breaktgt_arg
| RESULT_CODE IDENTIFIER -> macro_rescode_arg
parser_decl: "parser" "{" statement+ "}"
?statement: block_stmt
| simple_stmt ";"
simple_stmt: expr -> match_stmt
| IDENTIFIER "=" expr -> assign_stmt
| IDENTIFIER "+=" expr -> append_stmt
| IDENTIFIER "(" (expr ("," expr)*)? ")" -> call_stmt
| "break" IDENTIFIER? -> break_stmt
| "delete" IDENTIFIER -> delete_stmt
| "finish" -> finish_stmt
| "finish" IDENTIFIER -> custom_finish_stmt
| "yield" IDENTIFIER -> custom_yield_stmt
| "wait" expr -> wait_stmt
block_stmt: "loop" IDENTIFIER? "{" statement+ "}" -> loop_stmt
| "case" "{" case_clause+ "}" -> case_stmt
| "greedy" "case" "{" greedy_prio_block+ "}" -> greedy_case_stmt
| "optional" "{" statement+ "}" -> optional_stmt
| "try" "{" statement+ "}" catch_block -> try_stmt
| "foreach" "{" statement+ "}" "do" "{" foreach_actions "}" -> foreach_stmt
| "if" if_condition ("elif" if_condition)* else_condition? -> if_stmt
if_condition: _math_expr "{" statement+ "}"
else_condition: "else" "{" statement+ "}"
foreach_actions: statement+
catch_block: "catch" catch_options? "{" statement* "}"
?greedy_prio_block: "prio" NUMBER case_clause
| "prio" NUMBER "{" case_clause+ "}"
| case_clause
case_clause: case_predicate ("," case_predicate)* "->" "{" statement* "}"
case_predicate: "else" -> else_predicate
| expr -> expr_predicate
catch_options: "(" CATCH_OPTION ("," CATCH_OPTION)* ")"
// EXPRESSIONS
?expr: atom // either literal or string depending on context
| regex // not an atom to simplify things
| binary_regex
| "end" -> end_expr
| "(" expr+ ")" -> concat_expr
| "[" _math_expr "]"
atom: BOOL_CONST -> bool_const
| RADIX_NUMBER -> number_const
| CHAR_CONSTANT -> char_const
| STRING "i" -> string_case_const
| STRING "b" -> binary_string_const
| STRING -> string_const
| IDENTIFIER -> identifier_const
_math_expr: disjunction_expr
?disjunction_expr: conjunction_expr ("||" conjunction_expr)*
?conjunction_expr: bit_or_expr ("&&" bit_or_expr)*
?bit_or_expr: bit_xor_expr ("|" bit_xor_expr)*
?bit_xor_expr: bit_and_expr ("^" bit_and_expr)*
?bit_and_expr: comp_expr ("&" comp_expr)*
?comp_expr: shift_expr (CMP_OP shift_expr)?
// nmfu does not allow (1 << 2 << 3) because that is dumb
?shift_expr: sum_expr (SHIFT_OP sum_expr)?
?sum_expr: mul_expr (SUM_OP mul_expr)*
?mul_expr: math_unary (MUL_OP math_unary)*
?math_unary: math_atom
| "!" math_atom -> not_expr
| "-" math_atom -> negate_expr
?math_atom: RADIX_NUMBER -> math_num
| IDENTIFIER -> math_var
| IDENTIFIER ".len" -> math_str_len
| IDENTIFIER "[" _math_expr "]" -> math_str_index
| CHAR_CONSTANT -> math_char_const
| BOOL_CONST -> bool_const
| "$" IDENTIFIER -> builtin_math_var
| "(" _math_expr ")"
// REGEX
// Uses templates to avoid duplication
regex: "/" regex_alternation "/"
regex_char_class: "\\" REGEX_CHARCLASS
?regex_alternation: regex_group ("|" regex_group)*
?regex_group: regex_alternation_element+
?regex_alternation_element: regex_literal
| regex_literal REGEX_OP -> regex_operation
| regex_literal "{" NUMBER "}" -> regex_exact_repeat
| regex_literal "{" NUMBER "," NUMBER "}" -> regex_range_repeat
| regex_literal "{" NUMBER "," "}" -> regex_at_least_repeat
?regex_literal: REGEX_UNIMPORTANT -> regex_raw_match // creates multiple char classes
| regex_char_class // creates a char class
| "(" regex_alternation ")" -> regex_group
| "[^" regex_set_element+ "]" -> regex_inverted_set // creates an inverted char class
| "[" regex_set_element+ "]" -> regex_set // creates a char class
| "." -> regex_any // creates an inverted char class
?regex_set_element: REGEX_CHARGROUP_ELEMENT_RAW
| REGEX_CHARGROUP_ELEMENT_RAW "-" REGEX_CHARGROUP_ELEMENT_RAW -> regex_set_range
| regex_char_class
binary_regex: "b/" binary_regex_alternation "/"
?binary_regex_alternation: binary_regex_group ("|" binary_regex_group)*
?binary_regex_group: binary_regex_alternation_element+
?binary_regex_alternation_element: binary_regex_literal
| binary_regex_literal REGEX_OP -> binary_regex_operation
| binary_regex_literal "{" NUMBER "}" -> binary_regex_exact_repeat
| binary_regex_literal "{" NUMBER "," NUMBER "}" -> binary_regex_range_repeat
| binary_regex_literal "{" NUMBER "," "}" -> binary_regex_at_least_repeat
?binary_regex_literal: REGEX_BYTE -> binary_regex_raw_match // creates multiple char classes
| "(" binary_regex_alternation ")" -> binary_regex_group
| "[^" binary_regex_set_element+ "]" -> binary_regex_inverted_set // creates an inverted char class
| "[" binary_regex_set_element+ "]" -> binary_regex_set // creates a char class
| "." -> binary_regex_any // creates an inverted char class
?binary_regex_set_element: REGEX_BYTE
| REGEX_BYTE "-" REGEX_BYTE -> binary_regex_set_range
// BINARY REGEX
// TERMINALS
BOOL_CONST: /true|false/
// catch options
CATCH_OPTION: /nomatch|outofspace/
RESULT_CODE: /yieldcode|finishcode/
%import common.CNAME -> CNAME
%import common.SIGNED_INT -> NUMBER
%import common.HEXDIGIT
HEX_NUMBER: ["+"|"-"] "0x" HEXDIGIT+
BIN_NUMBER: "0b" ["0"|"1"]+
RADIX_NUMBER: HEX_NUMBER | BIN_NUMBER | NUMBER
IDENTIFIER.-1: CNAME
STRING: /"(?:[^"\\]|\\.)*"/
// regex internals
REGEX_UNIMPORTANT: /[^.?*()\[\]\\+{}|\/]|\\\.|\\\*|\\\(|\\\)|\\\[|\\\]|\\\+|\\\\|\\\{|\\\}|\\\||\\\//
REGEX_OP: /[+*?]/
REGEX_CHARGROUP_ELEMENT_RAW: /[^\-\]\\\/]|\\-|\\\]|\\\\|\\\//
REGEX_CHARCLASS: /[wWdDsSntr ]/
REGEX_BYTE: /[0-9a-fA-F]{2}/
// math
SUM_OP: /[+-]/
MUL_OP: /[*\/%]/
CMP_OP: /[!=]=/ | /[<>]=?/
CHAR_CONSTANT: /'[^'\\]'/ | /'\\.'/
SHIFT_OP: "<<" | ">>"
SIGNED: "signed" | "unsigned"
// comments
%import common.WS
COMMENT: /\s*\/\/[^\n]*/
%ignore WS
%ignore COMMENT
"""
all_sum_expr_nodes = [
"disjunction_expr",
"conjunction_expr",
"comp_expr",
"sum_expr",
"mul_expr",
"math_num",
"negate_expr",
"not_expr",
"shift_expr",
"bit_or_expr",
"bit_xor_expr",
"bit_and_expr",
"math_str_len",
"math_str_index",
"math_var",
"math_char_const",
"builtin_math_var"
]
parser = lark.Lark(grammar, propagate_positions=True, lexer="dynamic_complete", start=["start", "regex"])
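# A minimal illustrative sketch (not taken from the project docs; the input string is a
# hypothetical, untested example): `parser.parse` turns NMFU source accepted by the grammar
# above into a lark.Tree that the later compilation stages consume.
#
#     tree = parser.parse("out bool ok; parser { /yes/; ok = true; }", start="start")
#     print(tree.pretty())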
"""
NMFU operates in a few 'stages':
- 1. conversion to actual AST (removes variable assignments and turns them into actions)
- 2a. conversion to state machine recursively with abstract actions
- 2b. (optional) state machine optimizing
- 3. codegen
The overall architecture is similar to MLang, with various context classes which each steal the resources from a
predecessor ctx.
"""
# ===========
# STATE TYPES
# ===========
class DFTransition:
"""
A transition
on_values is a set of characters (or integers)
"""
# Special tokens
Else = type('_DFElse', (), {'__repr__': lambda x: "Else"})()
End = type('_DFEnd', (), {'__repr__': lambda x: "End"})()
def __init__(self, on_values=None, fallthrough=False, error_handling=False):
if type(on_values) is not list:
if type(on_values) is str or on_values in [DFTransition.Else, DFTransition.End]:
on_values = [on_values]
elif on_values is None:
on_values = []
else:
on_values = list(on_values)
self.on_values = on_values
self.target = None
self.is_fallthrough = fallthrough
self.error_handling = error_handling
self.actions = []
def copy(self):
my_copy = DFTransition()
my_copy.on_values = self.on_values.copy()
my_copy.actions = self.actions.copy()
my_copy.target = self.target
my_copy.is_fallthrough = self.is_fallthrough
my_copy.error_handling = self.error_handling
return my_copy
def __repr__(self):
error = "error " if self.error_handling else ""
if self.actions:
return f"<DFTransition {error}on={self.on_values} to={self.target} actions={self.actions} fallthough={self.is_fallthrough}>"
else:
return f"<DFTransition {error}on={self.on_values} to={self.target} fallthough={self.is_fallthrough}>"
def attach(self, *actions, prepend=False):
if prepend:
self.actions = list(actions) + self.actions
else:
self.actions.extend(actions)
return self
def to(self, target):
if isinstance(target, int):
self.target = DFState.all_states[target]
else:
self.target = target
return self
def fallthrough(self, fall=True):
self.is_fallthrough = fall
return self
def handles_else(self, handles=True):
self.error_handling = handles
return self
@classmethod
def from_key(cls, on_values, inherited):
return cls(on_values).to(inherited.target).attach(*inherited.actions).fallthrough(inherited.is_fallthrough).handles_else(inherited.error_handling)
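# Illustrative sketch of the fluent builder API above (the state and `SomeAction` names are
# hypothetical placeholders, not part of this module):
#
#     t = DFTransition("a").to(target_state).attach(SomeAction()).fallthrough(False).handles_else(False)
#
# from_key() copies an existing transition's target, actions and flags onto a new set of on_values.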
class DFConditionalTransition(DFTransition):
def __init__(self, condition: "DFCondition"):
super().__init__(on_values=(), fallthrough=True)
self.condition = condition
def constrain(self, *conditions, join_as_or=False):
# TODO: work properly
self.condition = conditions[0]
def __repr__(self):
if self.actions:
return f"<DFConditionalTransition on={self.condition!r} to={self.target} actions={self.actions}>"
else:
return f"<DFConditionalTransition on={self.condition!r} to={self.target}>"
class DFState:
all_states = {}
def __init__(self):
self.transitions = []
DFState.all_states[id(self)] = self
def transition(self, transition, allow_replace=False, allow_replace_if=None, collapse_else=True):
if isinstance(transition, DFConditionalTransition):
raise IllegalDFAStateError("Invalid condition transition on normal state", self)
# Add parent relationship if this is a new transition
if ProgramData.lookup(transition, DTAG.PARENT) is None:
ProgramData.imbue(transition, DTAG.PARENT, self)
if allow_replace_if is None:
allow_replace_constant = True if allow_replace else False # don't allow reference binding
allow_replace = lambda x: allow_replace_constant
else:
allow_replace = allow_replace_if
contained = set()
for i in self.all_transitions():
if set(i.on_values) & set(transition.on_values):
contained.add(i)
break
if contained and any(x.target != transition.target or x.is_fallthrough != transition.is_fallthrough or x.error_handling != transition.error_handling for x in contained):
for contain in contained:
if allow_replace(contain):
for x in transition.on_values:
try:
contain.on_values.remove(x)
except ValueError:
continue
if not contain.on_values:
self.transitions.remove(contain)
else:
raise IllegalDFAStateError("Duplicate transition", self)
elif contained:
return # don't add duplicate entries in the table
# Check if we can "inline" this transition
if collapse_else:
for transition_in in self.transitions:
if transition_in.actions == transition.actions and transition_in.target == transition.target and transition_in.is_fallthrough == transition.is_fallthrough and transition_in.error_handling == transition.error_handling:
if DFTransition.Else in transition_in.on_values or DFTransition.Else in transition.on_values:
transition_in.on_values = [DFTransition.Else]
else:
transition_in.on_values = list(set(transition_in.on_values) | set(transition.on_values))
return self
self.transitions.append(transition)
return self
def all_transitions(self) -> Iterable[DFTransition]:
yield from self.transitions
def all_transitions_for(self, on_values):
for i in self.transitions:
if len(set(i.on_values) & set(on_values)):
yield i
def compute_foreign_else_definition(self, other_state: "DFState") -> Set:
r"""
Given two states
(self) -(a,b,c)-> ...
\-----(ELSE)-> ...
and
(other) ---(a)-> ...
\-----(ELSE)-> ...
it should be clear that the specific meaning of ELSE differs on both.
This function returns a (potentially super-)set of the symbols which ELSE
on a _foreign_ state (like "other") will match on this state.
"""
our_alphabet = self.local_alphabet()
their_alphabet = other_state.local_alphabet()
local_else_additions = our_alphabet - their_alphabet
local_else_additions.add(DFTransition.Else)
return local_else_additions
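# Worked example of the rule above (hypothetical states, shown as a sketch): if `self` locally
# matches {a, b, c} and `other_state` only matches {a}, then other_state's Else, re-interpreted
# on self, covers the local alphabet difference plus Else itself:
#
#     self.compute_foreign_else_definition(other_state)  # -> {"b", "c", DFTransition.Else}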
def __setitem__(self, key, data):
if type(key) in (tuple, list, set, frozenset):
on_values = key
else:
on_values = [key]
if type(data) is DFTransition:
self.transition(DFTransition.from_key(on_values, data), True)
else:
self.transition(DFTransition(on_values).to(data), True)
def __delitem__(self, key):
if type(key) in (tuple, list, set, frozenset):
on_values = key
else:
on_values = [key]
contained = None
for i in self.all_transitions():
if set(i.on_values) & set(on_values):
contained = i
break
if contained is not None:
for on_value in on_values:
contained.on_values.remove(on_value)
if not contained.on_values:
self.transitions.remove(contained)
def __getitem__(self, data):
"""
Get the transition that would be followed for data, including Else if applicable
"""
if type(data) in (list, tuple, set, frozenset):
data = frozenset(data)
for i in self.all_transitions():
if not i.on_values:
continue
if data <= set(i.on_values):
return i
elif data > set(i.on_values):
return None
return self[DFTransition.Else]
else:
for i in self.all_transitions():
if data in i.on_values:
return i
if DFTransition.Else != data:
return self[DFTransition.Else]
def local_alphabet(self, excluding=(DFTransition.Else,)) -> Set:
"""
Get the 'local alphabet': all symbols that are matched by this state excluding some set.
"""
local_alphabet = set()
for trans in self.transitions:
for i in trans.on_values:
if i in excluding: continue
local_alphabet.add(i)
return local_alphabet
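# Illustrative sketch of the dict-style helpers above (`st` and `other_state` are hypothetical):
#
#     st["a"] = other_state          # __setitem__ builds DFTransition("a").to(other_state)
#     st[("b", "c")] = other_state   # several symbols at once
#     t = st["a"]                    # __getitem__ returns the matching transition (or the Else one)
#     del st["a"]                    # __delitem__ drops the symbol, removing emptied transitions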
class DFProxyState(DFState):
"""
Represents a state that only contains the equivalent of a fallthrough else
transition to another state; in other words, a pure "proxy" to another state, potentially
with some number of actions/conditions.
Note this class on its own does not _enforce_ these constraints, and will break if used in a way that
violates them.
"""
def can_eliminate(self):
"""
Can this node be eliminated?
"""
for transition in self.all_transitions():
if transition.actions:
return False
return True
def non_proxy_frontiers(self):
"""
Return the set of states reachable by following only proxy states, starting at this state.
"""
visited = set()
sources = set()
def aux(state):
if state in visited:
return
visited.add(state)
if isinstance(state, DFProxyState):
for t in state.all_transitions():
aux(t.target)
# Also consider actions with overrides
for action in t.actions:
if action.get_target_override_mode() in (ActionOverrideMode.MAY_GOTO_TARGET, ActionOverrideMode.ALWAYS_GOTO_OTHER):
for extra in action.get_target_override_targets():
aux(extra)
else:
sources.add(state)
aux(self)
return sources
def equivalent_on_values(self):
r"""
Compute an "equivalent" set of on_values, such that taking any one will have at least one valid path through all directly-attached condition points, in addition
to a set of on_values which map to the treat_as_else state in _all_ outcomes.
i.e. for a -t-> b -<c>-> c
\-f-> d -<else>-> e
\-<f>-> ELSE
we'd return ({c, else}, {f}), since f should go to an error state in some equivalent state E replacing a, and c/else should continue on. This lets us rewrite the dfa as
E -{c,else}=> a -> ...
\-f=> ELSE
which is a valid construction for append_after.
"""
encountered = set()
candidate_else = set()
sources = self.non_proxy_frontiers()
for state in sources:
for t in state.all_transitions():
if t.error_handling:
candidate_else.update(t.on_values)
else:
encountered.update(t.on_values)
filtered = set()
for possible in candidate_else:
for state in sources:
if state[possible] is None:
continue
if not state[possible].error_handling:
filtered.add(possible)
encountered.update(filtered)
candidate_else.difference_update(filtered)
return encountered, candidate_else
class DFConditionPoint(DFProxyState):
"""
Represents a single "condition point": a node which only has fallthrough conditional-transitions. A conditional-transition
doesn't match input; rather, it just selects some unambiguous path to continue on.
"""
def __init__(self):
super().__init__()
self.transitions = []
def transition(self, transition):
if not isinstance(transition, DFConditionalTransition):
raise IllegalDFAStateError("Invalid normal transition on condition point", self)
if transition in self.transitions:
return
if ProgramData.lookup(transition, DTAG.PARENT) is None:
ProgramData.imbue(transition, DTAG.PARENT, self)
self.transitions.append(transition)
def __getitem__(self, key): # pragma: no cover
raise NotImplementedError()
def __delitem__(self, key): # pragma: no cover
raise NotImplementedError()
def __setitem__(self, key): # pragma: no cover
raise NotImplementedError()
def can_eliminate(self):
return False
class DFA:
all_state_machines = {}
def __init__(self):
self.accepting_states = []
self.starting_state = None
self.states: List[DFState] = []
def add(self, state):
if ProgramData.lookup(state, DTAG.PARENT) is None:
ProgramData.imbue(state, DTAG.PARENT, self)
if self.starting_state is None:
self.starting_state = state
self.states.append(state)
def mark_accepting(self, state):
if isinstance(state, int):
self.accepting_states.append(DFState.all_states[state])
else:
self.accepting_states.append(state)
def simulate(self, actions):
"""
Attempt to simulate what would happen given the input symbols.
"""
return list(st for _, st, act in self.trace(actions) if act is None)[-1]
def trace(self, actions):
"""
Record a trace of all actions and state transitions executed given some input, in the form
(char, state, action)
"""
position = self.starting_state
yield (None, position, None)
for action_char in actions:
try:
while True:
if position is None:
break
transition = position[action_char]
position = transition.target
yield (action_char, position, None)
for action in transition.actions:
yield (action_char, position, action)
if action.get_target_override_mode() == ActionOverrideMode.ALWAYS_GOTO_OTHER:
potential = action.get_target_override_targets() # TODO: handle correctly
position = potential[0]
elif action.get_target_override_mode() == ActionOverrideMode.ALWAYS_GOTO_UNDEFINED:
yield (action_char, action, None)
return
if not transition.is_fallthrough:
break
except AttributeError:
yield (action_char, None, None)
return
else:
if not position:
yield (action_char, None, None)
return
def simulate_accepts(self, actions):
"""
Determine if a given input ends with a success or a failure (considers FinishAction)
"""
result = self.simulate(actions)
if result in self.accepting_states:
return True
if isinstance(result, FinishAction):
return True
return False
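# Illustrative sketch tying DFA, DFState and DFTransition together (a hypothetical two-state
# machine, not from the test suite):
#
#     dfa = DFA()
#     s0, s1 = DFState(), DFState()
#     dfa.add(s0); dfa.add(s1)
#     dfa.mark_accepting(s1)
#     s0.transition(DFTransition("a").to(s1))
#     dfa.simulate("a")            # -> s1
#     dfa.simulate_accepts("a")    # -> True
#     list(dfa.trace("a"))         # -> [(None, s0, None), ("a", s1, None)]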
def dfs(self):
"""
Construct a dfs-order traversal of the DFA
"""
visited = set()
def aux(state):
if not state:
return
if state in visited:
return
visited.add(state)
yield state
for t in state.all_transitions():
use_real = True
for action in t.actions:
if action.get_target_override_mode() == ActionOverrideMode.ALWAYS_GOTO_OTHER:
use_real = False
for tgt in action.get_target_override_targets():
yield from aux(tgt)
break
elif action.get_target_override_mode() == ActionOverrideMode.ALWAYS_GOTO_UNDEFINED:
use_real = False
break
elif action.get_target_override_mode() == ActionOverrideMode.MAY_GOTO_TARGET:
for tgt in action.get_target_override_targets():
yield from aux(tgt)
if use_real:
yield from aux(t.target)
yield from aux(self.starting_state)
def error_handling_transitions(self, include_states=False):
"""
Return a set of all transitions reachable from the start state that are marked as error handling
"""
result = set()
for state, t in self.all_transitions(True):
if t.error_handling:
if include_states:
result.add((state, t))
else:
result.add(t)
return result
def transitions_pointing_to(self, target_state: DFState, include_states=False):
"""
Return a set of all transitions that point to the target state and that are reachable
from the start state
"""
result = set()
for state, t in self.all_transitions(True):
if t.target == target_state:
if include_states:
result.add((state, t))
else:
result.add(t)
return result
def transitions_that_do(self, action: "Action"):
"""
Return a set of all transitions that contain the action and that are reachable
from the start state
"""
result = set()
for t in self.all_transitions():
if action in t.actions:
result.add(t)
return result
def all_transitions(self, include_states=False):
"""
Yield all reachable transitions
"""
for state in self.dfs():
for action in state.all_transitions():
if include_states:
yield (state, action)
else:
yield action
def is_valid(self):
"""
Can we still reach at least one accept state?
"""
for state in self.dfs():
if state in self.accepting_states:
return True
return False
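# Illustrative sketch of the traversal helpers above (`dfa` is a hypothetical machine):
#
#     for state in dfa.dfs():                                  # each reachable state, visited once
#         ...
#     for src, t in dfa.all_transitions(include_states=True):  # reachable transitions with sources
#         ...
#     bad = dfa.error_handling_transitions()                   # transitions marked error_handling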
def append_after(self, chained_dfa: "DFA", sub_states=None, mark_accept=True, chain_actions=None):
"""
Add the chained_dfa in such a way that its start state "becomes" all of the `sub_states` (or if unspecified, the accept states of _this_ dfa).
If mark_accept is True, we should replace the accept states that we currently have with corresponding ones based on the chained dfa.
If chain_actions is not empty, we add those actions to all new transitions. This does _not_ create potential ambiguity, as the original start
states of chained DFAs are kept intact, so loops work properly. In other words, chain_actions will only be run once and so is a suitable mechanism
for adding finish actions.
"""
if sub_states is None:
sub_states = self.accepting_states
if chain_actions is None:
chain_actions = []
fake_initial_transition = None
def resolve_transitions_for_error(transition, symbolset):
if transition is not fake_initial_transition:
yield transition
else:
condition_point: DFConditionPoint = transition.target
visited = set()
for i in condition_point.non_proxy_frontiers():
local_symbolset = symbolset.copy()
if DFTransition.Else in local_symbolset:
local_symbolset.remove(DFTransition.Else)
local_symbolset |= i.compute_foreign_else_definition(condition_point)
for symbol in local_symbolset:
transition = i[symbol]
if transition in visited:
continue
visited.add(transition)
if not transition.error_handling:
yield transition
if isinstance(chained_dfa.starting_state, DFProxyState):
# Add an extra state that will represent the condition point properly (see docs for equivalent_on_values)
valid, to_else = chained_dfa.starting_state.equivalent_on_values()
if valid and to_else:
fake_start = DFState()
chained_dfa.add(fake_start)
fake_start[valid] = chained_dfa.starting_state
fake_initial_transition = fake_start[valid].fallthrough(True)
fake_start[to_else] = chained_dfa.starting_state
fake_start[to_else].fallthrough(True).handles_else()
chained_dfa.starting_state = fake_start
# If the caller wants to chain actions into a DFA which potentially matches the empty string, we have to place the actions onto
# transitions going into the sub_states, instead of on transitions coming out of them that we generate. This adds more opportunities
# for "unable to schedule strict"-type errors, but avoids missing actions in these cases.
if chain_actions and chained_dfa.starting_state in chained_dfa.accepting_states:
self.chain_actions_into(chain_actions, sub_states)
chain_actions = [] # Since the actions are already handled, don't try to add them to new transitions.
# Check for ambiguity: if any transitions added to a sub_state try to redirect a valid character to a different valid
# state, there is ambiguity. In other words, we only want to add transitions that would previously lead to the error state.
chained_transitions = [x for x in chained_dfa.starting_state.transitions if DFTransition.Else in x.on_values]
chained_transitions.extend(x for x in chained_dfa.starting_state.transitions if DFTransition.Else not in x.on_values)
valid_replacers = set()
for transition in chained_transitions:
if transition.error_handling:
continue
valid_replacers.add(transition.target)
for sub_state in sub_states:
# Compute the "local else additions": what things that are matched by Else in the chained start state which Else in the sub_state does _not_ match.
# Note that we only handle this in one direction; technically sub_state[Else] could refer to a smaller set than chained.start[Else] but that shouldn't
# matter too much.
sub_local_alphabet = sub_state.local_alphabet()
local_else_meaning = sub_state.compute_foreign_else_definition(chained_dfa.starting_state)
culled_chained_transitions = []
for transition in chained_transitions:
culled_transition = transition.copy()
relevant_values = set(transition.on_values)
irrelevant_values = set()
if DFTransition.Else in relevant_values:
relevant_values.update(local_else_meaning)
target = sub_state[relevant_values]
if target is None:
targets = set()
for value in relevant_values:
v = sub_state[value]
if v is None:
irrelevant_values.add(value)
continue
targets.add(v)
relevant_values -= irrelevant_values
else:
targets = {target}
# targets is the set of all (potentially) conflicting transitions
if transition.error_handling:
# Cull values
for j in relevant_values.copy():
if sub_state[j] is not None and not sub_state[j].error_handling:
relevant_values.remove(j)
elif not all(x.error_handling or x.target == transition.target for x in targets):
if ProgramData.dump(DebugDumpable.DFA) and ProgramData.do(ProgramFlag.VERBOSE_AMBIG_ERRORS): # pragma: no cover
debug_dump_dfa(self, "ae_dfa2", highlight=sub_state)
debug_dump_dfa(chained_dfa, "ae_dfa1")
dprint[ProgramFlag.VERBOSE_AMBIG_ERRORS]("TT", transition)
dprint[ProgramFlag.VERBOSE_AMBIG_ERRORS]("TA", targets)
targets = [x for x in targets if not x.error_handling and x.target != transition.target]
dprint[ProgramFlag.VERBOSE_AMBIG_ERRORS]("TAF", targets)
dprint[ProgramFlag.VERBOSE_AMBIG_ERRORS]("RV", relevant_values)
for i in relevant_values:
if sub_state[i] not in targets:
irrelevant_values.add(i)
relevant_values -= irrelevant_values
dprint[ProgramFlag.VERBOSE_AMBIG_ERRORS]("RVF", relevant_values)
# Come up with a reasonable description of which characters are ambiguous
if relevant_values == {DFTransition.Else}: # wildcard notation hard to represent
rv_suffix = ""
else:
relevant_values_in_msg = relevant_values - {DFTransition.Else}
if len(relevant_values_in_msg) >= 3:
relevant_values_in_msg = list(relevant_values_in_msg)[:3]
if ProgramData.do(ProgramFlag.CODEPOINTS_IN_ERRORS):
rv_suffix = f" on symbol{'s' if len(relevant_values_in_msg) > 1 else ''} {', '.join(format(ord(x), '02x') for x in relevant_values_in_msg)}"
else:
rv_suffix = f" on character{'s' if len(relevant_values_in_msg) > 1 else ''} {', '.join(repr(x) for x in (relevant_values_in_msg))}"
raise IllegalDFAStateConflictsError("Ambiguous transitions detected while joining DFAs" + rv_suffix, *itertools.chain(
*(resolve_transitions_for_error(x, relevant_values) for x in targets)), *resolve_transitions_for_error(transition, relevant_values))
# Create transition to add
culled_transition.on_values = list(relevant_values | irrelevant_values)
culled_transition.attach(*chain_actions, prepend=True)
culled_chained_transitions.append(culled_transition)
for new_transition in culled_chained_transitions:
sub_state.transition(new_transition, allow_replace_if=lambda x: x.error_handling or x.target in valid_replacers)
# Adopt all the states
for state in chained_dfa.states:
if state not in self.states:
self.add(state)
# Finally, mark all the sub_states as no longer accept states, and mark the new accept states thus
if mark_accept:
sub_states = sub_states.copy()
for sub_state in sub_states:
if sub_state in self.accepting_states:
self.accepting_states.remove(sub_state)
# Additionally, if the starting state was an accept state, mark all the sub states as accept states too
if chained_dfa.starting_state in chained_dfa.accepting_states:
for sub_state in sub_states:
self.mark_accepting(sub_state)
for state in chained_dfa.accepting_states:
self.mark_accepting(state)
def chain_actions_into(self, actions: Iterable["Action"], target_states: Iterable[DFState]):
"""
Attempt to chain the given actions on all transitions pointing into the passed states.
If these actions cannot be scheduled once (if requested) an error will be thrown. Note the check for scheduling once