From 6c69eb39632181932b4cd4ebc747c27d7cc51da3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Misbach?= Date: Wed, 30 Oct 2024 18:05:23 +0100 Subject: [PATCH] [uss_qualifier/scenarios/utm/data_exchange_validation] Refactor filtering of mock_uss_interactions --- .../get_op_data_validation.py | 54 +++++----- .../expected_interactions_test_steps.py | 100 +++++++++++------- .../validate_notification_received.py | 55 ++-------- 3 files changed, 101 insertions(+), 108 deletions(-) diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/data_exchange_validation/get_op_data_validation.py b/monitoring/uss_qualifier/scenarios/astm/utm/data_exchange_validation/get_op_data_validation.py index 75e47c2af1..7eff427a7b 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/data_exchange_validation/get_op_data_validation.py +++ b/monitoring/uss_qualifier/scenarios/astm/utm/data_exchange_validation/get_op_data_validation.py @@ -41,7 +41,9 @@ expect_no_interuss_post_interactions, expect_mock_uss_receives_op_intent_notification, mock_uss_interactions, - is_op_intent_notification_with_id, + notif_op_intent_id_filter, + operation_filter, + direction_filter, ) from monitoring.monitorlib.clients.mock_uss.mock_uss_scd_injection_api import ( MockUssFlightBehavior, @@ -198,26 +200,25 @@ def _plan_successfully_test_case(self, times: Dict[TimeDuringTest, Time]): "Check for notification to tested_uss due to subscription in flight 2 area" ) tested_uss_notifications, _ = mock_uss_interactions( - scenario=self, - mock_uss=self.mock_uss, - op_id=OperationID.NotifyOperationalIntentDetailsChanged, - direction=QueryDirection.Outgoing, - since=flight_2_planning_time, - is_applicable=is_op_intent_notification_with_id(flight_2_oi_ref.id), + self, + self.mock_uss, + flight_2_planning_time, + operation_filter(OperationID.NotifyOperationalIntentDetailsChanged), + direction_filter(QueryDirection.Outgoing), + notif_op_intent_id_filter(flight_2_oi_ref.id), ) self.end_test_step() self.begin_test_step("Validate flight2 GET interaction, if no notification") if not tested_uss_notifications: tested_uss_get_requests, query = mock_uss_interactions( - scenario=self, - mock_uss=self.mock_uss, - op_id=OperationID.GetOperationalIntentDetails, - direction=QueryDirection.Incoming, - since=flight_1_planning_time, - query_params=dict( - entityid=flight_2_oi_ref.id, + self, + self.mock_uss, + flight_1_planning_time, + operation_filter( + OperationID.GetOperationalIntentDetails, entityid=flight_2_oi_ref.id ), + direction_filter(QueryDirection.Incoming), ) with self.check( "Expect GET request when no notification", @@ -322,26 +323,25 @@ def _plan_unsuccessfully_test_case(self, times: Dict[TimeDuringTest, Time]): "Check for notification to tested_uss due to subscription in flight 2 area" ) tested_uss_notifications, _ = mock_uss_interactions( - scenario=self, - mock_uss=self.mock_uss, - op_id=OperationID.NotifyOperationalIntentDetailsChanged, - direction=QueryDirection.Outgoing, - since=flight_2_planning_time, - is_applicable=is_op_intent_notification_with_id(flight_2_oi_ref.id), + self, + self.mock_uss, + flight_2_planning_time, + operation_filter(OperationID.NotifyOperationalIntentDetailsChanged), + direction_filter(QueryDirection.Outgoing), + notif_op_intent_id_filter(flight_2_oi_ref.id), ) self.end_test_step() self.begin_test_step("Validate flight2 GET interaction, if no notification") if not tested_uss_notifications: tested_uss_get_requests, query = mock_uss_interactions( - scenario=self, - mock_uss=self.mock_uss, - op_id=OperationID.GetOperationalIntentDetails, - direction=QueryDirection.Incoming, - since=flight_1_planning_time, - query_params=dict( - entityid=flight_2_oi_ref.id, + self, + self.mock_uss, + flight_1_planning_time, + operation_filter( + OperationID.GetOperationalIntentDetails, entityid=flight_2_oi_ref.id ), + direction_filter(QueryDirection.Incoming), ) with self.check( "Expect GET request when no notification", diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/data_exchange_validation/test_steps/expected_interactions_test_steps.py b/monitoring/uss_qualifier/scenarios/astm/utm/data_exchange_validation/test_steps/expected_interactions_test_steps.py index 6e78b97edf..a67b7be92c 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/data_exchange_validation/test_steps/expected_interactions_test_steps.py +++ b/monitoring/uss_qualifier/scenarios/astm/utm/data_exchange_validation/test_steps/expected_interactions_test_steps.py @@ -43,11 +43,11 @@ def expect_mock_uss_receives_op_intent_notification( # Check for 'notification found' will be done periodically by waiting for a duration till max_wait_time found, query = wait_in_intervals(mock_uss_interactions)( - scenario=scenario, - mock_uss=mock_uss, - op_id=OperationID.NotifyOperationalIntentDetailsChanged, - direction=QueryDirection.Incoming, - since=st, + scenario, + mock_uss, + st, + operation_filter(OperationID.NotifyOperationalIntentDetailsChanged), + direction_filter(QueryDirection.Incoming), ) with scenario.check("Expect Notification sent", [participant_id]) as check: @@ -78,11 +78,11 @@ def expect_no_interuss_post_interactions( "we have to wait the longest it may take a USS to send a notification before we can establish that they didn't send a notification", ) interactions, query = mock_uss_interactions( - scenario=scenario, - mock_uss=mock_uss, - op_id=OperationID.NotifyOperationalIntentDetailsChanged, - direction=QueryDirection.Incoming, - since=st, + scenario, + mock_uss, + st, + operation_filter(OperationID.NotifyOperationalIntentDetailsChanged), + direction_filter(QueryDirection.Incoming), ) for interaction in interactions: @@ -117,19 +117,11 @@ def expect_no_interuss_post_interactions( def mock_uss_interactions( scenario: TestScenarioType, mock_uss: MockUSSClient, - op_id: OperationID, - direction: QueryDirection, since: StringBasedDateTime, - query_params: Optional[Dict[str, str]] = None, - is_applicable: Optional[Callable[[Interaction], bool]] = None, + *is_applicable: Callable[[Interaction], bool], ) -> Tuple[List[Interaction], Query]: - """ - Determine if mock_uss recorded an interaction for the specified operation in the specified direction. + """Retrieve mock_uss interactions given specific criteria.""" - Raises: - KeyError: if query_params contains a non-existing parameter - IndexError: if query_params is missing a parameter - """ with scenario.check( "Mock USS interactions logs retrievable", [mock_uss.participant_id] ) as check: @@ -145,27 +137,16 @@ def mock_uss_interactions( query_timestamps=[q.request.timestamp for q in e.queries], ) - op = api.OPERATIONS[op_id] + return filter_interactions(interactions, is_applicable), query - if query_params is None: - query_params = {} # avoid linting error due to immutable default argument - op_path = op.path.format(**query_params) # raises KeyError, IndexError - if is_applicable is None: - is_applicable = lambda i: True - result = [] - for interaction in interactions: - if ( - interaction.direction == direction - and interaction.query.request.method == op.verb - and re.search(op_path, interaction.query.request.url) - and is_applicable(interaction) - ): - result.append(interaction) - return result, query +def filter_interactions( + interactions: List[Interaction], filters: Iterable[Callable[[Interaction], bool]] +) -> List[Interaction]: + return list(filter(lambda x: all(f(x) for f in filters), interactions)) -def is_op_intent_notification_with_id( +def notif_op_intent_id_filter( op_intent_id: EntityID, ) -> Callable[[Interaction], bool]: """Returns an `is_applicable` function that detects whether an op intent notification refers to the specified operational intent.""" @@ -179,3 +160,48 @@ def is_applicable(interaction: Interaction) -> bool: return False return is_applicable + + +def base_url_filter( + base_url: str, +) -> Callable[[Interaction], bool]: + """Returns an `is_applicable` function that detects if the request in an interaction is sent to the given base url.""" + + def is_applicable(interaction: Interaction) -> bool: + return interaction.query.request.url.startswith(base_url) + + return is_applicable + + +def direction_filter( + direction: QueryDirection, +) -> Callable[[Interaction], bool]: + """Returns an `is_applicable` filter that filters according to query direction.""" + + def is_applicable(interaction: Interaction) -> bool: + return interaction.direction == direction + + return is_applicable + + +def operation_filter( + op_id: OperationID, + **query_params: str, +) -> Callable[[Interaction], bool]: + """ + Returns an `is_applicable` filter that filters according to operation ID. + If the operation has query parameters, they must be provided through `**query_params`. + + Raises: + KeyError: if query_params contains a non-existing parameter + IndexError: if query_params is missing a parameter + """ + op = api.OPERATIONS[op_id] + op_path = op.path.format(**query_params) # raises KeyError, IndexError + + def is_applicable(interaction: Interaction) -> bool: + return interaction.query.request.method == op.verb and re.search( + op_path, interaction.query.request.url + ) + + return is_applicable diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/subscription_notifications/test_steps/validate_notification_received.py b/monitoring/uss_qualifier/scenarios/astm/utm/subscription_notifications/test_steps/validate_notification_received.py index e9c3c29efb..222c9e4381 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/subscription_notifications/test_steps/validate_notification_received.py +++ b/monitoring/uss_qualifier/scenarios/astm/utm/subscription_notifications/test_steps/validate_notification_received.py @@ -5,6 +5,10 @@ from datetime import datetime from monitoring.uss_qualifier.scenarios.astm.utm.data_exchange_validation.test_steps.expected_interactions_test_steps import ( mock_uss_interactions, + operation_filter, + direction_filter, + notif_op_intent_id_filter, + base_url_filter, ) from monitoring.uss_qualifier.scenarios.astm.utm.data_exchange_validation.test_steps.wait import ( wait_in_intervals, @@ -52,14 +56,13 @@ def expect_tested_uss_receives_notification_from_mock_uss( ) as check: try: interactions, query = wait_in_intervals(mock_uss_interactions)( - scenario=scenario, - mock_uss=mock_uss, - op_id=OperationID.NotifyOperationalIntentDetailsChanged, - direction=QueryDirection.Outgoing, - since=StringBasedDateTime(interactions_since_time), - is_applicable=_is_notification_sent_to_url_with_op_intent_id( - op_intent_ref_id, tested_uss_base_url - ), + scenario, + mock_uss, + StringBasedDateTime(interactions_since_time), + operation_filter(OperationID.NotifyOperationalIntentDetailsChanged), + direction_filter(QueryDirection.Outgoing), + notif_op_intent_id_filter(op_intent_ref_id), + base_url_filter(tested_uss_base_url), ) except ValueError as e: check.record_failed( @@ -112,42 +115,6 @@ def expect_tested_uss_receives_notification_from_mock_uss( ) -def _is_notification_sent_to_url_with_op_intent_id( - op_intent_id: str, - base_url: str, -) -> Callable[[Interaction], bool]: - """ - Returns an `is_applicable` function that detects if the request in an interaction is sent to the given url and with given op_intent_id - - Args: - op_intent_id: The operational_intent id that needs to be notified - base_url: The url domain to which the notification request needs to be sent - - """ - - def is_applicable(interaction: Interaction) -> bool: - if "json" in interaction.query.request and interaction.query.request.json: - if ( - interaction.query.query_type - == QueryType.F3548v21USSNotifyOperationalIntentDetailsChanged - ): - try: - notification = ImplicitDict.parse( - interaction.query.request.json, - PutOperationalIntentDetailsParameters, - ) - except (ValueError, TypeError, KeyError) as e: - raise ValueError( - f"Parsing mock_uss notification to type PutOperationalIntentDetailsParameters failed - {e}" - ) - return (notification.operational_intent_id == op_intent_id) and ( - base_url in interaction.query.request.url - ) - return False - - return is_applicable - - def _check_notification_sent_with_subscription_id_and_response( interactions: List[Interaction], subscription_id: str ) -> Tuple[bool, int]: