Skip to content

Commit

Permalink
libcovebods: Code formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
Ed (ODSC) committed Dec 12, 2024
1 parent d034028 commit 2c42298
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 64 deletions.
14 changes: 9 additions & 5 deletions libcovebods/data_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

import ijson # type: ignore

STATEMENT_MAPPING = {"entity": "entityStatement",
"person": "personStatement",
"relationship": "ownershipOrControlStatement"}
STATEMENT_MAPPING = {
"entity": "entityStatement",
"person": "personStatement",
"relationship": "ownershipOrControlStatement",
}


def get_statement_type(statement):
if isinstance(statement, dict):
Expand All @@ -15,6 +18,7 @@ def get_statement_type(statement):
return STATEMENT_MAPPING[statement.get("recordType")]
return "unknown"


class DataReader:
"""Class to hold information on where to get data and provides methods to access it.
Expand Down Expand Up @@ -53,12 +57,12 @@ def get_all_data(self):
with open(self._filename, "rb") as fp:
for statement in ijson.items(fp, "item"):
statementType = get_statement_type(statement)
#statementType = (
# statementType = (
# statement.get("statementType")
# if isinstance(statement, dict)
# and isinstance(statement.get("statementType"), str)
# else "unknown"
#)
# )
if statementType in count_statement_types:
if (
count_statement_types[statementType]
Expand Down
82 changes: 63 additions & 19 deletions libcovebods/tasks/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,9 @@ def check_entity_statement_first_pass(self, statement):
{
"type": "statement_entity_securities_listings_haspubliclisting_is_false",
"statement_type": None,
"securities_listings": statement["recordDetails"]["publicListing"]["securitiesListings"],
"securities_listings": statement["recordDetails"][
"publicListing"
]["securitiesListings"],
"statement": statement.get("statementId"),
}
)
Expand Down Expand Up @@ -1352,18 +1354,32 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool:
return schema_object.is_schema_version_equal_to_or_greater_than("0.4")

def check_statement_first_pass(self, statement):
if ("recordId" in statement and "statementDate" in statement and
"statementDate" in statement and statement["statementDate"]):
if (
"recordId" in statement
and "statementDate" in statement
and "statementDate" in statement
and statement["statementDate"]
):
record_status = statement.get("recordStatus")
record_type = statement.get("recordType")
statement_id = statement.get("statementId")
if statement["recordId"] in self._series:
self._series[statement["recordId"]].append(
[statement["statementDate"], record_status, record_type, statement_id]
[
statement["statementDate"],
record_status,
record_type,
statement_id,
]
)
else:
self._series[statement["recordId"]] = [
[statement["statementDate"], record_status, record_type, statement_id]
[
statement["statementDate"],
record_status,
record_type,
statement_id,
]
]

def final_checks(self):
Expand Down Expand Up @@ -1510,7 +1526,9 @@ def check_ownership_or_control_statement_second_pass(self, statement):
"type": "interested_party_must_be_record_id",
"statement_type": None,
"statement": statement.get("statementId"),
"interested_party": statement["recordDetails"]["interestedParty"],
"interested_party": statement["recordDetails"][
"interestedParty"
],
}
)
elif self._records[
Expand All @@ -1521,7 +1539,9 @@ def check_ownership_or_control_statement_second_pass(self, statement):
"type": "interested_party_can_only_refer_to_entity_or_person",
"statement_type": None,
"statement": statement.get("statementId"),
"interested_party": statement["recordDetails"]["interestedParty"],
"interested_party": statement["recordDetails"][
"interestedParty"
],
}
)
else:
Expand All @@ -1544,7 +1564,9 @@ def check_ownership_or_control_statement_second_pass(self, statement):
"type": "interest_beneficial_ownership_interested_party_not_person",
"statement_type": None,
"statement": statement.get("statementId"),
"interested_party": statement["recordDetails"]["interestedParty"],
"interested_party": statement[
"recordDetails"
]["interestedParty"],
}
)

Expand Down Expand Up @@ -1626,8 +1648,12 @@ def check_ownership_or_control_statement_second_pass(self, statement):
"type": "relationship_interests_subject_should_be_entity_nomination_arrangement",
"statement_type": None,
"statement": statement.get("statementId"),
"subject_record_type": self._records[statement["recordDetails"]["subject"]][0],
"subject_record_subtype": self._records[statement["recordDetails"]["subject"]][1],
"subject_record_type": self._records[
statement["recordDetails"]["subject"]
][0],
"subject_record_subtype": self._records[
statement["recordDetails"]["subject"]
][1],
}
)
else:
Expand All @@ -1647,8 +1673,12 @@ def check_ownership_or_control_statement_second_pass(self, statement):
"type": "relationship_interests_subject_should_be_entity_nomination_arrangement",
"statement_type": None,
"statement": statement.get("statementId"),
"subject_record_type": self._records[statement["recordDetails"]["subject"]][0],
"subject_record_subtype": self._records[statement["recordDetails"]["subject"]][1],
"subject_record_type": self._records[
statement["recordDetails"]["subject"]
][0],
"subject_record_subtype": self._records[
statement["recordDetails"]["subject"]
][1],
}
)
elif "type" in interest and interest["type"] in (
Expand All @@ -1670,8 +1700,12 @@ def check_ownership_or_control_statement_second_pass(self, statement):
"type": "relationship_interests_subject_should_be_entity_trust",
"statement_type": None,
"statement": statement.get("statementId"),
"subject_record_type": self._records[statement["recordDetails"]["subject"]][0],
"subject_record_subtype": self._records[statement["recordDetails"]["subject"]][1],
"subject_record_type": self._records[
statement["recordDetails"]["subject"]
][0],
"subject_record_subtype": self._records[
statement["recordDetails"]["subject"]
][1],
}
)
else:
Expand All @@ -1689,8 +1723,12 @@ def check_ownership_or_control_statement_second_pass(self, statement):
"type": "relationship_interests_subject_should_be_entity_trust",
"statement_type": None,
"statement": statement.get("statementId"),
"subject_record_type": self._records[statement["recordDetails"]["subject"]][0],
"subject_record_subtype": self._records[statement["recordDetails"]["subject"]][1],
"subject_record_type": self._records[
statement["recordDetails"]["subject"]
][0],
"subject_record_subtype": self._records[
statement["recordDetails"]["subject"]
][1],
}
)

Expand Down Expand Up @@ -1752,7 +1790,9 @@ def check_ownership_or_control_statement_second_pass(self, statement):
"type": "relationship_interested_party_not_before_relationship_in_dataset",
"statement_type": None,
"statement": statement.get("statementId"),
"interested_party_id": statement["recordDetails"]["interestedParty"],
"interested_party_id": statement["recordDetails"][
"interestedParty"
],
}
)

Expand Down Expand Up @@ -1787,7 +1827,9 @@ def check_person_statement_first_pass(self, statement):
"type": "person_identifiers_invalid_composition",
"statement_type": None,
"statement": statement.get("statementId"),
"scheme": identifier["scheme"] if "scheme" in identifier else None,
"scheme": identifier["scheme"]
if "scheme" in identifier
else None,
}
)
else:
Expand Down Expand Up @@ -1871,6 +1913,8 @@ def check_entity_statement_first_pass(self, statement):
"type": "entity_identifiers_not_known_scheme",
"statement_type": None,
"statement": statement.get("statementId"),
"scheme": identifier["scheme"] if "scheme" in identifier else None,
"scheme": identifier["scheme"]
if "scheme" in identifier
else None,
}
)
37 changes: 22 additions & 15 deletions libcovebods/tasks/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,17 @@ def check_ownership_or_control_statement_second_pass(self, statement):
interested_party = statement["recordDetails"].get("interestedParty")
if interested_party:
if interested_party in self.entity_record_ids:
self.count_ownership_or_control_statement_interested_party_with_entity += 1
self.count_ownership_or_control_statement_interested_party_with_entity += (
1
)
if interested_party in self.person_record_ids:
self.count_ownership_or_control_statement_interested_party_with_person += 1
self.count_ownership_or_control_statement_interested_party_with_person += (
1
)
if isinstance(interested_party, dict):
self.count_ownership_or_control_statement_interested_party_with_unspecified += 1
self.count_ownership_or_control_statement_interested_party_with_unspecified += (
1
)

def get_statistics(self):
data = {
Expand All @@ -417,6 +423,7 @@ def get_statistics(self):
class StatisticsCurrentOwnershipOrControlStatementsAndReplacesStatementsMissing(
AdditionalCheck
):
@staticmethod
def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool:
return schema_object.is_schema_version_equal_to_or_less_than("0.3")

Expand Down Expand Up @@ -455,35 +462,35 @@ def get_statistics(self):
}
return data

class StatisticsStatementsRecordStatus(
AdditionalCheck
):

class StatisticsStatementsRecordStatus(AdditionalCheck):
@staticmethod
def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool:
return schema_object.is_schema_version_equal_to_or_greater_than("0.4")

def __init__(self, lib_cove_bods_config, schema_object):
super().__init__(lib_cove_bods_config, schema_object)
#self.count_replaces_statements_missing = 0
#self.statement_ids = set()
#self.current_statement_ids = set()
self.records = {}
self.missing_new_records = {}
self.current_records_count = 0
self.missing_new_records_count = 0

def check_statement_first_pass(self, statement):
if (isinstance(statement.get("recordStatus"), str) and
isinstance(statement.get("recordId"), str) and
statement.get("recordStatus") in ('new', 'updated', 'closed')):
if (
isinstance(statement.get("recordStatus"), str)
and isinstance(statement.get("recordId"), str)
and statement.get("recordStatus") in ("new", "updated", "closed")
):
if not statement.get("recordId") in self.records:
if not statement.get("recordStatus") == 'new':
self.missing_new_records[statement.get("recordId")] = statement.get("statementId")
if not statement.get("recordStatus") == "new":
self.missing_new_records[statement.get("recordId")] = statement.get(
"statementId"
)
self.records[statement.get("recordId")] = statement.get("recordStatus")

def final_checks(self):
for record_id in self.records:
if not self.records[record_id] == 'closed':
if not self.records[record_id] == "closed":
self.current_records_count += 1
for record_id in self.missing_new_records:
self.missing_new_records_count += 1
Expand Down
4 changes: 3 additions & 1 deletion tests/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ def bods_json_output(
sample_mode=False,
):
# Data Reader
data_reader = libcovebods.data_reader.DataReader(input_file_name, sample_mode=sample_mode)
data_reader = libcovebods.data_reader.DataReader(
input_file_name, sample_mode=sample_mode
)

# classes
if not lib_cove_bods_config:
Expand Down
20 changes: 11 additions & 9 deletions tests/test_additional_fields_0_4_0.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import os
import tempfile

import libcovebods.data_reader
from libcovebods.additionalfields import AdditionalFields
from libcovebods.config import LibCoveBODSConfig
from libcovebods.schema import SchemaBODS
from tests.api import bods_json_output


Expand All @@ -14,7 +10,10 @@ def test_additional_fields_1():
prefix="lib-cove-bods-tests-", dir=tempfile.gettempdir()
)
json_filename = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "fixtures", "0.4", "additional_fields_1.json"
os.path.dirname(os.path.realpath(__file__)),
"fixtures",
"0.4",
"additional_fields_1.json",
)

results = bods_json_output(cove_temp_folder, json_filename)
Expand All @@ -31,14 +30,17 @@ def test_additional_fields_2():
prefix="lib-cove-bods-tests-", dir=tempfile.gettempdir()
)
json_filename = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "fixtures", "0.4", "additional_fields_2.json"
os.path.dirname(os.path.realpath(__file__)),
"fixtures",
"0.4",
"additional_fields_2.json",
)

results = bods_json_output(cove_temp_folder, json_filename)

assert results["schema_version"] == "0.4"
assert results["validation_errors_count"] == 0
assert results["additional_fields_count"] == 0
assert results["additional_checks_count"] == 2 # Some checks fail because isolated statement


assert (
results["additional_checks_count"] == 2
) # Some checks fail because isolated statement
Loading

0 comments on commit 2c42298

Please sign in to comment.