"""
Reading and modelling EMu texexport dump files.
"""
import gzip
import itertools
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from functools import cached_property, total_ordering
from pathlib import Path
from typing import Dict, Union, Tuple, Iterable, Any
from typing import List

from splitgill.utils import to_timestamp, parse_to_timestamp

# the name of the record ID field present in every EMu table
EMU_ID_FIELD = "irn"
# this is arbitrary-ish but it's the time of the first good full dumps we have
FIRST_VERSION = to_timestamp(datetime(2017, 8, 30))


@dataclass
class EMuRecord:
    """
    A class representing an EMu record.
    """

    # the record's IRN
    id: int
    # the version (a timestamp) of the dump the record came from
    version: int
    # the record's fields; repeated fields are collapsed into a tuple of values
    data: Dict[str, Union[str, Tuple[str]]]

    @property
    def is_deleted(self) -> bool:
        """
        Returns True if the record's data represents a deletion, False if not.

        :return: True if the record's data represents a deletion, False if not.
        """
        return not bool(self.data)

    def __contains__(self, field: str) -> bool:
        """
        Checks if the given field is present in this record's data.

        :param field: the field name
        :return: True if the field exists, False if not
        """
        return field in self.data


@total_ordering
class EMuTable(Enum):
    """
    Enumeration of the EMu tables we currently handle.

    The value of the enum indicates the order they should be ingested in with EAudit
    first and then the others after.
    """

    eaudit = 0
    ecatalogue = 1
    emultimedia = 2
    etaxonomy = 3

    def __lt__(self, other):
        # implemented for the total_ordering annotation on the class and allow the
        # values to be used to prioritise the ingest of the tables
        if isinstance(other, EMuTable):
            return self.value < other.value
        return NotImplemented

    @property
    def is_stored(self) -> bool:
        """
        Whether the table's data should be stored or not.

        Currently, only EAudit is ignored as it is actually providing information about
        the other tables (like deletes).
        """
        return self != EMuTable.eaudit


def find_emu_dumps(root: Path, after: int = FIRST_VERSION) -> List["EMuDump"]:
    """
    Find all the EMu dumps in the given path and return them as a list of EMuDump
    objects. The list returned will be sorted in the order that the dumps should be
    processed.

    :param root: the root directory
    :param after: only dumps on or after this version will be returned, defaults to the
                  first full EMu dump from 30/08/17 (see FIRST_VERSION at the module
                  level)
    :return: a sorted list of EMuDump objects
    """
    dumps = []
    # dump file names look like <table>.export.<yyyymmdd>.gz, with eaudit dumps using
    # "deleted-export" instead of "export". Note the named groups: `(?P<table>` and
    # `(?P<date>` — a bare `(?P` is a regex syntax error
    dump_matcher = re.compile(
        r"(?P<table>\w+)\.(?:deleted-)?export\.(?P<date>[0-9]{8})\.gz"
    )

    for path in root.iterdir():
        match = dump_matcher.match(path.name)
        if match:
            table_name, date = match.groups()
            try:
                table = EMuTable[table_name]
            except KeyError:
                # ignore as we don't deal with this table
                continue

            if table is EMuTable.eaudit:
                dump = EMuAuditDump(path, table, date)
            else:
                dump = EMuDump(path, table, date)

            if dump.version >= after:
                dumps.append(dump)

    return sorted(dumps)
+ """ + + def __init__(self, path: Path, table: EMuTable, date: str): + """ + :param path: the full path to the dump file + :param table: the table the dump file is from + :param date: the date string of the export + """ + self.path = path + self.table = table + self.date = date + # convert the date into a version timestamp + self.version = parse_to_timestamp(date, "%Y%m%d", tzinfo=timezone.utc) + + @property + def is_audit(self): + """ + Is this an audit dump? + + :return: True if it is, False if not. + """ + return self.table == EMuTable.eaudit + + def __str__(self) -> str: + return f"Dump {self.table}@{self.version}/{self.date} [{self.path}]" + + def __eq__(self, other: Any): + if isinstance(other, EMuDump): + return self.version == other.version and self.table == other.table + return NotImplemented + + def __lt__(self, other: Any): + if isinstance(other, EMuDump): + # order by version, then table. The main goal here is to ensure the versions + # are ordered correctly and the audit dumps are ordered before normal tables + # as we need to do deletes first + return (self.version, self.table) < (other.version, other.table) + + return NotImplemented + + @property + def size(self) -> int: + """ + Returns the size of the dump in bytes. This is the size of the compressed dump + file. + + :return: the file size of the dump + """ + return self.path.stat().st_size + + @cached_property + def count(self) -> int: + """ + Returns the number of records in the dump by counting the number of IRNs we + find. This requires reading the whole dump. + + :return: the number of records in the dump + """ + irn_field_prefix = f"{EMU_ID_FIELD}:1=" + with gzip.open(self.path, "rt", encoding="utf-8") as f: + return sum(1 for line in f if line.lstrip().startswith(irn_field_prefix)) + + def __iter__(self) -> Iterable[EMuRecord]: + """ + Reads the dump file and yield an EMuRecord object per record found in the dump. 
+ If a record read from the dump doesn't have a detectable IRN then no record is + yielded and that record is skipped. + + :return: yields EMuRecord objects + """ + # cache this, so we don't have to look it up everytime we want to use it (for + # performance) + version = self.version + + with gzip.open(self.path, "rt", encoding="utf-8") as f: + # state variables for each record + emu_id = None + data = {} + + # each record is delimited in the EMu dump using a line with just ### on it. + # This chain here ensures that the file ends with a ### line even if one + # isn't in the file, thus forcing the record to be yielded if it's valid. + # Note that I've never seen a file not end with ### in the real world, but + # anything's possible with EMu! + for line in itertools.chain(f, ["###"]): + line = line.strip() + if not line: + continue + + if line != "###": + # the format is := + field, value = line.split("=", 1) + field = field.split(":", 1)[0] + + if field == EMU_ID_FIELD: + emu_id = int(value) + + existing = data.get(field) + if existing is None: + # the value isn't in the data dict, add it + data[field] = value + else: + if isinstance(existing, tuple): + # there is an existing set of values in the data dict, add + # the new value in a new tuple + data[field] = (*existing, value) + else: + # there is an existing value (just one) in the data dict, + # add the new value in a new tuple + data[field] = (existing, value) + else: + if emu_id is not None: + yield EMuRecord(emu_id, version, data) + + # refresh our record state holding variables + emu_id = None + data = {} + + +class EMuAuditDump(EMuDump): + """ + Class representing an EMu audit table export (or "dump") texexport file. + + Each file represents data from the EAudit table which accounts for changes to any + table in EMu. We specifically filter the audit table dumps for deletions, this is + achieved through an overriden __iter__ method. 
+ """ + + def __iter__(self) -> Iterable[EMuRecord]: + def record_filter(record: EMuRecord): + # filter the dump's records so that only valid deletions are yielded + return ( + # we only want delete operations + record.data.get("AudOperation") == "delete" + # AudKey is the irn of the deleted record, so it must have this field + and "AudKey" in record.data + # and this is the table the record was deleted from + and "AudTable" in record.data + ) + + yield from filter(record_filter, super().__iter__()) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/emu/__init__.py b/tests/emu/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/emu/test_dumps.py b/tests/emu/test_dumps.py new file mode 100644 index 0000000..90bf949 --- /dev/null +++ b/tests/emu/test_dumps.py @@ -0,0 +1,241 @@ +import gzip +from datetime import datetime +from pathlib import Path +from unittest.mock import MagicMock + +from splitgill.utils import to_timestamp + +from dataimporter.emu.dumps import ( + EMuTable, + find_emu_dumps, + EMuDump, + EMuAuditDump, + EMU_ID_FIELD, + EMuRecord, +) +from tests.helpers.dumps import create_dump + + +class TestEMuRecord: + def test_is_deleted(self): + assert EMuRecord(1, 2, {}).is_deleted + assert not EMuRecord(1, 2, {"a": "4"}).is_deleted + + def test_contains(self): + record = EMuRecord(1, 2, {"a": "4", "b": "26"}) + assert "a" in record + assert "b" in record + assert "c" not in record + + +class TestEMuTable: + def test_ordering(self): + tables = sorted(EMuTable) + assert tables[0] == EMuTable.eaudit + + def test_is_stored(self): + for table in EMuTable: + if table == EMuTable.eaudit: + assert not table.is_stored + else: + assert table.is_stored + + +class TestFindEMuDumps: + def test_no_files(self, tmp_path: Path): + assert not find_emu_dumps(tmp_path) + + def test_after_works(self, tmp_path: Path): + after = to_timestamp(datetime(2020, 3, 15)) + + for day in range(12, 18): + 
create_dump(tmp_path, EMuTable.ecatalogue, datetime(2020, 3, day)) + + dumps = find_emu_dumps(tmp_path, after=after) + assert len(dumps) == 3 + + def test_skip_invalid(self, tmp_path: Path): + create_dump(tmp_path, EMuTable.ecatalogue, datetime(2020, 3, 1)) + create_dump(tmp_path, "invalid", datetime(2020, 3, 2)) + + dumps = find_emu_dumps(tmp_path) + assert len(dumps) == 1 + + def test_audit_dumps(self, tmp_path: Path): + path_1 = create_dump(tmp_path, EMuTable.eaudit, datetime(2020, 2, 1)) + path_2 = create_dump(tmp_path, EMuTable.ecatalogue, datetime(2020, 3, 1)) + + dumps = find_emu_dumps(tmp_path) + + assert len(dumps) == 2 + assert dumps[0] == EMuAuditDump(path_1, EMuTable.eaudit, "20200201") + assert dumps[1] == EMuDump(path_2, EMuTable.ecatalogue, "20200301") + assert isinstance(dumps[0], EMuAuditDump) + assert isinstance(dumps[0], EMuDump) + + def test_order(self, tmp_path: Path): + path_4 = create_dump(tmp_path, EMuTable.etaxonomy, datetime(2020, 2, 4)) + path_2 = create_dump(tmp_path, EMuTable.ecatalogue, datetime(2020, 2, 1)) + path_1 = create_dump(tmp_path, EMuTable.eaudit, datetime(2020, 2, 1)) + path_5 = create_dump(tmp_path, EMuTable.emultimedia, datetime(2020, 3, 4)) + path_3 = create_dump(tmp_path, EMuTable.eaudit, datetime(2020, 2, 4)) + + dumps = find_emu_dumps(tmp_path) + + assert dumps == [ + EMuAuditDump(path_1, EMuTable.eaudit, "20200201"), + EMuAuditDump(path_2, EMuTable.ecatalogue, "20200201"), + EMuAuditDump(path_3, EMuTable.eaudit, "20200204"), + EMuAuditDump(path_4, EMuTable.etaxonomy, "20200204"), + EMuAuditDump(path_5, EMuTable.emultimedia, "20200304"), + ] + + +class TestEMuDump: + def test_version_parsing(self): + dump = EMuDump(MagicMock(), MagicMock(), "2020720") + assert dump.version == to_timestamp(datetime(2020, 7, 20)) + + def test_is_audit(self): + audit_dump = EMuDump(MagicMock(), EMuTable.eaudit, "2020720") + not_audit_dump = EMuDump(MagicMock(), EMuTable.ecatalogue, "2020720") + + assert audit_dump.is_audit + assert not 
not_audit_dump.is_audit + + def test_eq(self): + # eq should only care about the version and the table + assert EMuDump(MagicMock(), EMuTable.eaudit, "2020720") == EMuDump( + MagicMock(), EMuTable.eaudit, "2020720" + ) + assert not EMuDump(MagicMock(), EMuTable.ecatalogue, "2020720") == EMuDump( + MagicMock(), EMuTable.eaudit, "2020720" + ) + assert not EMuDump(MagicMock(), EMuTable.emultimedia, "2020720") == EMuDump( + MagicMock(), EMuTable.emultimedia, "2020721" + ) + + def test_lt(self): + a = EMuDump(MagicMock(), EMuTable.ecatalogue, "2020720") + b = EMuDump(MagicMock(), EMuTable.ecatalogue, "2020721") + assert a < b + + a = EMuDump(MagicMock(), EMuTable.ecatalogue, "2020720") + b = EMuDump(MagicMock(), EMuTable.emultimedia, "2020720") + assert a < b + + a = EMuDump(MagicMock(), EMuTable.emultimedia, "2020720") + b = EMuDump(MagicMock(), EMuTable.eaudit, "2020720") + assert b < a + + def test_size(self, tmp_path: Path): + path = create_dump(tmp_path, EMuTable.etaxonomy, datetime(2020, 2, 4)) + + dump = EMuDump(path, EMuTable.etaxonomy, "20200204") + + assert path.stat().st_size == dump.size + + def test_count(self, tmp_path: Path): + records = [{"x": i} for i in range(10)] + path = create_dump(tmp_path, EMuTable.etaxonomy, datetime(2020, 2, 4), *records) + dump = EMuDump(path, EMuTable.etaxonomy, "20200204") + assert dump.count == 10 + + def test_count_empty(self, tmp_path: Path): + path = create_dump(tmp_path, EMuTable.etaxonomy, datetime(2020, 2, 4)) + dump = EMuDump(path, EMuTable.etaxonomy, "20200204") + assert dump.count == 0 + + def test_iter(self, tmp_path: Path): + records = [{"x": str(i)} for i in range(10)] + path = create_dump(tmp_path, EMuTable.etaxonomy, datetime(2020, 2, 4), *records) + dump = EMuDump(path, EMuTable.etaxonomy, "20200204") + + read_records = list(dump) + + assert len(read_records) == len(records) + assert read_records == [ + EMuRecord( + i, dump.version, {"rownum": str(i), EMU_ID_FIELD: str(i), **record} + ) + for i, record in 
enumerate(records, start=1) + ] + + def test_iter_empty(self, tmp_path: Path): + path = create_dump(tmp_path, EMuTable.etaxonomy, datetime(2020, 2, 4)) + dump = EMuDump(path, EMuTable.etaxonomy, "20200204") + + assert not list(dump) + + def test_iter_missing_irn(self, tmp_path: Path): + path = tmp_path / "a_bad_dump.gz" + + with gzip.open(path, "wt", encoding="utf-8") as f: + # don't write an irn + f.writelines([f"rownum=1\n", f"x:1=beans\n", "###\n"]) + + dump = EMuDump(path, EMuTable.etaxonomy, "20200204") + + assert not list(dump) + + def test_iter_multiple_values(self, tmp_path: Path): + records = [{"x": (str(i), str(i + 1), str(i + 5))} for i in range(10)] + path = create_dump(tmp_path, EMuTable.etaxonomy, datetime(2020, 2, 4), *records) + dump = EMuDump(path, EMuTable.etaxonomy, "20200204") + + read_records = list(dump) + + assert len(read_records) == len(records) + assert read_records == [ + EMuRecord( + i, dump.version, {"rownum": str(i), EMU_ID_FIELD: str(i), **record} + ) + for i, record in enumerate(records, start=1) + ] + + def test_iter_blank_lines_and_no_delimiter_end(self, tmp_path: Path): + records = [{"x": (str(i), str(i + 1), str(i + 5))} for i in range(10)] + path = create_dump(tmp_path, EMuTable.etaxonomy, datetime(2020, 2, 4), *records) + with gzip.open(path, "at") as f: + # add a couple of new lines and don't add a ### at the end either + f.write("\n\n") + + dump = EMuDump(path, EMuTable.etaxonomy, "20200204") + read_records = list(dump) + assert len(read_records) == len(records) + + +class TestEMuAuditDump: + def test_iter(self, tmp_path: Path): + records = [ + {"AudOperation": "delete", "AudKey": str(i), "AudTable": "ecatalogue"} + for i in range(10) + ] + # add a record we should ignore + records.append( + { + "AudOperation": "not a delete!", + "AudKey": "100", + "AudTable": "ecatalogue", + } + ) + # add a delete on a table we don't deal with + records.append( + { + "AudOperation": "delete", + "AudKey": "101", + "AudTable": "not an emu 
table ever", + } + ) + + path = create_dump(tmp_path, EMuTable.eaudit, datetime(2020, 1, 4), *records) + dump = EMuAuditDump(path, EMuTable.eaudit, "20200104") + + read_records = list(dump) + assert len(read_records) == 11 + # check they are all deletes + assert all(record.data["AudOperation"] == "delete" for record in read_records) + assert all( + record.data["AudTable"] == "ecatalogue" for record in read_records[:-1] + ) + assert read_records[-1].data["AudTable"] == "not an emu table ever" diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/helpers/dumps.py b/tests/helpers/dumps.py new file mode 100644 index 0000000..3bc0bcf --- /dev/null +++ b/tests/helpers/dumps.py @@ -0,0 +1,59 @@ +import gzip +from datetime import datetime +from pathlib import Path +from typing import Union + +from dataimporter.emu.dumps import EMuTable, EMU_ID_FIELD + + +def create_dump( + root: Path, table: Union[str, EMuTable], date: datetime, *records: dict +) -> Path: + """ + Creates an EMu dump using the given parameters to form the path in the root, and + then adding the records. If no records are provided, a valid dump is still + generated, it will just include no records. 
+ + :param root: the directory to put the dump in + :param table: the EMu table being dumped, doesn't need to be valid (hence the + str|EMuTable type) + :param date: the date of the dump + :param records: 0+ records as dicts + :return: the path of the created dump + """ + export_part = "export" + if isinstance(table, EMuTable): + # eaudit dumps have a slightly different name format to normal dumps + if table == EMuTable.eaudit: + export_part = "deleted-export" + table = table.name + + # form the path + dump = root / f"{table}.{export_part}.{date.strftime('%Y%m%d')}.gz" + + with gzip.open(dump, "wt", encoding="utf-8") as f: + for row, record in enumerate(records, start=1): + # create rownum and irn values for the record if it doesn't have them + row = record.get("rownum", row) + irn = record.get(EMU_ID_FIELD, row) + f.writelines( + [ + f"rownum={row}\n", + f"{EMU_ID_FIELD}:1={irn}\n", + ] + ) + + # write the other values + for key, value in record.items(): + # we've done these, ignore if found + if key == EMU_ID_FIELD or key == "rownum": + continue + + if not isinstance(value, (tuple, list)): + value = [value] + + f.writelines([f"{key}:{i}={v}\n" for i, v in enumerate(value)]) + + f.write("###\n") + + return dump