-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlog_serial.py
109 lines (93 loc) · 3.18 KB
/
log_serial.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# @file list_tool.py
# @author Frederich Stine
# @brief host tool for listing installed components
# @date 2024
#
# This source file is part of an example system for MITRE's 2024 Embedded CTF (eCTF).
# This code is being provided only for educational purposes for the 2024 MITRE eCTF
# competition, and may not meet MITRE standards for quality. Use this code at your
# own risk!
#
# @copyright Copyright (c) 2024 The MITRE Corporation
import argparse
import serial
import re
from loguru import logger
import sys
# Logger formatting
fmt = (
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"{extra[extra]: <6} | "
"<level>{level: <8}</level> | "
"<level>{message}</level> "
)
logger.remove(0)
logger.add(sys.stdout, format=fmt)
# List function
def list(args):
ser = serial.Serial(
port=args.application_processor,
baudrate=115200,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
)
# Send command
output = ""
# Receive messages until done
while True:
byte = ser.read()
char = byte.decode("utf-8")
output += char
output = process_output(output)
def process_output(output):
# Find INFO level messages
match = re.search("%info: ((.|\n|\r)*?)%", output)
if match != None:
# Output all of the data and remove from buffer
output = output[:match.start()] + output[match.end():]
for line in match.group(1).strip().split('\n'):
logger.bind(extra="OUTPUT").info(line.strip())
# Find DEBUG level messages
match = re.search("%debug: ((.|\n|\r)*?)%", output)
if match != None:
# Output all of the data and remove from buffer
output = output[:match.start()] + output[match.end():]
for line in match.group(1).strip().split('\n'):
logger.bind(extra="OUTPUT").debug(line.strip())
# Find ACK level messages
match = re.search("%ack%", output)
if match != None:
# Ignore
output = output[:match.start()] + output[match.end():]
# Find SUCCESS level messages
match = re.search("%success: ((.|\n|\r)*?)%", output)
if match != None:
# Output all of the data and remove from buffer
output = output[:match.start()] + output[match.end():]
for line in match.group(1).strip().split('\n'):
logger.bind(extra="OUTPUT").success(line.strip())
exit(0)
# Find ERROR level messages
match = re.search("%error: ((.|\n|\r)*?)%", output)
if match != None:
# Output all of the data and remove from buffer
output = output[:match.start()] + output[match.end():]
for line in match.group(1).strip().split('\n'):
logger.bind(extra="OUTPUT").error(line.strip())
exit(1)
# Return the spliced output
return output
# Main function
def main():
parser = argparse.ArgumentParser(
prog="eCTF List Host Tool",
description="List the components connected to the medical device",
)
parser.add_argument(
"-a", "--application-processor", required=True, help="Serial device of the AP"
)
args = parser.parse_args()
list(args)
if __name__ == "__main__":
main()