Skip to content

Commit

Permalink
AMB-0002- Unique permission check for action flag
Browse files Browse the repository at this point in the history
  • Loading branch information
ASubaran committed Nov 5, 2024
1 parent 65f8be2 commit e565e69
Show file tree
Hide file tree
Showing 9 changed files with 1,053 additions and 933 deletions.
72 changes: 36 additions & 36 deletions filenameprocessor/src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,42 @@ class Constants:

VALID_VERSIONS = ["V5"]

EXPECTED_CSV_HEADERS = [
"NHS_NUMBER",
"PERSON_FORENAME",
"PERSON_SURNAME",
"PERSON_DOB",
"PERSON_GENDER_CODE",
"PERSON_POSTCODE",
"DATE_AND_TIME",
"SITE_CODE",
"SITE_CODE_TYPE_URI",
"UNIQUE_ID",
"UNIQUE_ID_URI",
"ACTION_FLAG",
"PERFORMING_PROFESSIONAL_FORENAME",
"PERFORMING_PROFESSIONAL_SURNAME",
"RECORDED_DATE",
"PRIMARY_SOURCE",
"VACCINATION_PROCEDURE_CODE",
"VACCINATION_PROCEDURE_TERM",
"DOSE_SEQUENCE",
"VACCINE_PRODUCT_CODE",
"VACCINE_PRODUCT_TERM",
"VACCINE_MANUFACTURER",
"BATCH_NUMBER",
"EXPIRY_DATE",
"SITE_OF_VACCINATION_CODE",
"SITE_OF_VACCINATION_TERM",
"ROUTE_OF_VACCINATION_CODE",
"ROUTE_OF_VACCINATION_TERM",
"DOSE_AMOUNT",
"DOSE_UNIT_CODE",
"DOSE_UNIT_TERM",
"INDICATION_CODE",
"LOCATION_CODE",
"LOCATION_CODE_TYPE_URI",
]
# EXPECTED_CSV_HEADERS = [
# "NHS_NUMBER",
# "PERSON_FORENAME",
# "PERSON_SURNAME",
# "PERSON_DOB",
# "PERSON_GENDER_CODE",
# "PERSON_POSTCODE",
# "DATE_AND_TIME",
# "SITE_CODE",
# "SITE_CODE_TYPE_URI",
# "UNIQUE_ID",
# "UNIQUE_ID_URI",
# "ACTION_FLAG",
# "PERFORMING_PROFESSIONAL_FORENAME",
# "PERFORMING_PROFESSIONAL_SURNAME",
# "RECORDED_DATE",
# "PRIMARY_SOURCE",
# "VACCINATION_PROCEDURE_CODE",
# "VACCINATION_PROCEDURE_TERM",
# "DOSE_SEQUENCE",
# "VACCINE_PRODUCT_CODE",
# "VACCINE_PRODUCT_TERM",
# "VACCINE_MANUFACTURER",
# "BATCH_NUMBER",
# "EXPIRY_DATE",
# "SITE_OF_VACCINATION_CODE",
# "SITE_OF_VACCINATION_TERM",
# "ROUTE_OF_VACCINATION_CODE",
# "ROUTE_OF_VACCINATION_TERM",
# "DOSE_AMOUNT",
# "DOSE_UNIT_CODE",
# "DOSE_UNIT_TERM",
# "INDICATION_CODE",
# "LOCATION_CODE",
# "LOCATION_CODE_TYPE_URI",
# ]

# Mappings from ODS code to supplier name.
# NOTE: Any ODS code not found in this dictionary's keys is invalid for this service
Expand Down
96 changes: 49 additions & 47 deletions filenameprocessor/src/initial_file_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from datetime import datetime
from constants import Constants
from fetch_permissions import get_permissions_config_json_from_cache
from utils_for_filenameprocessor import extract_file_key_elements, get_csv_content_dict_reader
from utils_for_filenameprocessor import extract_file_key_elements
# get_csv_content_dict_reader

logger = logging.getLogger()

Expand All @@ -29,9 +30,9 @@ def is_valid_datetime(timestamp: str) -> bool:
return True


def validate_content_headers(csv_content_reader):
"""Returns a bool to indicate whether the given CSV headers match the 34 expected headers exactly"""
return csv_content_reader.fieldnames == Constants.EXPECTED_CSV_HEADERS
# def validate_content_headers(csv_content_reader):
# """Returns a bool to indicate whether the given CSV headers match the 34 expected headers exactly"""
# return csv_content_reader.fieldnames == Constants.EXPECTED_CSV_HEADERS


def get_supplier_permissions(supplier: str) -> list:
Expand All @@ -48,37 +49,37 @@ def validate_vaccine_type_permissions(supplier: str, vaccine_type: str):
return vaccine_type in " ".join(allowed_permissions)


def validate_action_flag_permissions(csv_content_dict_reader, supplier: str, vaccine_type: str) -> bool:
"""
Returns True if the supplier has permission to perform ANY of the requested actions for the given vaccine type,
else False.
"""
# Obtain the allowed permissions for the supplier
allowed_permissions_set = set(get_supplier_permissions(supplier))

# If the supplier has full permissions for the vaccine type return True
if f"{vaccine_type}_FULL" in allowed_permissions_set:
logger.info("%s has FULL permissions to create, update and delete", supplier)
return True

# Extract a list of all unique operation permissions requested in the csv file
operations_requested = set()
for row in csv_content_dict_reader:
action_flag = row.get("ACTION_FLAG", "").upper()
operations_requested.add("CREATE" if action_flag == "NEW" else action_flag)

# Check if any of the CSV permissions match the allowed permissions
operation_requests_set = {f"{vaccine_type}_{operation}" for operation in operations_requested}
if operation_requests_set.intersection(allowed_permissions_set):
logger.info(
"%s permissions %s matches one of the requested permissions required to %s",
supplier,
allowed_permissions_set,
operation_requests_set,
)
return True

return False
# def validate_action_flag_permissions(csv_content_dict_reader, supplier: str, vaccine_type: str) -> bool:
# """
# Returns True if the supplier has permission to perform ANY of the requested actions for the given vaccine type,
# else False.
# """
# # Obtain the allowed permissions for the supplier
# allowed_permissions_set = set(get_supplier_permissions(supplier))

# # If the supplier has full permissions for the vaccine type return True
# if f"{vaccine_type}_FULL" in allowed_permissions_set:
# logger.info("%s has FULL permissions to create, update and delete", supplier)
# return True

# # Extract a list of all unique operation permissions requested in the csv file
# operations_requested = set()
# for row in csv_content_dict_reader:
# action_flag = row.get("ACTION_FLAG", "").upper()
# operations_requested.add("CREATE" if action_flag == "NEW" else action_flag)

# # Check if any of the CSV permissions match the allowed permissions
# operation_requests_set = {f"{vaccine_type}_{operation}" for operation in operations_requested}
# if operation_requests_set.intersection(allowed_permissions_set):
# logger.info(
# "%s permissions %s matches one of the requested permissions required to %s",
# supplier,
# allowed_permissions_set,
# operation_requests_set,
# )
# return True

# return False


def initial_file_validation(file_key: str, bucket_name: str):
Expand Down Expand Up @@ -108,24 +109,25 @@ def initial_file_validation(file_key: str, bucket_name: str):
logger.error("Initial file validation failed: invalid file key")
return False

# Obtain the file content
csv_content_dict_reader = get_csv_content_dict_reader(bucket_name=bucket_name, file_key=file_key)
# # Obtain the file content
# csv_content_dict_reader = get_csv_content_dict_reader(bucket_name=bucket_name, file_key=file_key)

# Validate the content headers
if not validate_content_headers(csv_content_dict_reader):
logger.error("Initial file validation failed: incorrect column headers")
return False
# # Validate the content headers
# if not validate_content_headers(csv_content_dict_reader):
# logger.error("Initial file validation failed: incorrect column headers")
# return False

# Validate has permissions for the vaccine type
if not validate_vaccine_type_permissions(supplier, vaccine_type):
logger.error("Initial file validation failed: %s does not have permissions for %s", supplier, vaccine_type)
return False

# Validate has permission to perform at least one of the requested actions
if not validate_action_flag_permissions(csv_content_dict_reader, supplier, vaccine_type):
logger.info(
"Initial file validation failed: %s does not have permissions for any csv ACTION_FLAG operations", supplier
)
return False
# # Validate has permission to perform at least one of the requested actions
# if not validate_action_flag_permissions(csv_content_dict_reader, supplier, vaccine_type):
# logger.info(
# "Initial file validation failed: %s does not have permissions for any csv ACTION_FLAG operations",
# supplier
# )
# return False

return True, get_permissions_config_json_from_cache().get("all_permissions", {}).get(supplier, [])
Loading

0 comments on commit e565e69

Please sign in to comment.