Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[uss_qualifier/scenarios/utm/data_exchange_validation] Refactor filtering of mock_uss_interactions #831

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
expect_no_interuss_post_interactions,
expect_mock_uss_receives_op_intent_notification,
mock_uss_interactions,
is_op_intent_notification_with_id,
notif_op_intent_id_filter,
operation_filter,
direction_filter,
)
from monitoring.monitorlib.clients.mock_uss.mock_uss_scd_injection_api import (
MockUssFlightBehavior,
Expand Down Expand Up @@ -198,26 +200,25 @@ def _plan_successfully_test_case(self, times: Dict[TimeDuringTest, Time]):
"Check for notification to tested_uss due to subscription in flight 2 area"
)
tested_uss_notifications, _ = mock_uss_interactions(
scenario=self,
mock_uss=self.mock_uss,
op_id=OperationID.NotifyOperationalIntentDetailsChanged,
direction=QueryDirection.Outgoing,
since=flight_2_planning_time,
is_applicable=is_op_intent_notification_with_id(flight_2_oi_ref.id),
self,
self.mock_uss,
flight_2_planning_time,
operation_filter(OperationID.NotifyOperationalIntentDetailsChanged),
direction_filter(QueryDirection.Outgoing),
notif_op_intent_id_filter(flight_2_oi_ref.id),
)
self.end_test_step()

self.begin_test_step("Validate flight2 GET interaction, if no notification")
if not tested_uss_notifications:
tested_uss_get_requests, query = mock_uss_interactions(
scenario=self,
mock_uss=self.mock_uss,
op_id=OperationID.GetOperationalIntentDetails,
direction=QueryDirection.Incoming,
since=flight_1_planning_time,
query_params=dict(
entityid=flight_2_oi_ref.id,
self,
self.mock_uss,
flight_1_planning_time,
operation_filter(
OperationID.GetOperationalIntentDetails, entityid=flight_2_oi_ref.id
),
direction_filter(QueryDirection.Incoming),
)
with self.check(
"Expect GET request when no notification",
Expand Down Expand Up @@ -322,26 +323,25 @@ def _plan_unsuccessfully_test_case(self, times: Dict[TimeDuringTest, Time]):
"Check for notification to tested_uss due to subscription in flight 2 area"
)
tested_uss_notifications, _ = mock_uss_interactions(
scenario=self,
mock_uss=self.mock_uss,
op_id=OperationID.NotifyOperationalIntentDetailsChanged,
direction=QueryDirection.Outgoing,
since=flight_2_planning_time,
is_applicable=is_op_intent_notification_with_id(flight_2_oi_ref.id),
self,
self.mock_uss,
flight_2_planning_time,
operation_filter(OperationID.NotifyOperationalIntentDetailsChanged),
direction_filter(QueryDirection.Outgoing),
notif_op_intent_id_filter(flight_2_oi_ref.id),
)
self.end_test_step()

self.begin_test_step("Validate flight2 GET interaction, if no notification")
if not tested_uss_notifications:
tested_uss_get_requests, query = mock_uss_interactions(
scenario=self,
mock_uss=self.mock_uss,
op_id=OperationID.GetOperationalIntentDetails,
direction=QueryDirection.Incoming,
since=flight_1_planning_time,
query_params=dict(
entityid=flight_2_oi_ref.id,
self,
self.mock_uss,
flight_1_planning_time,
operation_filter(
OperationID.GetOperationalIntentDetails, entityid=flight_2_oi_ref.id
),
direction_filter(QueryDirection.Incoming),
)
with self.check(
"Expect GET request when no notification",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ def expect_mock_uss_receives_op_intent_notification(

# Check for 'notification found' will be done periodically by waiting for a duration till max_wait_time
found, query = wait_in_intervals(mock_uss_interactions)(
scenario=scenario,
mock_uss=mock_uss,
op_id=OperationID.NotifyOperationalIntentDetailsChanged,
direction=QueryDirection.Incoming,
since=st,
scenario,
mock_uss,
st,
operation_filter(OperationID.NotifyOperationalIntentDetailsChanged),
direction_filter(QueryDirection.Incoming),
)

with scenario.check("Expect Notification sent", [participant_id]) as check:
Expand Down Expand Up @@ -78,11 +78,11 @@ def expect_no_interuss_post_interactions(
"we have to wait the longest it may take a USS to send a notification before we can establish that they didn't send a notification",
)
interactions, query = mock_uss_interactions(
scenario=scenario,
mock_uss=mock_uss,
op_id=OperationID.NotifyOperationalIntentDetailsChanged,
direction=QueryDirection.Incoming,
since=st,
scenario,
mock_uss,
st,
operation_filter(OperationID.NotifyOperationalIntentDetailsChanged),
direction_filter(QueryDirection.Incoming),
)

for interaction in interactions:
Expand Down Expand Up @@ -117,19 +117,11 @@ def expect_no_interuss_post_interactions(
def mock_uss_interactions(
scenario: TestScenarioType,
mock_uss: MockUSSClient,
op_id: OperationID,
direction: QueryDirection,
since: StringBasedDateTime,
query_params: Optional[Dict[str, str]] = None,
is_applicable: Optional[Callable[[Interaction], bool]] = None,
*is_applicable: Callable[[Interaction], bool],
) -> Tuple[List[Interaction], Query]:
"""
Determine if mock_uss recorded an interaction for the specified operation in the specified direction.
"""Retrieve mock_uss interactions given specific criteria."""

Raises:
KeyError: if query_params contains a non-existing parameter
IndexError: if query_params is missing a parameter
"""
with scenario.check(
"Mock USS interactions logs retrievable", [mock_uss.participant_id]
) as check:
Expand All @@ -145,27 +137,16 @@ def mock_uss_interactions(
query_timestamps=[q.request.timestamp for q in e.queries],
)

op = api.OPERATIONS[op_id]
return filter_interactions(interactions, is_applicable), query

if query_params is None:
query_params = {} # avoid linting error due to immutable default argument
op_path = op.path.format(**query_params) # raises KeyError, IndexError

if is_applicable is None:
is_applicable = lambda i: True
result = []
for interaction in interactions:
if (
interaction.direction == direction
and interaction.query.request.method == op.verb
and re.search(op_path, interaction.query.request.url)
and is_applicable(interaction)
):
result.append(interaction)
return result, query
def filter_interactions(
interactions: List[Interaction], filters: Iterable[Callable[[Interaction], bool]]
) -> List[Interaction]:
return list(filter(lambda x: all(f(x) for f in filters), interactions))


def is_op_intent_notification_with_id(
def notif_op_intent_id_filter(
op_intent_id: EntityID,
) -> Callable[[Interaction], bool]:
"""Returns an `is_applicable` function that detects whether an op intent notification refers to the specified operational intent."""
Expand All @@ -179,3 +160,48 @@ def is_applicable(interaction: Interaction) -> bool:
return False

return is_applicable


def base_url_filter(
base_url: str,
) -> Callable[[Interaction], bool]:
"""Returns an `is_applicable` function that detects if the request in an interaction is sent to the given base url."""

def is_applicable(interaction: Interaction) -> bool:
return interaction.query.request.url.startswith(base_url)

return is_applicable


def direction_filter(
direction: QueryDirection,
) -> Callable[[Interaction], bool]:
"""Returns an `is_applicable` filter that filters according to query direction."""

def is_applicable(interaction: Interaction) -> bool:
return interaction.direction == direction

return is_applicable


def operation_filter(
op_id: OperationID,
**query_params: str,
) -> Callable[[Interaction], bool]:
"""
Returns an `is_applicable` filter that filters according to operation ID.
If the operation has query parameters, they must be provided through `**query_params`.

Raises:
KeyError: if query_params contains a non-existing parameter
IndexError: if query_params is missing a parameter
"""
op = api.OPERATIONS[op_id]
op_path = op.path.format(**query_params) # raises KeyError, IndexError

def is_applicable(interaction: Interaction) -> bool:
return interaction.query.request.method == op.verb and re.search(
op_path, interaction.query.request.url
)

return is_applicable
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
from datetime import datetime
from monitoring.uss_qualifier.scenarios.astm.utm.data_exchange_validation.test_steps.expected_interactions_test_steps import (
mock_uss_interactions,
operation_filter,
direction_filter,
notif_op_intent_id_filter,
base_url_filter,
)
from monitoring.uss_qualifier.scenarios.astm.utm.data_exchange_validation.test_steps.wait import (
wait_in_intervals,
Expand Down Expand Up @@ -52,14 +56,13 @@ def expect_tested_uss_receives_notification_from_mock_uss(
) as check:
try:
interactions, query = wait_in_intervals(mock_uss_interactions)(
scenario=scenario,
mock_uss=mock_uss,
op_id=OperationID.NotifyOperationalIntentDetailsChanged,
direction=QueryDirection.Outgoing,
since=StringBasedDateTime(interactions_since_time),
is_applicable=_is_notification_sent_to_url_with_op_intent_id(
op_intent_ref_id, tested_uss_base_url
),
scenario,
mock_uss,
StringBasedDateTime(interactions_since_time),
operation_filter(OperationID.NotifyOperationalIntentDetailsChanged),
direction_filter(QueryDirection.Outgoing),
notif_op_intent_id_filter(op_intent_ref_id),
base_url_filter(tested_uss_base_url),
)
except ValueError as e:
check.record_failed(
Expand Down Expand Up @@ -112,42 +115,6 @@ def expect_tested_uss_receives_notification_from_mock_uss(
)


def _is_notification_sent_to_url_with_op_intent_id(
op_intent_id: str,
base_url: str,
) -> Callable[[Interaction], bool]:
"""
Returns an `is_applicable` function that detects if the request in an interaction is sent to the given url and with given op_intent_id

Args:
op_intent_id: The operational_intent id that needs to be notified
base_url: The url domain to which the notification request needs to be sent

"""

def is_applicable(interaction: Interaction) -> bool:
if "json" in interaction.query.request and interaction.query.request.json:
if (
interaction.query.query_type
== QueryType.F3548v21USSNotifyOperationalIntentDetailsChanged
):
try:
notification = ImplicitDict.parse(
interaction.query.request.json,
PutOperationalIntentDetailsParameters,
)
except (ValueError, TypeError, KeyError) as e:
raise ValueError(
f"Parsing mock_uss notification to type PutOperationalIntentDetailsParameters failed - {e}"
)
return (notification.operational_intent_id == op_intent_id) and (
base_url in interaction.query.request.url
)
return False

return is_applicable


def _check_notification_sent_with_subscription_id_and_response(
interactions: List[Interaction], subscription_id: str
) -> Tuple[bool, int]:
Expand Down
Loading