Skip to content

Commit

Permalink
Merge pull request #7 from tock/dev/scheduler-tests
Browse files Browse the repository at this point in the history
Implement scheduler release tests
  • Loading branch information
lschuermann authored Jan 3, 2025
2 parents 8ded9e3 + b83684b commit 38ec5da
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 1 deletion.
47 changes: 47 additions & 0 deletions hwci/tests/scheduler_restart_whileone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed under the Apache License, Version 2.0 OR the MIT License.
# SPDX-License-Identifier: Apache-2.0 OR MIT

import logging
import time
import re
from utils.test_helpers import OneshotTest

class SchedulerRestartWhileoneTest(OneshotTest):
def __init__(self):
super().__init__(apps=["tests/whileone"])

def oneshot_test(self, board):
serial = board.serial

def get_process_id():
# Extract the process list:
serial.write(b"list\r\n")
process_list_re = \
r"PID[ \t]+ShortID[ \t]+Name[ \t]+Quanta[ \t]+Syscalls[ \t]+Restarts[ \t]+Grants[ \t]+State\r\n(.*)\r\n"
process_list_out = serial.expect(process_list_re)
assert process_list_out is not None
process_list_match = re.match(process_list_re, process_list_out.decode("utf-8"))

# Extract the ID of the whileone process:
process_list_entry_0 = process_list_match.group(1)
assert "whileone" in process_list_entry_0
process_list_entry_0_match = re.match(r"[ \t]*([0-9]+)[ \t]+", process_list_entry_0)
whileone_pid = int(process_list_entry_0_match.group(1))
return whileone_pid

# Wait for the process console to be up:
assert serial.expect("tock") is not None

initial_pid = get_process_id()
logging.info(f"whileone process running with PID {initial_pid}")

logging.info("Restarting whileone process")
serial.write(b"terminate whileone\r\n")
time.sleep(0.5)
serial.write(b"boot whileone\r\n")

new_pid = get_process_id()
logging.info(f"whileone process running with PID {new_pid}")
assert new_pid > initial_pid

test = SchedulerRestartWhileoneTest()
55 changes: 55 additions & 0 deletions hwci/tests/scheduler_stop_start_whileone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Licensed under the Apache License, Version 2.0 OR the MIT License.
# SPDX-License-Identifier: Apache-2.0 OR MIT

import logging
import time
import re
from utils.test_helpers import OneshotTest

class SchedulerStopStartWhileoneTest(OneshotTest):
def __init__(self):
super().__init__(apps=["tests/whileone"])

def oneshot_test(self, board):
serial = board.serial

def get_process_state():
# Extract the process list:
serial.write(b"list\r\n")
process_list_re = \
r"PID[ \t]+ShortID[ \t]+Name[ \t]+Quanta[ \t]+Syscalls[ \t]+Restarts[ \t]+Grants[ \t]+State\r\n(.*)\r\n"
process_list_out = serial.expect(process_list_re)
assert process_list_out is not None
process_list_match = re.match(process_list_re, process_list_out.decode("utf-8"))

# Extract the ID of the whileone process:
process_list_entry_0 = process_list_match.group(1)
assert "whileone" in process_list_entry_0

if "Stopped(Running)" in process_list_entry_0:
return "stopped_running"
elif "Running" in process_list_entry_0:
return "running"
else:
raise ValueError(f"Unknown process state: {process_list_entry_0}")

# Wait for the process console to be up:
assert serial.expect("tock") is not None

initial_state = get_process_state()
logging.info(f"whileone process {initial_state}")
assert initial_state == "running"

logging.info("Stopping whileone process")
serial.write(b"stop whileone\r\n")
stopped_state = get_process_state()
logging.info(f"whileone process {stopped_state}")
assert stopped_state == "stopped_running"

logging.info("Starting whileone process")
serial.write(b"start whileone\r\n")
started_state = get_process_state()
logging.info(f"whileone process {started_state}")
assert started_state == "running"

test = SchedulerStopStartWhileoneTest()
85 changes: 85 additions & 0 deletions hwci/tests/scheduler_whileone_blink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Licensed under the Apache License, Version 2.0 OR the MIT License.
# SPDX-License-Identifier: Apache-2.0 OR MIT

import logging
import time
import re
from utils.test_helpers import OneshotTest

class SchedulerWhileoneBlinkTest(OneshotTest):
def __init__(self):
super().__init__(apps=["tests/whileone", "blink"])

def oneshot_test(self, board):
gpio = board.gpio
serial = board.serial

# Map the LEDs according to target_spec.yaml
led_pins = {
"LED1": gpio.pin("P0.13"),
"LED2": gpio.pin("P0.14"),
}

# Configure LED pins as inputs to read their state
for led in led_pins.values():
led.set_mode("input")

# Since the LEDs are active low, when the pin is low, the LED is on
logging.info("Starting scheduler (whileone + blink) test")
toggle_counts = {
led: 0
for led in led_pins.keys()
}
previous_states = {}
for _ in range(50): # Read the LED states multiple times
current_states = {}
for name, pin in led_pins.items():
value = pin.read()
led_on = value == 0 # Active low
current_states[name] = led_on
logging.info(f"{name} is {'ON' if led_on else 'OFF'}")

# Compare with previous states to check for changes
if previous_states:
for name in led_pins.keys():
if current_states[name] != previous_states[name]:
logging.info(
f"{name} changed state to {'ON' if current_states[name] else 'OFF'}"
)
toggle_counts[name] += 1
previous_states = current_states

time.sleep(0.1) # Wait before next read

# Make sure that each LED toggled at least twice, and the frequency of
# toggles decreases with the LED index:
prev_led = None
prev_toggles = None
for led, toggle_count in toggle_counts.items():
assert toggle_count >= 2, f"LED {led} did not toggle at least 2 times!"
assert prev_toggles is None or toggle_count < prev_toggles, \
f"LED {led} toggled more ({toggle_count}) times than the previous LED ({prev_led}, {prev_toggles})"
prev_toggles = toggle_count
prev_led = led

# Ensure that both apps are reported as running:
serial.write(b"list\r\n")
process_list_re = \
r"PID[ \t]+ShortID[ \t]+Name[ \t]+Quanta[ \t]+Syscalls[ \t]+Restarts[ \t]+Grants[ \t]+State\r\n(.*)\r\n(.*)\r\n"
process_list_out = serial.expect(process_list_re)
assert process_list_out is not None
process_list_match = re.match(process_list_re, process_list_out.decode("utf-8"))

[whileone_proc, blink_proc] = \
[process_list_match.group(1), process_list_match.group(2)] \
if "whileone" in process_list_match.group(1) else \
[process_list_match.group(2), process_list_match.group(1)]

assert "whileone" in whileone_proc
assert "Running" in whileone_proc
assert "blink" in blink_proc
assert "Running" in blink_proc or "Yielded" in blink_proc

logging.info("Scheduler (whileone + blink) test completed successfully")

test = SchedulerWhileoneBlinkTest()
4 changes: 3 additions & 1 deletion hwci/utils/serial_port.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ def expect(self, pattern, timeout=10, timeout_error=True):

def write(self, data):
logging.debug(f"Writing data: {data}")
self.ser.write(data)
for byte in data:
self.ser.write(bytes([byte]))
time.sleep(0.1)

def close(self):
self.ser.close()
Expand Down

0 comments on commit 38ec5da

Please sign in to comment.