-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[feature/PI-346-changelog_modify] modify transform and load
- Loading branch information
Showing
11 changed files
with
561 additions
and
66 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
from uuid import UUID | ||
|
||
DEFAULT_ORGANISATION = "CDEF" | ||
|
||
DEFAULT_PRODUCT_TEAM = { | ||
"id": UUID(int=0x12345678123456781234567812345678), | ||
"name": "ROOT", | ||
} | ||
|
||
EXCEPTIONAL_ODS_CODES = { | ||
"696B001", | ||
"TESTEBS1", | ||
"TESTLSP0", | ||
"TESTLSP1", | ||
"TESTLSP3", | ||
"TMSAsync1", | ||
"TMSAsync2", | ||
"TMSAsync3", | ||
"TMSAsync4", | ||
"TMSAsync5", | ||
"TMSAsync6", | ||
"TMSEbs2", | ||
} | ||
|
||
BAD_UNIQUE_IDENTIFIERS = { | ||
"31af51067f47f1244d38", # pragma: allowlist secret | ||
"a83e1431f26461894465", # pragma: allowlist secret | ||
"S2202584A2577603", | ||
"S100049A300185", | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
from domain.core.device import Device | ||
from pydantic import ValidationError | ||
from sds.domain.constants import ModificationType | ||
from sds.domain.nhs_accredited_system import NhsAccreditedSystem | ||
from sds.domain.nhs_mhs import NhsMhs | ||
|
||
from .utils import InvalidModificationRequest, new_questionnaire_response_from_template | ||
|
||
|
||
class MandatoryFieldError(Exception): | ||
def __init__(self, field): | ||
super().__init__(f"Field '{field}' cannot be null") | ||
|
||
|
||
class NoValuesToRemove(Exception): | ||
pass | ||
|
||
|
||
def _unique_list(*items): | ||
return list(set(items)) | ||
|
||
|
||
def _parse_and_validate_field( | ||
model: type[NhsAccreditedSystem] | type[NhsMhs], field: str, value | ||
) -> list: | ||
try: | ||
parsed_value = model.parse_and_validate_field(field=field, value=value) | ||
except ValidationError: | ||
raise InvalidModificationRequest(field) | ||
|
||
if isinstance(parsed_value, (set, list)): | ||
return list(parsed_value) | ||
else: | ||
return [parsed_value] | ||
|
||
|
||
def update_device_metadata( | ||
device: Device, | ||
model: type[NhsAccreditedSystem] | type[NhsMhs], | ||
modification_type: ModificationType, | ||
field_alias: str, | ||
new_values: list, | ||
) -> Device: | ||
field_name = model.get_field_name_for_alias(alias=field_alias) | ||
((questionnaire_response,),) = device.questionnaire_responses.values() | ||
_current_values = questionnaire_response.get_response(question_name=field_name) | ||
|
||
if modification_type == ModificationType.ADD: | ||
_updated_values = _unique_list(*_current_values, *new_values) | ||
parsed_values = _parse_and_validate_field( | ||
model=model, field=field_name, value=_updated_values | ||
) | ||
elif modification_type == ModificationType.REPLACE: | ||
parsed_values = _parse_and_validate_field( | ||
model=model, field=field_name, value=new_values | ||
) | ||
elif modification_type == ModificationType.DELETE: | ||
if model.is_mandatory_field(field_name): | ||
raise MandatoryFieldError(field_name) | ||
if not _current_values: | ||
raise InvalidModificationRequest( | ||
field_name, "This device has no such data to delete" | ||
) | ||
parsed_values = [] | ||
|
||
new_questionnaire_response = new_questionnaire_response_from_template( | ||
questionnaire_response=questionnaire_response, | ||
field_to_update=field_name, | ||
new_values=parsed_values, | ||
) | ||
device.update_questionnaire_response( | ||
questionnaire_response_index=0, | ||
questionnaire_response=new_questionnaire_response, | ||
) | ||
return device |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
from typing import Callable, Generator | ||
|
||
from domain.core.device import Device | ||
from domain.core.device_key import DeviceKeyType | ||
from domain.core.product_team import ProductTeam | ||
from domain.core.validation import DEVICE_KEY_SEPARATOR | ||
from sds.cpm_translation.utils import get_in_list_of_dict | ||
from sds.domain.constants import ModificationType | ||
from sds.domain.nhs_accredited_system import NhsAccreditedSystem | ||
from sds.domain.nhs_mhs import NhsMhs | ||
|
||
from ..constants import DEFAULT_PRODUCT_TEAM | ||
from .utils import InvalidModificationRequest, new_questionnaire_response_from_template | ||
|
||
|
||
class NotAnSdsKey(Exception): | ||
pass | ||
|
||
|
||
class AccreditedSystemAlreadyExists(Exception): | ||
def __init__(self, ods_code): | ||
super().__init__(f"Accredited System with ODS code '{ods_code}' already exists") | ||
|
||
|
||
MHS_KEY_FIELDS = ["nhs_id_code", "nhs_mhs_party_key", "nhs_mhs_svc_ia"] | ||
|
||
|
||
def get_modify_key_function( | ||
model: type[NhsMhs] | type[NhsAccreditedSystem], | ||
modification_type: ModificationType, | ||
field_name: str, | ||
) -> Callable[[list[Device], str, any], Generator[Device, None, None]]: | ||
match (model, modification_type, field_name): | ||
case ( | ||
NhsAccreditedSystem, | ||
ModificationType.ADD, | ||
"nhs_as_client", | ||
): | ||
return new_accredited_system | ||
case ( | ||
NhsAccreditedSystem, | ||
ModificationType.REPLACE, | ||
"nhs_as_client", | ||
): | ||
return replace_accredited_systems | ||
case ( | ||
NhsAccreditedSystem, | ||
ModificationType.DELETE, | ||
"nhs_as_client", | ||
): | ||
raise InvalidModificationRequest(field_name) | ||
case ( | ||
NhsMhs, | ||
ModificationType.ADD, | ||
"nhs_mhs_party_key" | "nhs_mhs_svc_ia" | "nhs_id_code", | ||
): | ||
raise InvalidModificationRequest(field_name) | ||
case ( | ||
NhsMhs, | ||
ModificationType.REPLACE, | ||
"nhs_mhs_party_key" | "nhs_mhs_svc_ia" | "nhs_id_code", | ||
): | ||
return replace_msg_handling_system | ||
case ( | ||
NhsMhs, | ||
ModificationType.DELETE, | ||
"nhs_mhs_party_key" | "nhs_mhs_svc_ia" | "nhs_id_code", | ||
): | ||
raise InvalidModificationRequest(field_name) | ||
case _: | ||
raise NotAnSdsKey | ||
|
||
|
||
def new_accredited_system( | ||
devices: list[Device], field_name: str, value: str | ||
) -> Generator[Device, None, None]: | ||
(ods_code,) = NhsAccreditedSystem.parse_and_validate_field( | ||
field=field_name, value=value | ||
) | ||
|
||
current_ods_codes = {device.ods_code for device in devices} | ||
if ods_code in current_ods_codes: | ||
raise AccreditedSystemAlreadyExists(ods_code) | ||
|
||
device = devices[0] | ||
( | ||
(questionnaire_id, (questionnaire_response,)), | ||
) = device.questionnaire_responses.items() | ||
new_questionnaire_response = new_questionnaire_response_from_template( | ||
questionnaire_response=questionnaire_response, | ||
field_to_update=field_name, | ||
new_values=[ods_code], | ||
) | ||
unique_identifier = device.indexes[(questionnaire_id, "unique_identifier")] | ||
new_accredited_system_id = DEVICE_KEY_SEPARATOR.join((ods_code, unique_identifier)) | ||
|
||
product_team = ProductTeam( | ||
id=device.product_team_id, ods_code=ods_code, name=DEFAULT_PRODUCT_TEAM["name"] | ||
) | ||
new_device = product_team.create_device(name=device.name, type=device.type) | ||
new_device.add_questionnaire_response( | ||
questionnaire_response=new_questionnaire_response | ||
) | ||
new_device.add_key( | ||
type=DeviceKeyType.ACCREDITED_SYSTEM_ID, key=new_accredited_system_id | ||
) | ||
new_device.add_index( | ||
questionnaire_id=questionnaire_id, question_name="unique_identifier" | ||
) | ||
yield new_device | ||
|
||
|
||
def replace_accredited_systems( | ||
devices: list[Device], field_name: str, value: str | ||
) -> Generator[Device, None, None]: | ||
current_ods_codes = {device.ods_code for device in devices} | ||
final_ods_codes = NhsAccreditedSystem.parse_and_validate_field( | ||
field=field_name, value=value | ||
) | ||
removed_ods_codes = current_ods_codes - final_ods_codes | ||
for device in devices: | ||
if device.ods_code in removed_ods_codes: | ||
device.delete() | ||
yield device | ||
|
||
for new_ods_code in final_ods_codes - current_ods_codes: | ||
yield from new_accredited_system( | ||
devices=devices, field_name=field_name, value=[new_ods_code] | ||
) | ||
|
||
|
||
def _get_msg_handling_system_scoped_key_parts( | ||
responses: list[dict], | ||
) -> Generator[str, None, None]: | ||
for key_field in MHS_KEY_FIELDS: | ||
(_value,) = get_in_list_of_dict(obj=responses, key=key_field) | ||
yield _value.strip() | ||
|
||
|
||
def replace_msg_handling_system( | ||
devices: list[Device], field_name: str, value: str | ||
) -> Generator[Device, None, None]: | ||
(device,) = devices | ||
device.delete() | ||
yield device | ||
|
||
( | ||
(questionnaire_id, (_questionnaire_response,)), | ||
) = device.questionnaire_responses.items() | ||
new_value = NhsMhs.parse_and_validate_field(field=field_name, value=value) | ||
new_questionnaire_response = new_questionnaire_response_from_template( | ||
questionnaire_response=_questionnaire_response, | ||
field_to_update=field_name, | ||
new_values=[new_value], | ||
) | ||
key_parts = _get_msg_handling_system_scoped_key_parts( | ||
responses=_questionnaire_response.responses | ||
) | ||
new_scoped_party_key = DEVICE_KEY_SEPARATOR.join(key_parts) | ||
(ods_code,) = get_in_list_of_dict( | ||
obj=_questionnaire_response.responses, key="nhs_id_code" | ||
) | ||
|
||
product_team = ProductTeam( | ||
id=device.product_team_id, ods_code=ods_code, name=DEFAULT_PRODUCT_TEAM["name"] | ||
) | ||
new_device = product_team.create_device(name=device.name, type=device.type) | ||
new_device.add_questionnaire_response( | ||
questionnaire_response=new_questionnaire_response | ||
) | ||
new_device.add_key( | ||
type=DeviceKeyType.MESSAGE_HANDLING_SYSTEM_ID, key=new_scoped_party_key | ||
) | ||
new_device.add_index( | ||
questionnaire_id=questionnaire_id, question_name="unique_identifier" | ||
) | ||
yield new_device |
Oops, something went wrong.