Skip to content

Commit

Permalink
Add GPIO slot test for gadget SNAP
Browse files Browse the repository at this point in the history
Those GPIO related jobs intend to test if GPIO ports are exported after connecting interfaces between checkbox and gadget snap
  • Loading branch information
rickwu666666 committed Apr 15, 2024
1 parent 803cfe4 commit c61e4b0
Show file tree
Hide file tree
Showing 5 changed files with 464 additions and 0 deletions.
207 changes: 207 additions & 0 deletions contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/check_gpio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#!/usr/bin/env python3
import argparse
import os
from checkbox_support.snap_utils.snapd import Snapd
from checkbox_support.snap_utils.system import get_gadget_snap
import requests


def list_gpio_slots(snapd, gadget_name):
"""
List GPIO slots defined by a gadget snap.
Args:
snapd: A Snapd object for interacting with Snapd.
gadget_name: The name of the gadget snap.
Returns:
A dictionary containing GPIO slot information.
"""

gpio_slot = {}

# Go through whole response of "Snapd.interface()", and parser out
# the interfaces that is GPIO and define by gadget snap.
for slots in snapd.interfaces()["slots"]:
if slots["interface"] == "gpio" and slots["snap"] == gadget_name:
gpio_slot[slots["slot"]] = {"number": slots["attrs"]["number"]}
if gpio_slot:
return gpio_slot
else:
raise SystemExit("Error: Can not find any GPIO slot")


def parse_config(config):
"""
Parse a configuration string containing port numbers or ranges
of port numbers.
Args:
config (str): A comma-separated string containing port numbers
or ranges of port numbers.
Port ranges are specified using the format 'start:end'.
Returns:
list: A list containing all the port numbers parsed from
the configuration string.
Example:
>>> parse_config("1,2,5:7,10")
[1, 2, 5, 6, 7, 10]
"""

expect_port = []
if config:
for port_list in config.split(","):
if ":" not in port_list:
expect_port.append(int(port_list))
else:
start_port = port_list.split(":")[0]
end_port = port_list.split(":")[1]
try:
start_port = int(start_port)
end_port = int(end_port)
if start_port > end_port:
raise ValueError("Invalid port range: {}".
format(port_list))
for range_port in range(start_port, end_port + 1):
expect_port.append(range_port)
except ValueError:
raise ValueError("Invalid port range: {}".
format(port_list))
else:
raise ValueError("Error: Config is empty!")
return expect_port


def check_gpio_list(gpio_list, config):
"""
Check if all expected GPIO numbers are defined in the gadget snap.
Args:
gpio_list: A dictionary containing GPIO slot information.
config: Checkbox config including expected GPIO numbers.
e.g. EXPECTED_GADGET_GPIO=499,500,501:504
Sprate by comma, and also colon to define a range of ports
Raises:
SystemExit: If any expected GPIO slot is not defined in the gadget
snap.
"""
expect_port = parse_config(config)
for gpio in gpio_list.values():
if gpio["number"] in expect_port:
expect_port.remove(gpio["number"])
if expect_port:
for gpio_slot in expect_port:
print(
"Error: Slot of GPIO {} is not defined in gadget snap".format(
gpio_slot)
)
raise SystemExit(1)
else:
print("All expected GPIO slots have been defined in gadget snap.")


def connect_gpio(gpio_slots, gadget_name):
"""
Connect GPIO plugs of checkbox to GPIO slots of gadget snap.
Args:
gpio_slots: A dictionary containing GPIO slot information.
gadget_name: The name of the gadget snap.
Raises:
SystemExit: If failed to connect any GPIO.
"""

exit_code = 0
# Get the snap name of checkbox
snap = os.environ['SNAP_NAME']
timeout = int(os.environ.get('SNAPD_TASK_TIMEOUT', 60))
for gpio_num in gpio_slots.keys():
print("Attempting connect gpio to {}:{}".format(gadget_name, gpio_num))
try:
Snapd(task_timeout=timeout).connect(
gadget_name,
gpio_num,
snap,
"gpio"
)
print("Success")
except requests.HTTPError:
print("Failed to connect {}".format(gpio_num))
exit_code = 1
if exit_code == 1:
raise SystemExit(1)


def check_node(num):
"""
Check if a GPIO node is exported.
Args:
num: The GPIO number to check.
Raises:
SystemExit: If the GPIO node does not exist.
"""

path = "/sys/class/gpio/gpio{}".format(num)
if os.path.exists(path):
print("GPIO node of {} exist!".format(num))
else:
raise SystemExit("GPIO node of {} not exist!".format(num))


def main():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(
dest='action',
help="Action in check-gpio, check-node, connect and dump",
)
check_gpio_subparser = subparsers.add_parser("check-gpio")
check_gpio_subparser.add_argument(
"-c",
"--config",
required=True,
help="Checkbox config include expected GPIO\
e.g. 499:500:501:502",
)
check_node_subparser = subparsers.add_parser("check-node")
check_node_subparser.add_argument(
"-n",
"--num",
type=int,
required=True,
help="GPIO number to check if node exported",
)
subparsers.add_parser(
"connect",
help="Connect checkbox GPIO plug and gadget GPIO slots "
)
subparsers.add_parser(
"dump",
help="Dump GPIO slots from gadget"
)
args = parser.parse_args()
snapd = Snapd()
gadget_name = get_gadget_snap()
gpio_slots = list_gpio_slots(snapd, gadget_name)
if args.action == "check-gpio":
check_gpio_list(gpio_slots, args.config)
if args.action == "connect":
connect_gpio(gpio_slots, gadget_name)
if args.action == "dump":
for x in gpio_slots:
print(
"slot: {}\ngpio_number: {}\n".format(
x, gpio_slots[x]["number"]
)
)
if args.action == "check-node":
check_node(args.num)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
#!/usr/bin/env python3
import unittest
from unittest.mock import patch, MagicMock, call
import requests
import check_gpio


class TestCheckGpio(unittest.TestCase):

def test_parse_single_ports(self):
# Test parsing single ports
self.assertEqual(check_gpio.parse_config("1,2,3"), [1, 2, 3])

def test_parse_port_ranges(self):
# Test parsing port ranges
self.assertEqual(check_gpio.parse_config("5:8,10:12"),
[5, 6, 7, 8, 10, 11, 12])

def test_parse_mixed(self):
# Test parsing mixed single ports and port ranges
self.assertEqual(check_gpio.parse_config("1,3:5,7,9:10"),
[1, 3, 4, 5, 7, 9, 10])

def test_parse_empty_string(self):
# Test parsing an empty string
expected_result = "Error: Config is empty!"
with self.assertRaises(ValueError) as err:
check_gpio.parse_config("")
self.assertEqual(err.exception.args[0], expected_result)

def test_parse_invalid_input(self):
# Test parsing invalid input (non-integer ports)
expected_result = "Invalid port range: 3:a"
with self.assertRaises(ValueError) as err:
check_gpio.parse_config("1,2,3:a")
self.assertEqual(err.exception.args[0], expected_result)

def test_list_gpio_slots_exist(self):
snapd_mock = MagicMock()
gadget_name = 'test_gadget'
snapd_mock.interfaces.return_value = {
"slots": [
{"snap": gadget_name,
"slot": "gpio-0",
"interface": "gpio",
"attrs": {"number": "10"}},
{"snap": "other-snap",
"slot": "other-slot",
"interface": "other-interface",
"attrs": {}},
]
}
expected_result = {"gpio-0": {"number": "10"}}
self.assertEqual(check_gpio.list_gpio_slots(snapd_mock, gadget_name),
expected_result)

def test_list_gpio_slots_not_exist(self):
snapd_mock = MagicMock()
gadget_name = 'test_gadget'
snapd_mock.interfaces.return_value = {
"slots": [
{"snap": "other-snap",
"slot": "other-slot",
"interface": "other-interface",
"attrs": {}},
]
}
expected_result = "Error: Can not find any GPIO slot"
with self.assertRaises(SystemExit) as err:
check_gpio.list_gpio_slots(snapd_mock, gadget_name)
self.assertEqual(err.exception.args[0], expected_result)

@patch('builtins.print') # Mock print function to prevent actual printing
def test_check_gpio_list_all_defined(self, mock_print):
gpio_list = {
1: {"number": 499},
2: {"number": 500},
3: {"number": 501},
4: {"number": 502}
}
config = "499,500:502"
check_gpio.check_gpio_list(gpio_list, config)
# Assert that "All expected GPIO slots have been defined
# in gadget snap." is printed
mock_print.assert_called_with(
"All expected GPIO slots have been defined in gadget snap.")

@patch('builtins.print') # Mock print function to prevent actual printing
def test_check_gpio_list_missing(self, mock_print):
gpio_list = {
1: {"number": 499},
2: {"number": 500},
3: {"number": 501},
# GPIO 502 is missing
}
config = "499,500:502"
with self.assertRaises(SystemExit) as context:
check_gpio.check_gpio_list(gpio_list, config)

# Assert that the proper error message is printed for the
# missing GPIO slot
mock_print.assert_called_with(
"Error: Slot of GPIO 502 is not defined in gadget snap")

# Assert that SystemExit is raised with exit code 1
self.assertEqual(context.exception.code, 1)

@patch('check_gpio.os.environ')
@patch('check_gpio.Snapd')
def test_connect_gpio_success(self, mock_snapd, mock_environ):
mock_environ.__getitem__.side_effect = lambda x: {
'SNAP_NAME': 'checkbox_snap',
'SNAPD_TASK_TIMEOUT': '30'
}[x]
mock_snapd.return_value.connect.side_effect = None

gpio_slots = {
"gpio-499": {"number": 499},
"gpio-500": {"number": 500}
}
gadget_name = "gadget_snap"

check_gpio.connect_gpio(gpio_slots, gadget_name)

# Assert that connect is called for each GPIO slot
expected_calls = [call(gadget_name,
'gpio-499',
'checkbox_snap',
'gpio'),
call(gadget_name,
'gpio-500',
'checkbox_snap',
'gpio')
]
mock_snapd.return_value.connect.assert_has_calls(expected_calls)

@patch('check_gpio.os.environ')
@patch('check_gpio.Snapd')
def test_connect_gpio_fail(self, mock_snapd, mock_environ):
mock_environ.__getitem__.side_effect = lambda x: {
'SNAP_NAME': 'checkbox_snap',
'SNAPD_TASK_TIMEOUT': '30'
}[x]
mock_snapd.return_value.connect.side_effect = requests.HTTPError

gpio_slots = {
"gpio-499": {"number": 499},
"gpio-500": {"number": 500}
}
gadget_name = "gadget_snap"
with self.assertRaises(SystemExit) as err:
check_gpio.connect_gpio(gpio_slots, gadget_name)

# Assert that connect is called for each GPIO slot
expected_calls = [call(gadget_name,
'gpio-499',
'checkbox_snap',
'gpio'),
call(gadget_name,
'gpio-500',
'checkbox_snap',
'gpio')]
mock_snapd.return_value.connect.assert_has_calls(expected_calls)
self.assertEqual(err.exception.code, 1)

@patch('builtins.print') # Mock print function to prevent actual printing
def test_check_node_exists(self, mock_print):
# Mocking os.path.exists to return True
with patch('os.path.exists', return_value=True):
check_gpio.check_node(499)
# Assert that "GPIO node of 499 exist!" is printed
mock_print.assert_called_with("GPIO node of 499 exist!")

def test_check_node_not_exist(self):
# Mocking os.path.exists to return False
with patch('os.path.exists', return_value=False):
with self.assertRaises(SystemExit) as context:
check_gpio.check_node(499)
# Assert that SystemExit is raised with the correct message
self.assertEqual(context.exception.args[0],
"GPIO node of 499 not exist!")


if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit c61e4b0

Please sign in to comment.