diff --git a/CHANGELOG.md b/CHANGELOG.md index 8220019..d3fb393 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ # Changelog +## Version 4.0.0 + +Breaking changes: +- `device.create_qod_session()` now requires `duration` as a mandatory parameter +- `device.get_congestion()` now returns a list of `Congestion` objects + +Changes: +- `device.verify_location()` may now return a "PARTIAL" result if the device is + partially inside the verification area + +Fixes: +- Previously due to a miscommunication `device.sessions()` would return all + created sessions. These have now been correctly limited to device-specific ones + ## Version 3.1.0 Changes: diff --git a/Jenkinsfile b/Jenkinsfile index a34533d..1bc824a 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -93,6 +93,17 @@ pipeline { } } } + stage('Audit') { + steps { + container('beluga') { + script { + sh """ + https_proxy="http://fihel1d-proxy.emea.nsn-net.net:8080" python3 -m poetry run pip-audit + """ + } + } + } + } stage('Integration Test') { steps { container('beluga') { @@ -125,17 +136,6 @@ pipeline { } } } - stage('Audit') { - steps { - container('beluga') { - script { - sh """ - https_proxy="http://fihel1d-proxy.emea.nsn-net.net:8080" python3 -m poetry run pip-audit - """ - } - } - } - } stage('Build') { steps { container('beluga') { @@ -150,7 +150,7 @@ pipeline { } stage('Installation Test') { when { expression { env.gitlabActionType == "TAG_PUSH" && - (env.gitlabTargetBranch.contains("rc-") || env.gitlabTargetBranch.contains("release-"))} } + (env.gitlabBranch.contains("rc-") || env.gitlabBranch.contains("release-"))} } steps { container('beluga') { script { @@ -167,14 +167,14 @@ pipeline { } } stage('Deploy candidate') { - when { expression { env.gitlabActionType == "TAG_PUSH" && env.gitlabTargetBranch.contains("rc-")} } + when { expression { env.gitlabActionType == "TAG_PUSH" && env.gitlabBranch.contains("rc-")} } steps { container('beluga') { script { sh """ env | grep gitlab """ - if(env.gitlabActionType == "TAG_PUSH" && env.gitlabTargetBranch.contains("rc-")){ + if(env.gitlabActionType == "TAG_PUSH" && env.gitlabBranch.contains("rc-")){ sh ''' python3 -m poetry config repositories.devpi ${PYPI_REPOSITORY} python3 -m poetry build @@ -186,7 +186,7 @@ pipeline { } } stage('Deploy release') { - when { expression { env.gitlabActionType == "TAG_PUSH" && env.gitlabTargetBranch.contains("release-")} } + when { expression { env.gitlabActionType == "TAG_PUSH" && env.gitlabBranch.contains("release-")} } steps { container('beluga') { script { diff --git a/examples/device-status.py b/examples/device-status.py index e4e2b6d..034e217 100644 --- a/examples/device-status.py +++ b/examples/device-status.py @@ -1,39 +1,47 @@ -# Device Status functionalities - -# Subscribing to Connectivity and Roaming updates: - -from typing import List import network_as_code as nac +from datetime import datetime, timedelta, timezone +from network_as_code.models.device import DeviceIpv4Addr -from network_as_code.models.device import Device, DeviceIpv4Addr - +# We begin by creating a Network as Code client client = nac.NetworkAsCodeClient( token="" ) -device = client.devices.get( - "device@bestcsp.net", - ipv4_address = DeviceIpv4Addr( - public_address="233.252.0.2", - private_address="192.0.2.25", - public_port=80), - ipv6_address = "2001:db8:1234:5678:9abc:def0:fedc:ba98", +# Then, we create an object for the mobile device we want to use +my_device = client.devices.get( + "device@testcsp.net", + ipv4_address=DeviceIpv4Addr( + public_address="192.0.2.3", + private_address="192.0.2.204", + public_port=80 + ), + ipv6_address="2001:db8:1234:5678:9abc:def0:fedc:ba98", # The phone number accepts the "+" sign, but not spaces or "()" marks - phone_number = "36721601234567" + phone_number="36721601234567" ) -connectivity_subscription = client.connectivity.subscribe( - # Change it to "ROAMING_STATUS" whenever needed - event_type="CONNECTIVITY", - device=device, - max_num_of_reports=5, - notification_url="https://example.com/notifications", +# Simply change the event_type to +# "org.camaraproject.device-status.v0.roaming-status" whenever needed. +my_subscription = client.connectivity.subscribe( + event_type="org.camaraproject.device-status.v0.connectivity-data", + device=my_device, + # You can tell when the subscription is supposed to expire + # with a date-time object + subscription_expire_time=datetime.now(timezone.utc) + timedelta(days=1), # Use HTTPS to send notifications + notification_url="https://example.com/notifications", notification_auth_token="replace-with-your-auth-token" ) -# Get a subscription by its ID -subscription = client.connectivity.get_subscription(connectivity_subscription.id) +# Use this to show the roaming subscription status +print(my_subscription) + +# You can also get a subscription by its ID +subscription = client.connectivity.get_subscription(my_subscription.id) + +# Or check when your subscription starts/expires: +print(subscription.starts_at) +print(subscription.expires_at) # Delete a subscription -connectivity_subscription.delete() +subscription.delete() diff --git a/examples/identity-security.py b/examples/identity-security.py index d88da06..989ab2b 100644 --- a/examples/identity-security.py +++ b/examples/identity-security.py @@ -14,13 +14,22 @@ # Then, create a device object for the phone number you want to check my_device = client.devices.get( # The phone number accepts the "+" sign, but not spaces or "()" marks - phone_number="36721601234567" + phone_number="+346661113334" ) -# Check the latest SIM-Swap date -latest_sim_swap_date = my_device.get_sim_swap_date() +# The date of the last SIM Swap can be retrieved like so: +# The output may be null, if no SIM Swap has occurred. +# Or it may also return the SIM activation date. +sim_swap_date = my_device.get_sim_swap_date() - # Check SIM-Swap events within specified time spans -# The max_age parameter is not mandatory -# This method also checks if SIM swap occurred within an undefined age -sim_swap_check = my_device.verify_sim_swap(max_age=360) +# Otherwise it behaves like a regular datetime object +print(sim_swap_date.isoformat()) + +# If you are only interested if a SIM swap has occurred, +# just use: +if my_device.verify_sim_swap(): + print("There has been a SIM swap!") + +# You can also test if the SIM swap happened recently: +if my_device.verify_sim_swap(max_age=3600): + print("A SIM swap occurred within the past hour!") \ No newline at end of file diff --git a/examples/location-information.py b/examples/location-information.py index 79a369e..52d6b3f 100644 --- a/examples/location-information.py +++ b/examples/location-information.py @@ -16,7 +16,7 @@ from network_as_code.models.device import Device, DeviceIpv4Addr -SDK_TOKEN = "" +SDK_TOKEN = "" DEVICE_ID = "device@testcsp.net" # Give the device the device identifier and SDK token diff --git a/examples/network-insights.py b/examples/network-insights.py index ccbb3f6..32f6b0a 100644 --- a/examples/network-insights.py +++ b/examples/network-insights.py @@ -9,7 +9,7 @@ from network_as_code.models.device import Device, DeviceIpv4Addr # Initialize the client object with your application key -SDK_TOKEN = "" +SDK_TOKEN = "" DEVICE_ID = "device@testcsp.net" # Give the device the device identifier and SDK token @@ -19,7 +19,8 @@ my_device = client.devices.get(DEVICE_ID) -# Subscribe your device to Congestion notifications +# Subscribe your device to Congestion notifications FIRST. +# Then, you'll be able to use other functionalities, such as polling. congestion_subscription = client.insights.subscribe_to_congestion_info( my_device, # Set the duration of your subscription to congestion insights, @@ -38,9 +39,13 @@ print(congestion_subscription.starts_at) print(congestion_subscription.expires_at) -# Get historical data between two timestamps +# Get congestion historical data between two timestamps # Set the duration/time difference with the timedelta function +# It returns an array of congested objects and the prediction confidence level congestion = my_device.get_congestion( start=datetime.now(timezone.utc), end=datetime.now(timezone.utc) + timedelta(hours=3) ) + +# Show the congestion level objects +print(congestion) \ No newline at end of file diff --git a/examples/network_slices.py b/examples/network_slices.py index 5306a1e..f5c4382 100644 --- a/examples/network_slices.py +++ b/examples/network_slices.py @@ -10,12 +10,11 @@ Point, AreaOfService, NetworkIdentifier, - Slice, SliceInfo, Throughput ) -SDK_TOKEN = "" +SDK_TOKEN = "" DEVICE_ID = "device@testcsp.net" @@ -24,31 +23,37 @@ token=SDK_TOKEN ) - # Creation of a slice # We use the country code (MCC) and network code (MNC) to identify the network # Different types of slices can be requested using service type and differentiator # Area of the slice must also be described in geo-coordinates my_slice = client.slices.create( - name="slice-name", - network_id=NetworkIdentifier(mcc="664", mnc="22"), - slice_info=SliceInfo(service_type="eMBB", differentiator="123456"), - area_of_service=AreaOfService( - polygon=[ - Point(latitude=42.0, longitude=42.0), - Point(latitude=41.0, longitude=42.0), - Point(latitude=42.0, longitude=41.0), - Point(latitude=42.0, longitude=42.0) - ] - ), - notification_url="https://notify.me/here", - # Use HTTPS to send notifications - notification_auth_token="replace-with-your-auth-token" + name="slice-test-2", + network_id = NetworkIdentifier(mcc="236", mnc="30"), + slice_info = SliceInfo(service_type="eMBB", differentiator="444444"), + area_of_service = AreaOfService(polygon=[ + Point(latitude=47.344, longitude=104.349), + Point(latitude=35.344, longitude=76.619), + Point(latitude=12.344, longitude=142.541), + Point(latitude=19.43, longitude=103.53) + ]), + slice_downlink_throughput = Throughput(guaranteed=3415, maximum=1234324), + slice_uplink_throughput = Throughput(guaranteed=3415, maximum=1234324), + device_downlink_throughput = Throughput(guaranteed=3415, maximum=1234324), + device_uplink_throughput = Throughput(guaranteed=3415, maximum=1234324), + max_data_connections=10, + max_devices=6, + # Use HTTPS to send notifications + notification_url="https://snippets.requestcatcher.com/test", ) # Get a slice by its ID slice = client.slices.get(my_slice.name) +# Get a slice by using an index +# or remove the index '[0]' to get all slices +one_slice = client.slices.get_all()[0] + # Modify the slice my_slice.modify( max_data_connections = 12, @@ -89,4 +94,4 @@ async def slice_attachments(): # Simple manual polling can be implemented like this: while my_slice.state != "AVAILABLE": my_slice.refresh() - time.sleep(1) + time.sleep(1) \ No newline at end of file diff --git a/examples/qod_example.py b/examples/qod_example.py index b15411f..f95aa4d 100644 --- a/examples/qod_example.py +++ b/examples/qod_example.py @@ -6,7 +6,7 @@ from network_as_code.models.device import DeviceIpv4Addr -client = nac.NetworkAsCodeClient(...) +client = nac.NetworkAsCodeClient("") # Identify the device with its ID, # IP address(es) and optionally, a phone number diff --git a/integration_tests/test_device_status.py b/integration_tests/test_device_status.py index 2bf8de8..2415cae 100644 --- a/integration_tests/test_device_status.py +++ b/integration_tests/test_device_status.py @@ -21,8 +21,6 @@ def test_creating_connectivity_subscription_with_notification(client, device): notification_auth_token="c8974e592c2fa383d4a3960714", ) - print(subscription) - subscription.delete() def test_creating_connectivity_subscription_roaming(client, device): diff --git a/integration_tests/test_network_insights.py b/integration_tests/test_network_insights.py index 0a17094..8a0054c 100644 --- a/integration_tests/test_network_insights.py +++ b/integration_tests/test_network_insights.py @@ -3,33 +3,22 @@ from datetime import datetime, timezone, timedelta from network_as_code.models.congestion import Congestion - from network_as_code.models.device import Device -from network_as_code.namespaces import insights @pytest.fixture def nef_device(client) -> Device: - device = client.devices.get(phone_number="3670123456") + device = client.devices.get(phone_number="+3670123456") return device @pytest.fixture def camara_device(client) -> Device: - device = client.devices.get(phone_number="3637123456") + device = client.devices.get(phone_number="+3637123456") return device - -def test_can_query_congestion_level_from_camara_device(camara_device): - congestion = camara_device.get_congestion() - - assert isinstance(congestion, Congestion) - - assert congestion.level in ["none", "low", "medium", "high"] - - -def test_can_query_within_time_range(camara_device: Device): - congestion = camara_device.get_congestion(start=datetime.now(timezone.utc), end=datetime.now(timezone.utc) + timedelta(hours=3)) - - assert congestion.level in ["none", "low", "medium", "high"] +@pytest.fixture +def naid_device(client) -> Device: + device = client.devices.get(network_access_identifier="testdevice@testcsp.net") + return device def test_can_subscribe_for_congestion_info(client, camara_device: Device): subscription = client.insights.subscribe_to_congestion_info( @@ -94,3 +83,53 @@ def test_can_get_list_of_subscriptions(client, camara_device: Device): for subscription in subscriptions: subscription.delete() + + +def test_can_query_congestion_level_from_camara_device(client, camara_device): + subscription = client.insights.subscribe_to_congestion_info( + camara_device, + notification_url="https://example.com", + subscription_expire_time=datetime.now(timezone.utc) + timedelta(minutes=5), + notification_auth_token="my-auth-token" + ) + + congestion = camara_device.get_congestion() + + assert isinstance(congestion, list) + + assert congestion[0].level in ["None", "Low", "Medium", "High"] + + subscription.delete() + +@pytest.mark.xfail +def test_can_query_congestion_level_from_nef_device(client, nef_device): + subscription = client.insights.subscribe_to_congestion_info( + nef_device, + notification_url="https://example.com", + subscription_expire_time=datetime.now(timezone.utc) + timedelta(minutes=5), + notification_auth_token="my-auth-token" + ) + + congestion = nef_device.get_congestion() + + assert isinstance(congestion, list) + + assert congestion[0].level in ["None", "Low", "Medium", "High"] + + subscription.delete() + +def test_can_query_within_time_range(client, camara_device: Device): + subscription = client.insights.subscribe_to_congestion_info( + camara_device, + notification_url="https://example.com", + subscription_expire_time=datetime.now(timezone.utc) + timedelta(minutes=5), + notification_auth_token="my-auth-token" + ) + + congestion = camara_device.get_congestion(start=datetime.now(timezone.utc), end=datetime.now(timezone.utc) + timedelta(hours=3)) + + assert congestion[0].level in ["None", "Low", "Medium", "High"] + + assert congestion[0].confidence + + subscription.delete() diff --git a/integration_tests/test_qos.py b/integration_tests/test_qos.py index 081ad3a..c0bd792 100644 --- a/integration_tests/test_qos.py +++ b/integration_tests/test_qos.py @@ -1,4 +1,5 @@ +import pdb import pytest from network_as_code.models.session import PortsSpec, PortRange @@ -12,7 +13,7 @@ def device(client) -> Device: @pytest.fixture def setup_and_cleanup_session_data(device): - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L") + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", duration=3600) yield session @@ -22,29 +23,32 @@ def test_getting_a_device(client, device): assert device.ipv4_address.public_address == "1.1.1.2" def test_creating_a_qos_flow(client, device): - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L") - + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", duration=3600) + + assert session.service_ipv4 == "5.6.7.8" + assert session.device.network_access_identifier == device.network_access_identifier session.delete() def test_creating_a_qos_flow_for_device_with_only_phone_number(client, device): - device = client.devices.get(phone_number=f"3670{random.randint(123456, 999999)}", ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", private_address="1.1.1.2", public_port=80)) + device = client.devices.get(phone_number=f"+3670{random.randint(123456, 999999)}", ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", private_address="1.1.1.2", public_port=80)) - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L") + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", duration=3600) + assert session.device.phone_number == device.phone_number session.delete() def test_creating_a_qos_flow_medium_profile(client, device): - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_M") + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_M", duration=3600) session.delete() def test_creating_a_qos_flow_small_profile(client, device): - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_S") + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_S", duration=3600) session.delete() def test_creating_a_qos_flow_low_latency_profile(client, device): - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_E") + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_E", duration=3600) session.delete() @@ -59,13 +63,28 @@ def test_getting_a_created_qos_session_by_id(client, device, setup_and_cleanup_s assert True def test_creating_a_qos_flow_with_port_info(client, device): - session = device.create_qod_session(service_ipv4="5.6.7.8", service_ports=PortsSpec(ports=[80]), profile="QOS_L") + session = device.create_qod_session(service_ipv4="5.6.7.8", service_ports=PortsSpec(ports=[80]), profile="QOS_L", duration=3600) + assert session.service_ports.ports == [80] session.delete() def test_creating_a_qos_flow_with_service_port_and_device_port(client, device): - session = device.create_qod_session(service_ipv4="5.6.7.8", service_ports=PortsSpec(ports=[80]), profile="QOS_L", device_ports=PortsSpec(ports=[20000])) + session = device.create_qod_session(service_ipv4="5.6.7.8", service_ports=PortsSpec(ports=[80]), profile="QOS_L", device_ports=PortsSpec(ports=[20000]), duration=3600) + + assert session.service_ports.ports == [80] + session.delete() + +def test_creating_a_qos_flow_with_service_ipv6(client, device): + session = device.create_qod_session(service_ipv6="2001:db8:1234:5678:9abc:def0:fedc:ba98", service_ports=PortsSpec(ports=[80]), profile="QOS_L", device_ports=PortsSpec(ports=[20000]), duration=3600) + + assert session.service_ipv6 == "2001:db8:1234:5678:9abc:def0:fedc:ba98" + session.delete() +def test_creating_a_qos_flow_with_device_ipv6(client): + device_ipv6 = client.devices.get(f"test-device{random.randint(1, 1000)}@testcsp.net", ipv6_address = "2001:db8:1234:5678:9abc:def0:fedc:ba98") + session = device_ipv6.create_qod_session(service_ipv6="2001:db8:1234:5678:9abc:def0:fedc:ba98", service_ports=PortsSpec(ports=[80]), profile="QOS_L", device_ports=PortsSpec(ports=[20000]), duration=3600) + assert session.service_ipv6 == "2001:db8:1234:5678:9abc:def0:fedc:ba98" + assert session.device.ipv6_address == device_ipv6.ipv6_address session.delete() def test_port_range_field_aliasing(): @@ -75,8 +94,10 @@ def test_port_range_field_aliasing(): assert "to" in port_range.model_dump(by_alias=True).keys() def test_creating_a_qos_flow_with_service_port_range(client, device): - session = device.create_qod_session(service_ipv4="5.6.7.8", service_ports=PortsSpec(ranges=[PortRange(start=80, end=443)]), profile="QOS_L") + session = device.create_qod_session(service_ipv4="5.6.7.8", service_ports=PortsSpec(ranges=[PortRange(start=80, end=443)]), profile="QOS_L", duration=3600) + assert session.service_ports.ranges[0].start == 80 + assert session.service_ports.ranges[0].end == 443 session.delete() def test_creating_a_qos_flow_with_duration(client, device): @@ -88,15 +109,22 @@ def test_creating_a_qos_flow_with_duration(client, device): assert session.duration().seconds == 60 def test_creating_a_qos_flow_with_notification_url(client, device): - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", notification_url="https://example.com/notifications", notification_auth_token="c8974e592c2fa383d4a3960714") + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", notification_url="https://example.com/notifications", notification_auth_token="c8974e592c2fa383d4a3960714", duration=3600) session.delete() +def test_getting_all_sessions(client, device): + device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", duration=3600) + sessions = device.sessions() + + assert len(sessions) >= 0 + device.clear_sessions() + def test_clearing_qos_flows(client, device): ids = [] for i in range(5): - created_session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L") + created_session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", duration=3600) ids.append(created_session.id) device.clear_sessions() @@ -107,13 +135,17 @@ def test_clearing_qos_flows(client, device): def test_creating_session_with_public_and_private_ipv4(client): device = client.devices.get("test-device@testcsp.net", ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", private_address="1.1.1.2")) - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L") + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", duration=3600) + assert session.device.ipv4_address.public_address == device.ipv4_address.public_address + assert session.device.ipv4_address.private_address == device.ipv4_address.private_address session.delete() def test_creating_session_with_public_ipv4_and_public_port(client): device = client.devices.get("test-device@testcsp.net", ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", public_port=80)) - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L") + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", duration=3600) + assert session.device.ipv4_address.public_address == device.ipv4_address.public_address + assert session.device.ipv4_address.public_port == device.ipv4_address.public_port session.delete() diff --git a/integration_tests/test_slice.py b/integration_tests/test_slice.py index a957759..71eb00d 100644 --- a/integration_tests/test_slice.py +++ b/integration_tests/test_slice.py @@ -30,6 +30,7 @@ def setup_and_cleanup_slice_data(client): slice.delete() +@pytest.mark.xfail def test_getting_slices(client): assert type(client.slices.get_all()) is list @@ -69,6 +70,7 @@ def test_creating_a_slice_with_optional_args(client): slice.delete() +@pytest.mark.xfail def test_getting_a_slice(client, setup_and_cleanup_slice_data): new_slice = setup_and_cleanup_slice_data @@ -86,6 +88,7 @@ def test_deleting_a_slice_marks_it_as_deleted(client, setup_and_cleanup_slice_da assert slice.state == "DELETED" +@pytest.mark.xfail def test_getting_attachments(client): assert type(client.slices.get_all_attachments()) is list diff --git a/network_as_code/__init__.py b/network_as_code/__init__.py index ab7fb09..7aed7be 100644 --- a/network_as_code/__init__.py +++ b/network_as_code/__init__.py @@ -14,4 +14,4 @@ from .client import NetworkAsCodeClient -__version__ = "3.1.0" +__version__ = "4.0.0" diff --git a/network_as_code/api/congestion_api.py b/network_as_code/api/congestion_api.py index 7277761..b7ca85a 100644 --- a/network_as_code/api/congestion_api.py +++ b/network_as_code/api/congestion_api.py @@ -35,7 +35,7 @@ def fetch_congestion(self, device, start: Optional[str] = None, end: Optional[st if end: body["end"] = end - response = self.client.post(url="/device", json=body) + response = self.client.post(url="/query", json=body) error_handler(response) diff --git a/network_as_code/api/location_api.py b/network_as_code/api/location_api.py index 861d658..cd40853 100644 --- a/network_as_code/api/location_api.py +++ b/network_as_code/api/location_api.py @@ -28,7 +28,7 @@ def verify_location(self, latitude, longitude, device, radius, max_age=60): body = { "device": device.model_dump(mode='json', by_alias=True, exclude_none=True), "area": { - "areaType": "Circle", + "areaType": "CIRCLE", "center": {"latitude": latitude, "longitude": longitude}, "radius": radius, }, @@ -41,7 +41,8 @@ def verify_location(self, latitude, longitude, device, radius, max_age=60): error_handler(response) - return response.json()["verificationResult"] == "TRUE" + result = response.json()["verificationResult"] + return True if result == "TRUE" else False if result == "FALSE" else result class LocationRetrievalAPI: diff --git a/network_as_code/api/qod_api.py b/network_as_code/api/qod_api.py index a7b12e6..fec8fa3 100644 --- a/network_as_code/api/qod_api.py +++ b/network_as_code/api/qod_api.py @@ -39,11 +39,11 @@ def create_session( self, device, profile, + duration, service_ipv4, service_ipv6=None, device_ports: Union[None, any] = None, service_ports: Union[None, any] = None, - duration=None, notification_url=None, notification_auth_token=None, ): @@ -51,11 +51,11 @@ def create_session( #### Args: profile (any): Name of the requested QoS profile. + duration(int): The length of the QoD session in seconds. service_ipv4 (any): IPv4 address of the service. service_ipv6 (optional): IPv6 address of the service. device_ports (optional): List of the device ports. service_ports (optional): List of the application server ports. - duration (optional): Session duration in seconds. notification_url (optional): Notification URL for session-related events. notification_token (optional): Security bearer token to authenticate registration of session. @@ -66,6 +66,7 @@ def create_session( "qosProfile": profile, "device": device.model_dump(mode='json', by_alias=True, exclude_none=True), "applicationServer": {"ipv4Address": service_ipv4}, + "duration": duration } if device_ports: @@ -77,9 +78,6 @@ def create_session( if service_ipv6: session_resource["applicationServer"]["ipv6Address"] = service_ipv6 - if duration: - session_resource["duration"] = duration - if notification_url: session_resource["notificationUrl"] = notification_url diff --git a/network_as_code/models/congestion.py b/network_as_code/models/congestion.py index 3030ca2..491d155 100644 --- a/network_as_code/models/congestion.py +++ b/network_as_code/models/congestion.py @@ -17,6 +17,20 @@ from pydantic import BaseModel, PrivateAttr from network_as_code.api.client import APIClient +class Congestion(BaseModel): + level: str + start: datetime + stop: datetime + confidence: Optional[int] + + @classmethod + def from_json(cls, json) -> "Congestion": + level = json["congestionLevel"] + confidence = json.get("confidenceLevel") + start = datetime.fromisoformat(json["timeIntervalStart"].replace("Z", "+00:00")) + stop = datetime.fromisoformat(json["timeIntervalStop"].replace("Z", "+00:00")) + + return cls(level=level, confidence=confidence, start=start, stop=stop) class CongestionSubscription(BaseModel): id: Optional[str] = None @@ -30,7 +44,3 @@ def __init__(self, api: APIClient, **data): def delete(self): self._api.congestion.delete_subscription(self.id) - - -class Congestion(BaseModel): - level: str diff --git a/network_as_code/models/device.py b/network_as_code/models/device.py index b26014c..e326231 100644 --- a/network_as_code/models/device.py +++ b/network_as_code/models/device.py @@ -120,11 +120,11 @@ def network_access_id(self) -> Union[str, None]: def create_qod_session( self, profile, + duration, service_ipv4=None, service_ipv6=None, device_ports: Union[None, PortsSpec] = None, service_ports: Union[None, PortsSpec] = None, - duration=None, notification_url=None, notification_auth_token=None, ) -> QoDSession: @@ -132,17 +132,17 @@ def create_qod_session( #### Args: profile (any): Name of the requested QoS profile. + duration(int): The length of the QoD session in seconds. service_ipv4 (any): IPv4 address of the service. service_ipv6 (optional): IPv6 address of the service. device_ports (optional): List of the device ports. service_ports (optional): List of the application server ports. - duration (optional): Session duration in seconds. notification_url (optional): Notification URL for session-related events. notification_token (optional): Security bearer token to authenticate registration of session. #### Example: ```python - session = device.create_session(profile="QOS_L", + session = device.create_session(profile="QOS_L", duration=3600, service_ipv4="5.6.7.8", service_ipv6="2041:0000:140F::875B:131B", notification_url="https://example.com/notifications, notification_token="c8974e592c2fa383d4a3960714") @@ -155,20 +155,24 @@ def create_qod_session( session = self._api.sessions.create_session( self, profile, + duration, service_ipv4, service_ipv6, device_ports, service_ports, - duration, notification_url, notification_auth_token, ) - # Convert response body to an Event model # Event(target=session.json().get('id'), atUnix=session.json().get('expiresAt')) return QoDSession.convert_session_model( - self._api, self.ipv4_address, session.json() + self._api, self, session.json() ) + def filter_sessions_by_device(self, session: dict): + return ( + (session['device'].get('networkAccessIdentifier') == self.network_access_identifier) and + (session['device'].get('phoneNumber') is None or session['device'].get('phoneNumber') == self.phone_number) + ) def sessions(self) -> List[QoDSession]: """List sessions of the device. TODO change the name to get_sessions @@ -180,10 +184,11 @@ def sessions(self) -> List[QoDSession]: """ try: sessions = self._api.sessions.get_all_sessions(self) + filtered_sessions = [session for session in sessions.json() if self.filter_sessions_by_device(session)] return list( map( self.__convert_session_model, - sessions.json(), + filtered_sessions, ) ) except NotFound: @@ -197,7 +202,7 @@ def clear_sessions(self): session.delete() def __convert_session_model(self, session) -> QoDSession: - return QoDSession.convert_session_model(self._api, self.ipv4_address, session) + return QoDSession.convert_session_model(self._api, self, session) def location(self, max_age: int = 60) -> Location: """Returns the location of the device. @@ -264,7 +269,7 @@ def location(self, max_age: int = 60) -> Location: def verify_location( self, longitude: float, latitude: float, radius: float, max_age: int = 60 - ) -> bool: + ) -> Union[bool, str]: """Verifies the location of the device (Returns boolean value). #### Args: @@ -307,11 +312,15 @@ def get_roaming(self) -> RoamingStatus: country_name=status.get("countryName"), ) + # TODO: # pylint: disable=fixme + # In the future this won't be possible without first creating a CongestionSubscription + # Either this needs to be migrated to CongestionSubscription, needs to take a valid + # CongestionSubscription as a parameter or needs to be documented as having that requirement def get_congestion( self, start: Union[datetime, str, None] = None, end: Union[datetime, str, None] = None, - ) -> Congestion: + ) -> List[Congestion]: """Get the congestion level this device is experiencing #### Args: @@ -325,7 +334,9 @@ def get_congestion( json = self._api.congestion.fetch_congestion(self, start=start, end=end) - return Congestion(level=json["level"]) + assert isinstance(json, list) + + return [Congestion.from_json(congestion_json) for congestion_json in json] def get_sim_swap_date(self) -> str: """Get the latest simswap date. diff --git a/network_as_code/models/session.py b/network_as_code/models/session.py index 8cd99e4..43dd86e 100644 --- a/network_as_code/models/session.py +++ b/network_as_code/models/session.py @@ -12,14 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Union, List, Optional +from typing import Union, List, Optional, Any from datetime import datetime -from pydantic import ConfigDict, BaseModel, PrivateAttr +from pydantic import ConfigDict, BaseModel, PrivateAttr from network_as_code.api.client import APIClient - ALIASES = {"start": "from", "end": "to"} @@ -63,7 +62,8 @@ class QoDSession(BaseModel, arbitrary_types_allowed=True): #### Public Attributes: id (str): Session identifier. - service_ip (str): IP address of a service. + service_ipv4 (str): IPv4 address of the service. + service_ipv6 (str): IPv6 address of the service. service_ports (Union[PortsSpec, None]): List of ports for a service. profile (str): Name of the requested QoS profile. status(str): Status of the requested QoS. @@ -82,6 +82,10 @@ class QoDSession(BaseModel, arbitrary_types_allowed=True): status: str started_at: Union[datetime, None] = None expires_at: Union[datetime, None] = None + device: Any # Change this to Type Device, after solving the circular import issue later + service_ipv4: Union[str, None] = None + service_ipv6: Union[str, None] = None + service_ports: Union[PortsSpec, None] = None def __init__(self, api: APIClient, **data) -> None: super().__init__(**data) @@ -99,7 +103,7 @@ def duration(self): return self.expires_at - self.started_at @staticmethod - def convert_session_model(api, ip, session): + def convert_session_model(api, device, session): """Returns a `Session` instance. Assigns the startedAt and expiresAt attributes None if their value not found. @@ -119,13 +123,16 @@ def convert_session_model(api, ip, session): if session.get("expiresAt", False) else None ) + service = session.get("applicationServer") + service_ports = session.get('applicationServerPorts') return QoDSession( api=api, id=session["sessionId"], - device_ip=ip, + device=device, device_ports=None, - service_ip="", - service_ports=None, + service_ipv4=service.get("ipv4Address") if service else None, + service_ipv6=service.get("ipv6Address") if service else None, + service_ports=PortsSpec(**service_ports) if service_ports else None, profile=session["qosProfile"], status=session["qosStatus"], started_at=started_at, diff --git a/network_as_code/namespaces/session.py b/network_as_code/namespaces/session.py index 7ccffaf..5b6bca7 100644 --- a/network_as_code/namespaces/session.py +++ b/network_as_code/namespaces/session.py @@ -30,5 +30,6 @@ def get(self, id: str) -> QoDSession: id (str): ID of the QoS Session """ session_object = self.api.sessions.get_session(id) - - return QoDSession.convert_session_model(self.api, "", session_object.json()) + device = session_object.json()['device'] + return QoDSession.convert_session_model(self.api, device, session_object.json()) + \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 88bfc2f..3e1ae0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "network_as_code" -version = "3.1.0" +version = "4.0.0" description = "" authors = ["Sami Lahtinen ", "Pavel Garmuyev "] packages = [ diff --git a/tests/test_location_mock.py b/tests/test_location_mock.py index 4b6dbe7..01e7ca2 100644 --- a/tests/test_location_mock.py +++ b/tests/test_location_mock.py @@ -19,7 +19,7 @@ def test_get_location(httpx_mock: httpx_mock, device): mock_response = { "lastLocationTime": "2023-09-12T11:41:28+03:00", "area": { - "areaType": "Circle", + "areaType": "CIRCLE", "center": { "latitude": 0.0, "longitude": 0.0 @@ -65,7 +65,7 @@ def test_get_location_without_maxage(httpx_mock: httpx_mock, device): mock_response = { "lastLocationTime": "2023-09-12T11:41:28+03:00", "area": { - "areaType": "Circle", + "areaType": "CIRCLE", "center": { "latitude": 0.0, "longitude": 0.0 @@ -111,7 +111,7 @@ def test_get_location_without_civic_address(httpx_mock: httpx_mock, device): mock_response = { "lastLocationTime": "2023-09-12T11:41:28+03:00", "area": { - "areaType": "Circle", + "areaType": "CIRCLE", "center": { "latitude": 0.0, "longitude": 0.0 @@ -158,7 +158,7 @@ def test_verify_location(httpx_mock: httpx_mock, device): } }, "area": { - "areaType": "Circle", + "areaType": "CIRCLE", "center": { "latitude": 47, "longitude": 19 @@ -191,7 +191,7 @@ def test_verify_location_with_max_age(httpx_mock: httpx_mock, device): } }, "area": { - "areaType": "Circle", + "areaType": "CIRCLE", "center": { "latitude": 47, "longitude": 19 @@ -208,6 +208,39 @@ def test_verify_location_with_max_age(httpx_mock: httpx_mock, device): assert device.verify_location(longitude=19, latitude=47, radius=10_000, max_age=70) +def test_verify_partial_location(httpx_mock: httpx_mock, device): + url = f"https://location-verification.p-eu.rapidapi.com/verify" + + httpx_mock.add_response( + url=url, + method='POST', + match_content=json.dumps({ + "device": { + "networkAccessIdentifier": "test_device_id", + "ipv4Address": { + "publicAddress": "1.1.1.2", + "privateAddress": "1.1.1.2", + "publicPort": 80 + } + }, + "area": { + "areaType": "CIRCLE", + "center": { + "latitude": 47, + "longitude": 19 + }, + "radius": 10_000 + }, + "maxAge": 60 + }).encode(), + json={ + "lastLocationTime": "2023-09-11T18:34:01+03:00", + "verificationResult": "PARTIAL" + } + ) + + result = device.verify_location(longitude=19, latitude=47, radius=10_000) + assert result == "PARTIAL" def test_verify_location_raises_exception_if_unauthenticated(httpx_mock: httpx_mock, device): url = f"https://location-verification.p-eu.rapidapi.com/verify" @@ -225,7 +258,7 @@ def test_verify_location_raises_exception_if_unauthenticated(httpx_mock: httpx_m } }, "area": { - "areaType": "Circle", + "areaType": "CIRCLE", "center": { "latitude": 47, "longitude": 19 @@ -259,7 +292,7 @@ def test_verify_location_raises_exception_if_server_fails(httpx_mock: httpx_mock } }, "area": { - "areaType": "Circle", + "areaType": "CIRCLE", "center": { "latitude": 47, "longitude": 19 diff --git a/tests/test_network_insights.py b/tests/test_network_insights.py index 5268c6e..01f656c 100644 --- a/tests/test_network_insights.py +++ b/tests/test_network_insights.py @@ -2,7 +2,7 @@ import pytest from datetime import datetime -from network_as_code.models.congestion import CongestionSubscription +from network_as_code.models.congestion import Congestion, CongestionSubscription from network_as_code.models.device import Device from tests.test_device_status_api_mock import to_bytes @@ -19,11 +19,16 @@ def camara_device(client) -> Device: def test_can_fetch_current_congestion_info_from_device_model(httpx_mock, client, camara_device): httpx_mock.add_response( - url="https://congestion-insights.p-eu.rapidapi.com/device", + url="https://congestion-insights.p-eu.rapidapi.com/query", method="POST", - json={ - "level": "medium" - }, + json=[ + { + "timeIntervalStart": "2024-08-20T21:00:00+00:00", + "timeIntervalStop": "2024-08-20T21:05:00+00:00", + "congestionLevel": "medium", + "confidenceLevel": 50 + } + ], match_content=to_bytes( { "device": { @@ -35,15 +40,24 @@ def test_can_fetch_current_congestion_info_from_device_model(httpx_mock, client, congestion = camara_device.get_congestion() - assert congestion.level == "medium" + assert isinstance(congestion, list) + + assert congestion[0] + assert isinstance(congestion[0], Congestion) + assert congestion[0].level == "medium" def test_can_request_congestion_time_range(httpx_mock, client, camara_device): httpx_mock.add_response( - url="https://congestion-insights.p-eu.rapidapi.com/device", + url="https://congestion-insights.p-eu.rapidapi.com/query", method="POST", - json={ - "level": "medium" - }, + json=[ + { + "timeIntervalStart": "2024-08-20T21:00:00+00:00", + "timeIntervalStop": "2024-08-20T21:05:00+00:00", + "congestionLevel": "medium", + "confidenceLevel": 50 + } + ], match_content=to_bytes( { "device": { @@ -60,7 +74,7 @@ def test_can_request_congestion_time_range(httpx_mock, client, camara_device): end=datetime.fromisoformat("2024-04-16T08:18:01.773761+00:00") ) - assert congestion.level == "medium" + assert congestion[0].level == "medium" def test_can_subscribe_to_congestion_reports(httpx_mock, client, camara_device): httpx_mock.add_response( diff --git a/tests/test_qos_mock.py b/tests/test_qos_mock.py index 35a7349..d80902e 100644 --- a/tests/test_qos_mock.py +++ b/tests/test_qos_mock.py @@ -11,12 +11,24 @@ def test_getting_a_device(httpx_mock, client): device = client.devices.get("testuser@open5glab.net", ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", private_address="1.1.1.2", public_port=80)) assert device.network_access_identifier == "testuser@open5glab.net" - def test_creating_a_session_mock(httpx_mock, client): - device = client.devices.get("testuser@open5glab.net", ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", private_address="1.1.1.2", public_port=80), phone_number = "9382948473") + device = client.devices.get("testuser@open5glab.net", ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", private_address="1.1.1.2", public_port=80), phone_number = "+9382948473") mock_response = { "sessionId": "08305343-7ed2-43b7-8eda-4c5ae9805bd0", "qosProfile": "QOS_L", + "device": { + "networkAccessIdentifier": "testuser@open5glab.net", + "ipv4Address": { + "publicAddress": "1.1.1.2", + "privateAddress": "1.1.1.2", + "publicPort": 80 + }, + "phoneNumber": "+9382948473" + }, + "applicationServer": { + "ipv4Address": "5.6.7.8", + }, + "duration": 3600, "qosStatus": "REQUESTED", "startedAt": "2024-06-18T08:48:12.300312Z", "expiresAt": "2024-06-18T08:48:12.300312Z" @@ -29,7 +41,7 @@ def test_creating_a_session_mock(httpx_mock, client): "qosProfile": "QOS_L", "device": { "networkAccessIdentifier": "testuser@open5glab.net", - "phoneNumber": "9382948473", + "phoneNumber": "+9382948473", "ipv4Address": { "publicAddress": "1.1.1.2", "privateAddress": "1.1.1.2", @@ -39,23 +51,38 @@ def test_creating_a_session_mock(httpx_mock, client): "applicationServer": { "ipv4Address": "5.6.7.8", }, + "duration": 3600 }).encode('utf-8'), json=mock_response) - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L") + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", duration=3600) assert session.status == mock_response["qosStatus"] + assert session.device.network_access_identifier == device.network_access_identifier + assert session.service_ipv4 == "5.6.7.8" + httpx_mock.add_response( json={} ) session.delete() def test_creating_a_minimal_session(httpx_mock, client): - device = client.devices.get(ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", public_port=80), phone_number = "9382948473") + device = client.devices.get(ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", public_port=80), phone_number = "+9382948473") mock_response = { "sessionId": "08305343-7ed2-43b7-8eda-4c5ae9805bd0", "qosProfile": "QOS_L", + "device": { + "ipv4Address": { + "publicAddress": "1.1.1.2", + "publicPort": 80 + }, + "phoneNumber": "+9382948473" + }, + "applicationServer": { + "ipv4Address": "5.6.7.8", + }, + "duration": 3600, "qosStatus": "REQUESTED", "startedAt": "2024-06-18T08:48:12.300312Z", "expiresAt": "2024-06-18T08:48:12.300312Z" @@ -67,7 +94,7 @@ def test_creating_a_minimal_session(httpx_mock, client): match_content = json.dumps({ "qosProfile": "QOS_L", "device": { - "phoneNumber": "9382948473", + "phoneNumber": "+9382948473", "ipv4Address": { "publicAddress": "1.1.1.2", "publicPort": 80 @@ -76,16 +103,33 @@ def test_creating_a_minimal_session(httpx_mock, client): "applicationServer": { "ipv4Address": "5.6.7.8", }, + "duration": 3600 }).encode('utf-8'), json=mock_response) - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L") + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", duration=3600) + assert session.device.phone_number == device.phone_number + assert session.service_ipv4 == "5.6.7.8" def test_creating_a_session_with_ipv6(httpx_mock, client): device = client.devices.get("testuser@open5glab.net", ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", private_address="1.1.1.2", public_port=80), ipv6_address = "2266:25::12:0:ad12", phone_number = "9382948473") mock_response = { "sessionId": "08305343-7ed2-43b7-8eda-4c5ae9805bd0", "qosProfile": "QOS_L", + "device": { + "networkAccessIdentifier": "testuser@open5glab.net", + "phoneNumber": "9382948473", + "ipv4Address": { + "publicAddress": "1.1.1.2", + "privateAddress": "1.1.1.2", + "publicPort": 80 + } + }, + "applicationServer": { + "ipv4Address": "5.6.7.8", + "ipv6Address": "2266:25::12:0:ad12" + }, + "duration": 3600, "qosStatus": "REQUESTED", "startedAt": "2024-06-18T08:48:12.300312Z", "expiresAt": "2024-06-18T08:48:12.300312Z" @@ -107,18 +151,22 @@ def test_creating_a_session_with_ipv6(httpx_mock, client): "ipv6Address": "2266:25::12:0:ad12", }, "applicationServer": { - "ipv4Address": "5.6.7.8", + "ipv4Address": "5.6.7.8", "ipv6Address": "2266:25::12:0:ad12" }, + "duration": 3600 }).encode('utf-8'), json=mock_response) - session = device.create_qod_session(service_ipv4="5.6.7.8", service_ipv6="2266:25::12:0:ad12", profile="QOS_L") + session = device.create_qod_session(service_ipv4="5.6.7.8", service_ipv6="2266:25::12:0:ad12", profile="QOS_L", duration=3600) + assert type(session.started_at) == datetime assert type(session.expires_at) == datetime assert type(session.duration()) == timedelta assert session.status == mock_response["qosStatus"] - + assert session.service_ipv4 == "5.6.7.8" + assert session.service_ipv6 == "2266:25::12:0:ad12" + httpx_mock.add_response( json={} ) @@ -130,6 +178,17 @@ def test_creating_qod_session_with_device_ports(httpx_mock, client): mock_response = { "sessionId": "08305343-7ed2-43b7-8eda-4c5ae9805bd0", "qosProfile": "QOS_L", + "device": { + "networkAccessIdentifier": "testuser@open5glab.net", + "ipv4Address": { + "publicAddress": "1.1.1.2", + "publicPort": 80 + }, + }, + "applicationServer": { + "ipv4Address": "5.6.7.8" + }, + "duration": 3600, "qosStatus": "REQUESTED", "startedAt": "2024-06-18T08:48:12.300312Z", "expiresAt": "2024-06-18T08:48:12.300312Z" @@ -150,13 +209,14 @@ def test_creating_qod_session_with_device_ports(httpx_mock, client): "applicationServer": { "ipv4Address": "5.6.7.8" }, + "duration": 3600, "devicePorts": { "ports": [80, 443] - }, + } }).encode('utf-8'), json=mock_response) - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", device_ports=PortsSpec(ports=[80, 443])) + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", device_ports=PortsSpec(ports=[80, 443]), duration=3600) def test_creating_qod_session_with_device_port_range(httpx_mock, client): device = client.devices.get("testuser@open5glab.net", ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", public_port=80)) @@ -164,6 +224,17 @@ def test_creating_qod_session_with_device_port_range(httpx_mock, client): mock_response = { "sessionId": "08305343-7ed2-43b7-8eda-4c5ae9805bd0", "qosProfile": "QOS_L", + "device": { + "networkAccessIdentifier": "testuser@open5glab.net", + "ipv4Address": { + "publicAddress": "1.1.1.2", + "publicPort": 80 + }, + }, + "applicationServer": { + "ipv4Address": "5.6.7.8" + }, + "duration": 3600, "qosStatus": "REQUESTED", "startedAt": "2024-06-18T08:48:12.300312Z", "expiresAt": "2024-06-18T08:48:12.300312Z" @@ -184,13 +255,14 @@ def test_creating_qod_session_with_device_port_range(httpx_mock, client): "applicationServer": { "ipv4Address": "5.6.7.8" }, + "duration": 3600, "devicePorts": { "ranges": [{"from": 1024, "to": 3000}] - }, + } }).encode('utf-8'), json=mock_response) - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", device_ports=PortsSpec(ranges=[PortRange(start=1024, end=3000)])) + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", device_ports=PortsSpec(ranges=[PortRange(start=1024, end=3000)]), duration=3600) def test_creating_qod_session_with_service_ports(httpx_mock, client): device = client.devices.get("testuser@open5glab.net", ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", public_port=80)) @@ -198,6 +270,20 @@ def test_creating_qod_session_with_service_ports(httpx_mock, client): mock_response = { "sessionId": "08305343-7ed2-43b7-8eda-4c5ae9805bd0", "qosProfile": "QOS_L", + "device": { + "networkAccessIdentifier": "testuser@open5glab.net", + "ipv4Address": { + "publicAddress": "1.1.1.2", + "publicPort": 80 + }, + }, + "applicationServer": { + "ipv4Address": "5.6.7.8" + }, + "duration": 3600, + "applicationServerPorts": { + "ports": [80, 443] + }, "qosStatus": "REQUESTED", "startedAt": "2024-06-18T08:48:12.300312Z", "expiresAt": "2024-06-18T08:48:12.300312Z" @@ -218,13 +304,15 @@ def test_creating_qod_session_with_service_ports(httpx_mock, client): "applicationServer": { "ipv4Address": "5.6.7.8" }, + "duration": 3600, "applicationServerPorts": { "ports": [80, 443] - }, + } }).encode('utf-8'), json=mock_response) - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", service_ports=PortsSpec(ports=[80, 443])) + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", service_ports=PortsSpec(ports=[80, 443]), duration=3600) + assert session.service_ports.ports == [80, 443] def test_creating_qod_session_with_service_port_range(httpx_mock, client): device = client.devices.get("testuser@open5glab.net", ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", public_port=80)) @@ -232,6 +320,20 @@ def test_creating_qod_session_with_service_port_range(httpx_mock, client): mock_response = { "sessionId": "08305343-7ed2-43b7-8eda-4c5ae9805bd0", "qosProfile": "QOS_L", + "device": { + "networkAccessIdentifier": "testuser@open5glab.net", + "ipv4Address": { + "publicAddress": "1.1.1.2", + "publicPort": 80 + }, + }, + "applicationServer": { + "ipv4Address": "5.6.7.8" + }, + "duration": 3600, + "applicationServerPorts": { + "ranges": [{"from": 1024, "to": 3000}] + }, "qosStatus": "REQUESTED", "startedAt": "2024-06-18T08:48:12.300312Z", "expiresAt": "2024-06-18T08:48:12.300312Z" @@ -252,13 +354,16 @@ def test_creating_qod_session_with_service_port_range(httpx_mock, client): "applicationServer": { "ipv4Address": "5.6.7.8" }, + "duration": 3600, "applicationServerPorts": { "ranges": [{"from": 1024, "to": 3000}] - }, + } }).encode('utf-8'), json=mock_response) - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", service_ports=PortsSpec(ranges=[PortRange(start=1024, end=3000)])) + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", service_ports=PortsSpec(ranges=[PortRange(start=1024, end=3000)]), duration=3600) + assert session.service_ports.ranges[0].start == 1024 + assert session.service_ports.ranges[0].end == 3000 def test_creating_a_qod_session_with_duration(httpx_mock, client): device = client.devices.get(ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", public_port=80), phone_number = "9382948473") @@ -266,6 +371,7 @@ def test_creating_a_qod_session_with_duration(httpx_mock, client): mock_response = { "sessionId": "08305343-7ed2-43b7-8eda-4c5ae9805bd0", "qosProfile": "QOS_L", + "duration": 3600, "qosStatus": "REQUESTED", "startedAt": "2024-06-18T08:48:12.300312Z", "expiresAt": "2024-06-18T08:48:12.300312Z" @@ -298,6 +404,17 @@ def test_creating_a_qod_session_with_notification_url_and_auth_token(httpx_mock, mock_response = { "sessionId": "08305343-7ed2-43b7-8eda-4c5ae9805bd0", "qosProfile": "QOS_L", + "device": { + "phoneNumber": "9382948473", + "ipv4Address": { + "publicAddress": "1.1.1.2", + "publicPort": 80 + }, + }, + "applicationServer": { + "ipv4Address": "5.6.7.8", + }, + "duration": 3600, "qosStatus": "REQUESTED", "startedAt": "2024-06-18T08:48:12.300312Z", "expiresAt": "2024-06-18T08:48:12.300312Z" @@ -318,18 +435,27 @@ def test_creating_a_qod_session_with_notification_url_and_auth_token(httpx_mock, "applicationServer": { "ipv4Address": "5.6.7.8", }, + "duration": 3600, "notificationUrl": "https://example.com", "notificationAuthToken": "Bearer my-auth-token" }).encode('utf-8'), json=mock_response) - session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", notification_url="https://example.com", notification_auth_token="my-auth-token") + session = device.create_qod_session(service_ipv4="5.6.7.8", profile="QOS_L", notification_url="https://example.com", notification_auth_token="my-auth-token", duration=3600) def test_getting_one_session(httpx_mock, client): device = client.devices.get("testuser@open5glab.net", ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", private_address="1.1.1.2", public_port=80)) mock_response = { "sessionId": "1234", "qosProfile": "QOS_L", + "device": { + "networkAccessIdentifier": "testuser@open5glab.net", + "ipv4Address": { + "publicAddress": "1.1.1.2", + "privateAddress": "1.1.1.2", + "publicPort": 80 + }, + }, "qosStatus": "REQUESTED", "startedAt": "2024-06-18T08:48:12.300312Z", "expiresAt": "2024-06-18T08:48:12.300312Z" @@ -343,6 +469,7 @@ def test_getting_one_session(httpx_mock, client): session = client.sessions.get("1234") assert session.id == mock_response['sessionId'] + def test_getting_all_sessions(httpx_mock, client): device = client.devices.get("testuser@open5glab.net", ipv4_address = DeviceIpv4Addr(public_address="1.1.1.2", private_address="1.1.1.2", public_port=80)) @@ -350,6 +477,14 @@ def test_getting_all_sessions(httpx_mock, client): mock_response = [{ "sessionId": "1234", "qosProfile": "QOS_L", + "device": { + "networkAccessIdentifier": "testuser@open5glab.net", + "ipv4Address": { + "publicAddress": "1.1.1.2", + "privateAddress": "1.1.1.2", + "publicPort": 80 + }, + }, "qosStatus": "BLA", "startedAt": "2024-06-18T08:48:12.300312Z", "expiresAt": "2024-06-18T08:48:12.300312Z" @@ -371,6 +506,14 @@ def test_getting_all_sessions_phone_number(httpx_mock, client): mock_response = [{ "sessionId": "1234", "qosProfile": "QOS_L", + "device": { + "phoneNumber": "1234567890", + "ipv4Address": { + "publicAddress": "1.1.1.2", + "privateAddress": "1.1.1.2", + "publicPort": 80 + }, + }, "qosStatus": "BLA", "startedAt": "2024-06-18T08:48:12.300312Z", "expiresAt": "2024-06-18T08:48:12.300312Z" @@ -421,6 +564,116 @@ def test_create_qod_session_requires_ip(httpx_mock, client): device = client.devices.get("testuser@open5glab.net", ipv4_address=DeviceIpv4Addr(public_address="1.1.1.2", private_address="1.1.1.2", public_port=80), phone_number="9382948473") with pytest.raises(ValueError) as excinfo: - session = device.create_qod_session(profile="QOS_L") + session = device.create_qod_session(profile="QOS_L", duration=3600) assert "At least one of IP parameters must be provided" in str(excinfo.value) + + +def test_getting_all_sessions_filtered_by_device_naid(httpx_mock, client): + device = client.devices.get("testuser@open5glab.net") + + mock_response = [{ + "sessionId": "1234", + "qosProfile": "QOS_L", + "device": { + "networkAccessIdentifier": "testuser@open5glab.net", + }, + "qosStatus": "BLA", + "startedAt": "2024-06-18T08:48:12.300312Z", + "expiresAt": "2024-06-18T08:48:12.300312Z" + }, { + "sessionId": "1234", + "qosProfile": "QOS_L", + "device": { + "networkAccessIdentifier": "test2user@open5glab.net", + }, + "qosStatus": "BLA", + "startedAt": "2024-06-18T08:48:12.300312Z", + "expiresAt": "2024-06-18T08:48:12.300312Z" + },] + + httpx_mock.add_response( + method='GET', + url='https://quality-of-service-on-demand.p-eu.rapidapi.com/sessions?networkAccessIdentifier=testuser@open5glab.net', + json=mock_response + ) + + sessions = device.sessions() + + assert len(sessions) == 1 + +def test_getting_all_sessions_filtered_by_device_phone_number(httpx_mock, client): + device = client.devices.get(phone_number="+1234567890") + + mock_response = [{ + "sessionId": "1234", + "qosProfile": "QOS_L", + "device": { + "phoneNumber": "+1234567890", + }, + "qosStatus": "BLA", + "startedAt": "2024-06-18T08:48:12.300312Z", + "expiresAt": "2024-06-18T08:48:12.300312Z" + }, { + "sessionId": "234", + "qosProfile": "QOS_L", + "device": { + "phoneNumber": "+23423532434", + }, + "qosStatus": "BLA", + "startedAt": "2024-06-18T08:48:12.300312Z", + "expiresAt": "2024-06-18T08:48:12.300312Z" + }, + { + "sessionId": "6643", + "qosProfile": "QOS_M", + "device": { + "phoneNumber": "+1234567890", + }, + "qosStatus": "BLA", + "startedAt": "2024-06-18T08:48:12.300312Z", + "expiresAt": "2024-06-18T08:48:12.300312Z" + }] + + httpx_mock.add_response( + method='GET', + url='https://quality-of-service-on-demand.p-eu.rapidapi.com/sessions?phoneNumber=+1234567890', + json=mock_response + ) + + sessions = device.sessions() + + assert len(sessions) == 2 + +def test_getting_all_sessions_filtered_by_device_with_naid_and_phone_no(httpx_mock, client): + device = client.devices.get(network_access_identifier="testuser@open5glab.net", phone_number="+1234567890") + + mock_response = [{ + "sessionId": "1234", + "qosProfile": "QOS_L", + "device": { + "networkAccessIdentifier": "testuser@open5glab.net", + }, + "qosStatus": "BLA", + "startedAt": "2024-06-18T08:48:12.300312Z", + "expiresAt": "2024-06-18T08:48:12.300312Z" + }, { + "sessionId": "234", + "qosProfile": "QOS_L", + "device": { + "phoneNumber": "+23423532434", + }, + "qosStatus": "BLA", + "startedAt": "2024-06-18T08:48:12.300312Z", + "expiresAt": "2024-06-18T08:48:12.300312Z" + }] + + httpx_mock.add_response( + method='GET', + url='https://quality-of-service-on-demand.p-eu.rapidapi.com/sessions?networkAccessIdentifier=testuser@open5glab.net', + json=mock_response + ) + + sessions = device.sessions() + + assert len(sessions) == 1 \ No newline at end of file