refactor: change queue_emu_changes to always do one day's worth of data
We're only ever calling this with only_one=True, so we might as well make processing a single day's worth of dumps the default behaviour.
jrdh committed Aug 10, 2024
1 parent 5988e57 commit e38c8d1
Showing 3 changed files with 45 additions and 58 deletions.
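
In short, queue_emu_changes now always processes at most one day's dump set and returns that day's date (or None when there is nothing new), instead of taking an only_one flag and returning a list of dates. A minimal before/after of the call site, lifted from the diffs below:

    # before: callers asked for a single dump set explicitly
    dates_queued = importer.queue_emu_changes(only_one=True)  # -> List[date]

    # after: one day's worth of data is the only behaviour
    date_queued = importer.queue_emu_changes()  # -> Optional[date]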
13 changes: 6 additions & 7 deletions dataimporter/cli/emu.py
@@ -36,12 +36,12 @@ def auto(config: Config, one: bool = False, delay_sync: bool = False):
     with DataImporter(config) as importer:
         while True:
             console.log("Queuing next dump set")
-            dates_queued = importer.queue_emu_changes(only_one=True)
-            if not dates_queued:
+            date_queued = importer.queue_emu_changes()
+            if date_queued is None:
                 console.log("No more dumps to import, done")
                 break

-            console.log(f"Date queued: {dates_queued[0].isoformat()}")
+            console.log(f"Date queued: {date_queued.isoformat()}")

             for name in VIEW_NAMES:
                 console.log(f"Adding changes from {name} view to mongo")
@@ -92,13 +92,12 @@ def queue(amount: str, config: Config):

         while amount > 0:
             console.log("Queuing next dump set")
-            dates_queued = importer.queue_emu_changes(only_one=True)
-            if not dates_queued:
+            date_queued = importer.queue_emu_changes()
+            if date_queued is None:
                 console.log("No data to queue")
                 break
             else:
-                date_queued = dates_queued[0].isoformat()
-                console.log(f"Date queued: {date_queued}")
+                console.log(f"Date queued: {date_queued.isoformat()}")
             amount -= 1

70 changes: 30 additions & 40 deletions dataimporter/importer.py
@@ -170,55 +170,45 @@ def queue_changes(self, records: Iterable[SourceRecord], store_name: str):
             for view in views:
                 view.queue(batch)

-    def queue_emu_changes(self, only_one: bool = False) -> List[date]:
+    def queue_emu_changes(self) -> Optional[date]:
         """
-        Look for new EMu dumps, upsert the records into the appropriate DataDB and then
-        queue the changes into the derived views.
+        Look for new EMu dumps and if any are found beyond the date of the last queued
+        EMu import, add the next day's data to the stores and view queues.

-        :param only_one: if True, only process the first set of dumps and then return,
-               otherwise, process them all (default: False)
-        :return the dates that were queued
+        :return the date that was queued or None if no dumps were found
         """
         last_queued = self.emu_status.get()
         dump_sets = find_emu_dumps(self.config.dumps_path, after=last_queued)
         if not dump_sets:
-            return []
+            return None

-        if only_one:
-            dump_sets = dump_sets[:1]
+        next_day_dump_set = dump_sets[0]

         store_names = {store.name for store in self.stores if store.name != "gbif"}
-        dates_queued = []
-        for dump_set in dump_sets:
-            for dump in dump_set.dumps:
-                # normal tables are immediately processable, but if the dump is from
-                # the eaudit table we need to do some additional work because each
-                # audit record refers to a potentially different table from which it
-                # is deleting a record
-                if dump.table != "eaudit":
-                    self.queue_changes(dump.read(), dump.table)
-                else:
-                    # wrap the dump stream in a filter to only allow through records
-                    # we want to process
-                    filtered_dump = filter(
-                        partial(is_valid_eaudit_record, tables=store_names),
-                        dump.read(),
-                    )
-                    # queue the changes to each table's database in turn
-                    for table, records in groupby(
-                        filtered_dump, key=lambda record: record.data["AudTable"]
-                    ):
-                        # convert the raw audit records into delete records as we
-                        # queue them
-                        self.queue_changes(
-                            map(convert_eaudit_to_delete, records), table
-                        )
-            # we've handled all the dumps from this date, update the last date stored on
-            # disk in case we fail later to avoid redoing work
-            self.emu_status.update(dump_set.date)
-            dates_queued.append(dump_set.date)
-
-        return dates_queued
+        for dump in next_day_dump_set.dumps:
+            # normal tables are immediately processable, but if the dump is from
+            # the eaudit table we need to do some additional work because each
+            # audit record refers to a potentially different table from which it
+            # is deleting a record
+            if dump.table != "eaudit":
+                self.queue_changes(dump.read(), dump.table)
+            else:
+                # wrap the dump stream in a filter to only allow through records
+                # we want to process
+                filtered_dump = filter(
+                    partial(is_valid_eaudit_record, tables=store_names),
+                    dump.read(),
+                )
+                # queue the changes to each table's database in turn
+                for table, records in groupby(
+                    filtered_dump, key=lambda record: record.data["AudTable"]
+                ):
+                    # convert the raw audit records into delete records as we
+                    # queue them
+                    self.queue_changes(map(convert_eaudit_to_delete, records), table)
+
+        self.emu_status.update(next_day_dump_set.date)
+        return next_day_dump_set.date

     def queue_gbif_changes(self):
         """
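
As an aside on the eaudit branch in queue_emu_changes above: it leans on the standard-library filter/partial/groupby combination. A minimal, runnable sketch of that pattern, using plain dicts and hypothetical helper names (is_valid_audit_record, to_delete, AudKey) rather than the real dataimporter record types:

    from functools import partial
    from itertools import groupby

    def is_valid_audit_record(record: dict, tables: set) -> bool:
        # keep only audit rows that target a table we actually store
        return record["AudTable"] in tables

    def to_delete(record: dict) -> dict:
        # hypothetical stand-in for convert_eaudit_to_delete
        return {"table": record["AudTable"], "id": record["AudKey"], "deleted": True}

    records = [
        {"AudTable": "ecatalogue", "AudKey": "1"},
        {"AudTable": "ecatalogue", "AudKey": "2"},
        {"AudTable": "etaxonomy", "AudKey": "7"},
        {"AudTable": "eunknown", "AudKey": "9"},  # dropped by the filter
    ]

    tables = {"ecatalogue", "etaxonomy"}
    filtered = filter(partial(is_valid_audit_record, tables=tables), records)
    for table, group in groupby(filtered, key=lambda r: r["AudTable"]):
        # one batch of delete records per consecutive run of the same table
        print(table, list(map(to_delete, group)))

Worth noting: groupby only batches consecutive records that share a key, so a dump that wasn't ordered by AudTable would simply produce several smaller runs per table; each run is queued separately, which stays correct at the cost of extra queue_changes calls.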
20 changes: 9 additions & 11 deletions tests/test_importer.py
@@ -118,7 +118,7 @@ def test_queue_emu_changes(self, importer: DataImporter, config: Config):
             create_etaxonomy("2"),
         )

-        importer.queue_emu_changes()
+        assert importer.queue_emu_changes() == first_dump_date

         assert importer.get_store("ecatalogue").size() == 4
         assert importer.get_store("emultimedia").size() == 3
@@ -148,7 +148,7 @@ def test_queue_emu_changes(self, importer: DataImporter, config: Config):
             create_eaudit("1", "etaxonomy"),
         )

-        importer.queue_emu_changes()
+        assert importer.queue_emu_changes() == second_dump_date

         # these have all lost 1 to reflect the newly deleted records
         assert importer.get_store("ecatalogue").size() == 3
@@ -183,7 +183,7 @@ def test_queue_emu_changes(self, importer: DataImporter, config: Config):
             create_emultimedia("4"),
         )

-        importer.queue_emu_changes()
+        assert importer.queue_emu_changes() == third_dump_date

         assert importer.get_store("ecatalogue").size() == 3
         # there's a new emultimedia record now
@@ -217,13 +217,6 @@ def test_queue_emu_changes_only_one(self, config: Config):
             config.dumps_path, "etaxonomy", first_dump_date, create_etaxonomy("1")
         )

-        importer.queue_emu_changes(only_one=True)
-
-        assert importer.emu_status.get() == first_dump_date
-        assert importer.get_store("ecatalogue").size() == 1
-        assert importer.get_store("emultimedia").size() == 1
-        assert importer.get_store("etaxonomy").size() == 1
-
         second_dump_date = date(2023, 10, 4)
         create_dump(
             config.dumps_path,
@@ -238,8 +231,13 @@
             config.dumps_path, "etaxonomy", second_dump_date, create_etaxonomy("2")
         )

-        importer.queue_emu_changes(only_one=True)
+        assert importer.queue_emu_changes() == first_dump_date
+        assert importer.emu_status.get() == first_dump_date
+        assert importer.get_store("ecatalogue").size() == 1
+        assert importer.get_store("emultimedia").size() == 1
+        assert importer.get_store("etaxonomy").size() == 1

+        assert importer.queue_emu_changes() == second_dump_date
         assert importer.emu_status.get() == second_dump_date
         assert importer.get_store("ecatalogue").size() == 2
         assert importer.get_store("emultimedia").size() == 2
