Skip to content

Commit

Permalink
checks.py: Add AdditionalChecks
Browse files Browse the repository at this point in the history
  • Loading branch information
Ed (ODSC) committed Sep 13, 2024
1 parent abb4344 commit ebf380d
Showing 1 changed file with 84 additions and 4 deletions.
88 changes: 84 additions & 4 deletions libcovebods/tasks/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,8 @@ def _add_statement_ids_to_statement_ids_counted(self, statement_ids):
class CheckHasPublicListing(AdditionalCheck):
@staticmethod
def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool:
return schema_object.is_schema_version_equal_to_or_greater_than("0.3")
return (schema_object.is_schema_version_equal_to_or_greater_than("0.3") and
schema_object.is_schema_version_less_than("0.4"))

@staticmethod
def get_additional_check_types_possible(
Expand All @@ -619,10 +620,36 @@ def check_entity_statement_first_pass(self, statement):
)


class CheckHasPublicListingRecord(AdditionalCheck):
@staticmethod
def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool:
return schema_object.is_schema_version_equal_to_or_greater_than("0.4")

@staticmethod
def get_additional_check_types_possible(
lib_cove_bods_config, schema_object
) -> list:
return ["has_public_listing_information_but_has_public_listing_is_false"]

def check_entity_statement_first_pass(self, statement):
record = statement.get("recordDetails")
if isinstance(record, dict) and isinstance(record.get("publicListing"), dict):
pl = record.get("publicListing")
if pl.get("companyFilingsURLs") or pl.get("securitiesListings"):
if not pl.get("hasPublicListing"):
self._additional_check_results.append(
{
"type": "has_public_listing_information_but_has_public_listing_is_false",
"statement_type": "entity",
"statement": statement.get("statementId"),
}
)

class CheckEntityTypeAndEntitySubtypeAlign(AdditionalCheck):
@staticmethod
def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool:
return schema_object.is_schema_version_equal_to_or_greater_than("0.3")
return (schema_object.is_schema_version_equal_to_or_greater_than("0.3") and
schema_object.is_schema_version_less_than("0.4"))

@staticmethod
def get_additional_check_types_possible(
Expand Down Expand Up @@ -652,7 +679,8 @@ def __init__(self, lib_cove_bods_config, schema_object):

@staticmethod
def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool:
return schema_object.is_schema_version_equal_to_or_greater_than("0.3")
return (schema_object.is_schema_version_equal_to_or_greater_than("0.3") and
schema_object.is_schema_version_less_than("0.4"))

@staticmethod
def get_additional_check_types_possible(
Expand All @@ -663,7 +691,8 @@ def get_additional_check_types_possible(
"entity_security_listing_market_identifier_code_set_but_not_operating_market_identifier_code",
"entity_security_listing_operating_market_identifier_code_set_but_not_market_identifier_code",
]
if schema_object.is_schema_version_equal_to_or_greater_than("0.3")
if (schema_object.is_schema_version_equal_to_or_greater_than("0.3") and
schema_object.is_schema_version_less_than("0.4"))
else []
)

Expand Down Expand Up @@ -695,3 +724,54 @@ def check_entity_statement_first_pass(self, statement):
"statement": statement.get("statementID"),
}
)

class CheckEntitySecurityListingsMICSCodesRecord(AdditionalCheck):
def __init__(self, lib_cove_bods_config, schema_object):
super().__init__(lib_cove_bods_config, schema_object)

@staticmethod
def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool:
return schema_object.is_schema_version_equal_to_or_greater_than("0.4")

@staticmethod
def get_additional_check_types_possible(
lib_cove_bods_config, schema_object
) -> list:
return (
[
"entity_security_listing_market_identifier_code_set_but_not_operating_market_identifier_code",
"entity_security_listing_operating_market_identifier_code_set_but_not_market_identifier_code",
]
if schema_object.is_schema_version_equal_to_or_greater_than("0.3")
else []
)

def check_entity_statement_first_pass(self, statement):
record = statement.get("recordDetails")
if isinstance(record, dict) and isinstance(record.get("publicListing"), dict) and isinstance(
record["publicListing"].get("securitiesListings"), list
):
for securitiesListing in record["publicListing"].get(
"securitiesListings"
):
if isinstance(securitiesListing, dict):
marketIdentifierCode = securitiesListing.get("marketIdentifierCode")
operatingMarketIdentifierCode = securitiesListing.get(
"operatingMarketIdentifierCode"
)
if marketIdentifierCode and not operatingMarketIdentifierCode:
self._additional_check_results.append(
{
"type": "entity_security_listing_market_identifier_code_set_but_not_operating_market_identifier_code",
"statement_type": "entity",
"statement": statement.get("statementId"),
}
)
elif operatingMarketIdentifierCode and not marketIdentifierCode:
self._additional_check_results.append(
{
"type": "entity_security_listing_operating_market_identifier_code_set_but_not_market_identifier_code",
"statement_type": "entity",
"statement": statement.get("statementId"),
}
)

0 comments on commit ebf380d

Please sign in to comment.