
Commit

Implement test flow using Temporal
Using Temporal for the import makes it easier to find files with errors, so the import is rewritten to use it.
jarno-knaw committed Jul 30, 2024
1 parent 09ac539 commit 858dd7b
Showing 6 changed files with 322 additions and 35 deletions.
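
The description above says Temporal makes it easier to find files with errors: each ImportArticles child workflow handles only one chunk of filenames, so a parse failure is tied to a small, named unit of work. As a hypothetical illustration (not part of this commit; it assumes the temporalio client API, the worker from this diff, and a Temporal server on localhost:7233), the child runs and their statuses could be listed with a visibility query:

import asyncio

from temporalio.client import Client


async def list_import_chunks() -> None:
    """List ImportArticles child runs so chunks with errors can be re-inspected."""
    client = await Client.connect("localhost:7233")
    # Visibility query; append e.g. ' AND ExecutionStatus="Failed"' to narrow it down.
    async for execution in client.list_workflows('WorkflowType="ImportArticles"'):
        # The workflow id is "import_<zip path>_<chunk index>", set in ImportArchive.run.
        print(execution.id, execution.status)


if __name__ == "__main__":
    asyncio.run(list_import_chunks())

The Temporal web UI exposes the same information per run, including the inputs of each failed or retrying chunk.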
79 changes: 79 additions & 0 deletions src/import/activities.py
@@ -0,0 +1,79 @@
"""
Activities for the import Temporal workflow
"""
import dataclasses
import zipfile

from pathlib import Path

from temporalio import activity

from article import Article
from test_import import import_bulk


@dataclasses.dataclass
class UnzipFileConfiguration:
"""
Data class to hold the configuration for the unzip_file function.
"""
zip_path: str
data_path: str


@dataclasses.dataclass
class ParseFileArguments:
"""
Data class containing configuration for the parse_file function.
"""
names: list[str]


@dataclasses.dataclass
class SaveBulkArguments:
"""
Data class containing configuration for the save_bulk function.
"""
articles: list[dict]


@activity.defn
async def unzip_file(configuration: UnzipFileConfiguration) -> str:
"""
    Unzip an archive and return the directory it was extracted to.
:param configuration:
:return: The location of the unzipped files
"""
with zipfile.ZipFile(configuration.zip_path, "r") as zip_ref:
zip_name = Path(configuration.zip_path).stem
unzip_dir = f"{configuration.data_path}/{zip_name}"
Path(unzip_dir).mkdir(parents=True, exist_ok=True)
zip_ref.extractall(unzip_dir)
return unzip_dir


@activity.defn
async def parse_file(args: ParseFileArguments) -> list[dict]:
"""
    Parse file contents into articles. The contents of the files are not known in advance, so this runs as an activity.
:param args:
:return:
"""
    articles = []
    for file in args.names:
        articles.append(Article.from_file(file).to_dict())
    return articles


@activity.defn
async def save_bulk(args: SaveBulkArguments):
"""
    Save a bulk of articles to Elasticsearch.
:param args:
:return:
"""
import_bulk(args.articles)
70 changes: 39 additions & 31 deletions src/import/article.py
@@ -49,43 +49,51 @@ def set_from(self, from_raw):
:param from_raw: Raw string containing the 'from' name and email
:return:
"""
if '<' in from_raw:
# Format "First Last <[email protected]>"
from_parts = from_raw.split('<')
self.from_email = from_parts[-1].strip('> ')
self.from_name = '<'.join(from_parts[:-1]).strip(' ')
elif '(' in from_raw:
# Format: "[email protected] (First Last)"
from_parts = from_raw.split('(')
self.from_name = from_parts[-1].strip(') ')
self.from_email = '('.join(from_parts[:-1]).strip(' ')
else:
# Format: "[email protected]"
self.from_name = 'No name given'
self.from_email = from_raw
try:
if '<' in from_raw:
# Format "First Last <[email protected]>"
from_parts = from_raw.split('<')
self.from_email = from_parts[-1].strip('> ')
self.from_name = '<'.join(from_parts[:-1]).strip(' ')
elif '(' in from_raw:
# Format: "[email protected] (First Last)"
from_parts = from_raw.split('(')
self.from_name = from_parts[-1].strip(') ')
self.from_email = '('.join(from_parts[:-1]).strip(' ')
else:
# Format: "[email protected]"
self.from_name = 'No name given'
self.from_email = from_raw
except TypeError as e:
raise ValueError(
f"Could not parse from: '{from_raw}'. Article: {self.article_id}"
) from e

def to_dict(self):
"""
Get a dictionary representation of the article.
:return:
"""
return {
'id': self.article_id,
'path': self.headers['path'],
'folder': self.headers['location'],
'from_name': self.from_name,
'from_email': self.from_email,
'newsgroups': self.headers['newsgroups'],
'subject': self.headers['subject'],
'message_id': self.headers['subject'],
'date': self.date.isoformat(),
'year': self.date.year,
'x_gateway': self.headers['x_gateway'],
'lines': self.headers['lines'],
'xref': self.headers['xref'],
'references': self.references,
'body': self.body,
}
try:
return {
'id': self.article_id,
'path': self.headers['path'],
# 'folder': self.headers['location'],
'from_name': self.from_name,
'from_email': self.from_email,
'newsgroups': self.headers['newsgroups'],
'subject': self.headers['subject'],
'message_id': self.headers['subject'],
'date': self.date.isoformat(),
'year': self.date.year,
'x_gateway': self.headers['x_gateway'],
'lines': self.headers['lines'],
'xref': self.headers['xref'],
'references': self.references,
'body': self.body,
}
except TypeError as e:
raise Exception(f"Failed to serialize article '{self.article_id}'") from e

def to_json(self):
"""
127 changes: 127 additions & 0 deletions src/import/import_workflow.py
@@ -0,0 +1,127 @@
"""
This file defines a workflow for using Temporal for importing archives.
"""
import asyncio
import dataclasses
from datetime import timedelta

from temporalio import workflow
from temporalio.workflow import ParentClosePolicy

with workflow.unsafe.imports_passed_through():
    from activities import ParseFileArguments, SaveBulkArguments, UnzipFileConfiguration, parse_file, save_bulk, unzip_file
from pathlib import Path
import os


@dataclasses.dataclass
class ImportConfiguration:
"""
Data class for storing configuration parameters for import.
"""
zip_path: str
data_path: str
bulk_size: int


@dataclasses.dataclass
class ImportArticlesConfiguration:
"""
    Data class for storing configuration parameters for import.
"""
filenames: list[str]
index: int


@workflow.defn
class ImportArchive:
"""
Import workflow
"""

@staticmethod
def split_bulk(total: list, size: int) -> list:
"""
Split bulk articles into chunks.
:param total: complete set of items
:param size: size of chunks
:return: A list of lists, chunks of size :size: from :total:
"""
for i in range(0, len(total), size):
yield total[i:i + size]

@staticmethod
def files_in_dir(dirname: str) -> list[str]:
"""
        List all files below a directory.
        :param dirname: Directory to scan recursively
        :return: POSIX paths of all regular files found
"""
files = list(Path(dirname).rglob("*"))
articles = []
for file in files:
if not os.path.isfile(file):
continue
articles.append(file.as_posix())
return articles

@workflow.run
async def run(self, configuration: ImportConfiguration) -> None:
"""
Execute the workflow
:param configuration:
:return:
"""
# First we get the file contents from the zip
files_location = await workflow.execute_activity(
unzip_file,
UnzipFileConfiguration(
zip_path=configuration.zip_path,
data_path=configuration.data_path,
),
start_to_close_timeout=timedelta(minutes=5),
)

files = ImportArchive.files_in_dir(files_location)

generator = self.split_bulk(list(files), configuration.bulk_size)

threads = []
i = 0
for chunk in generator:
threads.append(workflow.execute_child_workflow(
ImportArticles.run,
ImportArticlesConfiguration(filenames=chunk, index=i),
id=f"import_{configuration.zip_path}_{i}",
parent_close_policy=ParentClosePolicy.ABANDON
))
i += 1

        await asyncio.gather(*threads)

@workflow.defn
class ImportArticles:
"""
Child workflow for importing a bulk
"""
@workflow.run
async def run(self, configuration: ImportArticlesConfiguration):
"""
:param configuration:
:return:
"""
args = ParseFileArguments(names=configuration.filenames)
articles = await workflow.execute_activity(
parse_file,
args,
start_to_close_timeout=timedelta(minutes=15),
activity_id=f"parse_{configuration.index}",
)

await workflow.execute_activity(
save_bulk,
SaveBulkArguments(articles=articles),
start_to_close_timeout=timedelta(minutes=15),
activity_id=f"save_{configuration.index}",
)
43 changes: 43 additions & 0 deletions src/import/run_import_workflow.py
@@ -0,0 +1,43 @@
"""
Start the import workflow.
"""
import asyncio
from pathlib import Path

from temporalio.client import Client

import import_workflow


async def main():
"""
    Start an ImportArchive workflow for every zip archive under the data directory.
:return:
"""
client = await Client.connect("localhost:7233")

zipfiles = Path("data").rglob("*.zip")

threads = []

for file in zipfiles:
args = import_workflow.ImportConfiguration(
zip_path=file.as_posix(),
data_path="data",
bulk_size=10,
)

file_base = file.stem

threads.append(client.execute_workflow(
import_workflow.ImportArchive.run,
args,
id=f"import-{file_base}",
task_queue='occassio-import',
))

    await asyncio.gather(*threads)


if __name__ == "__main__":
asyncio.run(main())
16 changes: 12 additions & 4 deletions src/import/test_import.py
@@ -96,26 +96,34 @@ def split_bulk(total, size):

def process_bulk(filenames):
"""
Process a bulk of filenames
Process a bulk of filenames into articles
:param filenames:
:return:
"""
articles = []
for filename in filenames:
try:
articles.append(Article.from_file(filename))
articles.append(Article.from_file(filename).to_dict())
# pylint: disable=broad-exception-caught
except Exception as e:
# pylint: enable=broad-exception-caught
print(f"Failed to parse file '{filename}'")
print(e)
import_bulk(articles)


def import_bulk(articles: list[dict]) -> None:
"""
Post a bulk of articles to Elasticsearch
:param articles:
:return:
"""
client = Elasticsearch("http://localhost:9200")
bulk = [
{
"_index": "articles",
"_id": article.id,
"_source": article.to_json(),
"_id": article['id'],
"_source": article,
}
for article in articles
]
22 changes: 22 additions & 0 deletions src/import/worker.py
@@ -0,0 +1,22 @@
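"""
Temporal worker that serves the import workflows and activities on the occassio-import task queue.
"""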
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from import_workflow import ImportArchive, ImportArticles
from activities import unzip_file, parse_file, save_bulk


async def main():
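    """
    Connect to the Temporal server and run the worker until it is stopped.
    """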
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue='occassio-import',
workflows=[ImportArchive, ImportArticles],
activities=[unzip_file, parse_file, save_bulk],
)
await worker.run()


if __name__ == "__main__":
asyncio.run(main())
