diff --git a/src/import/activities.py b/src/import/activities.py
new file mode 100644
index 0000000..5f8d16c
--- /dev/null
+++ b/src/import/activities.py
@@ -0,0 +1,79 @@
+"""
+Activities for the import Temporal workflow
+"""
+import dataclasses
+import os
+import zipfile
+
+from pathlib import Path
+
+from temporalio import activity
+
+from article import Article
+from test_import import import_bulk
+
+
+@dataclasses.dataclass
+class UnzipFileConfiguration:
+    """
+    Data class to hold the configuration for the unzip_file function.
+    """
+    zip_path: str
+    data_path: str
+
+
+@dataclasses.dataclass
+class ParseFileArguments:
+    """
+    Data class containing configuration for the parse_file function.
+    """
+    names: list[str]
+
+
+@dataclasses.dataclass
+class SaveBulkArguments:
+    """
+    Data class containing configuration for the save_bulk function.
+    """
+    articles: list[dict]
+
+
+
+
+@activity.defn
+async def unzip_file(configuration: UnzipFileConfiguration) -> str:
+    """
+    Unzip a file and return the contents of its files.
+    :param configuration:
+    :return: The location of the unzipped files
+    """
+    with zipfile.ZipFile(configuration.zip_path, "r") as zip_ref:
+        zip_name = Path(configuration.zip_path).stem
+        unzip_dir = f"{configuration.data_path}/{zip_name}"
+        Path(unzip_dir).mkdir(parents=True, exist_ok=True)
+        zip_ref.extractall(unzip_dir)
+        return unzip_dir
+
+
+@activity.defn
+async def parse_file(args: ParseFileArguments) -> list[dict]:
+    """
+    Parse file contents. We're not sure what is in the files, so this is an activity.
+    :param args:
+    :return:
+    """
+    # return Article.from_file(args.name)
+    tmp = []
+    for file in args.names:
+        tmp.append(Article.from_file(file).to_dict())
+    return tmp
+
+
+@activity.defn
+async def save_bulk(args: SaveBulkArguments):
+    """
+    Save a bulk of articles to Elasticsearch
+    :param args:
+    :return:
+    """
+    import_bulk(args.articles)
diff --git a/src/import/article.py b/src/import/article.py
index 71570b3..fb1b3b1 100644
--- a/src/import/article.py
+++ b/src/import/article.py
@@ -49,43 +49,51 @@ def set_from(self, from_raw):
         :param from_raw: Raw string containing the 'from' name and email
         :return:
         """
-        if '<' in from_raw:
-            # Format "First Last <email@address.com>"
-            from_parts = from_raw.split('<')
-            self.from_email = from_parts[-1].strip('> ')
-            self.from_name = '<'.join(from_parts[:-1]).strip(' ')
-        elif '(' in from_raw:
-            # Format: "email@address.com (First Last)"
-            from_parts = from_raw.split('(')
-            self.from_name = from_parts[-1].strip(') ')
-            self.from_email = '('.join(from_parts[:-1]).strip(' ')
-        else:
-            # Format: "email@address.com"
-            self.from_name = 'No name given'
-            self.from_email = from_raw
+        try:
+            if '<' in from_raw:
+                # Format "First Last <email@address.com>"
+                from_parts = from_raw.split('<')
+                self.from_email = from_parts[-1].strip('> ')
+                self.from_name = '<'.join(from_parts[:-1]).strip(' ')
+            elif '(' in from_raw:
+                # Format: "email@address.com (First Last)"
+                from_parts = from_raw.split('(')
+                self.from_name = from_parts[-1].strip(') ')
+                self.from_email = '('.join(from_parts[:-1]).strip(' ')
+            else:
+                # Format: "email@address.com"
+                self.from_name = 'No name given'
+                self.from_email = from_raw
+        except TypeError as e:
+            raise ValueError(
+                f"Could not parse from: '{from_raw}'. Article: {self.article_id}"
+            ) from e
 
     def to_dict(self):
         """
         Get a dictionary representation of the article.
         :return:
         """
-        return {
-            'id': self.article_id,
-            'path': self.headers['path'],
-            'folder': self.headers['location'],
-            'from_name': self.from_name,
-            'from_email': self.from_email,
-            'newsgroups': self.headers['newsgroups'],
-            'subject': self.headers['subject'],
-            'message_id': self.headers['subject'],
-            'date': self.date.isoformat(),
-            'year': self.date.year,
-            'x_gateway': self.headers['x_gateway'],
-            'lines': self.headers['lines'],
-            'xref': self.headers['xref'],
-            'references': self.references,
-            'body': self.body,
-        }
+        try:
+            return {
+                'id': self.article_id,
+                'path': self.headers['path'],
+                # 'folder': self.headers['location'],
+                'from_name': self.from_name,
+                'from_email': self.from_email,
+                'newsgroups': self.headers['newsgroups'],
+                'subject': self.headers['subject'],
+                'message_id': self.headers['subject'],
+                'date': self.date.isoformat(),
+                'year': self.date.year,
+                'x_gateway': self.headers['x_gateway'],
+                'lines': self.headers['lines'],
+                'xref': self.headers['xref'],
+                'references': self.references,
+                'body': self.body,
+            }
+        except TypeError as e:
+            raise Exception(f"Failed to serialize article '{self.article_id}'") from e
 
     def to_json(self):
         """
diff --git a/src/import/import_workflow.py b/src/import/import_workflow.py
new file mode 100644
index 0000000..a69963c
--- /dev/null
+++ b/src/import/import_workflow.py
@@ -0,0 +1,127 @@
+"""
+This file defines a workflow for using Temporal for importing archives.
+"""
+import asyncio
+import dataclasses
+from datetime import timedelta
+
+from temporalio import workflow
+from temporalio.workflow import ParentClosePolicy
+
+with (workflow.unsafe.imports_passed_through()):
+    from activities import ParseFileArguments, SaveBulkArguments, UnzipFileConfiguration, parse_file, save_bulk, unzip_file
+    from pathlib import Path
+    import os
+
+
+@dataclasses.dataclass
+class ImportConfiguration:
+    """
+    Data class for storing configuration parameters for import.
+    """
+    zip_path: str
+    data_path: str
+    bulk_size: int
+
+
+@dataclasses.dataclass
+class ImportArticlesConfiguration:
+    """
+    Data class for storing configuration parameters for importing a bulk of articles.
+    """
+    filenames: list[str]
+    index: int
+
+
+@workflow.defn
+class ImportArchive:
+    """
+    Import workflow
+    """
+
+    @staticmethod
+    def split_bulk(total: list, size: int) -> list:
+        """
+        Split bulk articles into chunks.
+        :param total: complete set of items
+        :param size: size of chunks
+        :return: Successive chunks of at most :size: items from :total:
+        """
+        for i in range(0, len(total), size):
+            yield total[i:i + size]
+
+    @staticmethod
+    def files_in_dir(dirname: str) -> list[str]:
+        """
+        List the files in a directory, recursively.
+        :param dirname:
+        :return:
+        """
+        files = list(Path(dirname).rglob("*"))
+        articles = []
+        for file in files:
+            if not os.path.isfile(file):
+                continue
+            articles.append(file.as_posix())
+        return articles
+
+    @workflow.run
+    async def run(self, configuration: ImportConfiguration) -> None:
+        """
+        Execute the workflow
+        :param configuration:
+        :return:
+        """
+        # First we get the file contents from the zip
+        files_location = await workflow.execute_activity(
+            unzip_file,
+            UnzipFileConfiguration(
+                zip_path=configuration.zip_path,
+                data_path=configuration.data_path,
+            ),
+            start_to_close_timeout=timedelta(minutes=5),
+        )
+
+        files = ImportArchive.files_in_dir(files_location)
+
+        generator = self.split_bulk(list(files), configuration.bulk_size)
+
+        threads = []
+        i = 0
+        for chunk in generator:
+            threads.append(workflow.execute_child_workflow(
+                ImportArticles.run,
+                ImportArticlesConfiguration(filenames=chunk, index=i),
+                id=f"import_{configuration.zip_path}_{i}",
+                parent_close_policy=ParentClosePolicy.ABANDON
+            ))
+            i += 1
+
+        results = await asyncio.gather(*threads)
+
+@workflow.defn
+class ImportArticles:
+    """
+    Child workflow for importing a bulk
+    """
+    @workflow.run
+    async def run(self, configuration: ImportArticlesConfiguration):
+        """
+        Parse a chunk of files and save the resulting articles in bulk.
+        :param configuration:
+        :return:
+        """
+        args = ParseFileArguments(names=configuration.filenames)
+        articles = await workflow.execute_activity(
+            parse_file,
+            args,
+            start_to_close_timeout=timedelta(minutes=15),
+            activity_id=f"parse_{configuration.index}",
+        )
+
+        await workflow.execute_activity(
+            save_bulk,
+            SaveBulkArguments(articles=articles),
+            start_to_close_timeout=timedelta(minutes=15),
+            activity_id=f"save_{configuration.index}",
+        )
diff --git a/src/import/run_import_workflow.py b/src/import/run_import_workflow.py
new file mode 100644
index 0000000..e1343a6
--- /dev/null
+++ b/src/import/run_import_workflow.py
@@ -0,0 +1,43 @@
+"""
+Start the import workflow.
+"""
+import asyncio
+from pathlib import Path
+
+from temporalio.client import Client
+
+import import_workflow
+
+
+async def main():
+    """
+    Start an ImportArchive workflow in Temporal for each zip archive in the data directory.
+    :return:
+    """
+    client = await Client.connect("localhost:7233")
+
+    zipfiles = Path("data").rglob("*.zip")
+
+    threads = []
+
+    for file in zipfiles:
+        args = import_workflow.ImportConfiguration(
+            zip_path=file.as_posix(),
+            data_path="data",
+            bulk_size=10,
+        )
+
+        file_base = file.stem
+
+        threads.append(client.execute_workflow(
+            import_workflow.ImportArchive.run,
+            args,
+            id=f"import-{file_base}",
+            task_queue='occassio-import',
+        ))
+
+    await asyncio.gather(*threads)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/src/import/test_import.py b/src/import/test_import.py
index eed68d5..c46941e 100755
--- a/src/import/test_import.py
+++ b/src/import/test_import.py
@@ -96,26 +96,34 @@ def split_bulk(total, size):
 
 def process_bulk(filenames):
     """
-    Process a bulk of filenames
+    Process a bulk of filenames into articles
     :param filenames:
     :return:
     """
     articles = []
     for filename in filenames:
         try:
-            articles.append(Article.from_file(filename))
+            articles.append(Article.from_file(filename).to_dict())
         # pylint: disable=broad-exception-caught
         except Exception as e:
             # pylint: enable=broad-exception-caught
             print(f"Failed to parse file '{filename}'")
             print(e)
+    import_bulk(articles)
 
+
+def import_bulk(articles: list[dict]) -> None:
+    """
+    Post a bulk of articles to Elasticsearch
+    :param articles:
+    :return:
+    """
     client = Elasticsearch("http://localhost:9200")
     bulk = [
         {
             "_index": "articles",
-            "_id": article.id,
-            "_source": article.to_json(),
+            "_id": article['id'],
+            "_source": article,
         }
         for article in articles
     ]
diff --git a/src/import/worker.py b/src/import/worker.py
new file mode 100644
index 0000000..952afac
--- /dev/null
+++ b/src/import/worker.py
@@ -0,0 +1,28 @@
+"""
+Run a Temporal worker that hosts the import workflows and activities.
+"""
+import asyncio
+
+from temporalio.client import Client
+from temporalio.worker import Worker
+
+from import_workflow import ImportArchive, ImportArticles
+from activities import unzip_file, parse_file, save_bulk
+
+
+async def main():
+    """
+    Connect to Temporal and run a worker on the import task queue.
+    """
+    client = await Client.connect("localhost:7233")
+    worker = Worker(
+        client,
+        task_queue='occassio-import',
+        workflows=[ImportArchive, ImportArticles],
+        activities=[unzip_file, parse_file, save_bulk],
+    )
+    await worker.run()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
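
A quick local sanity check for the two helpers on ImportArchive (split_bulk and files_in_dir) could look like the sketch below. It is only an illustration, not part of the patch: the file name and the check_* function names are made up, and it assumes the project dependencies (temporalio, elasticsearch) are installed and that it is run from src/import so import_workflow resolves.

"""
Sanity-check sketch for the chunking helpers in import_workflow.py.
Hypothetical file, e.g. src/import/check_helpers.py; names and sample data are illustrative only.
"""
import tempfile
from pathlib import Path

from import_workflow import ImportArchive


def check_split_bulk():
    # split_bulk is a generator, so materialise it to inspect the chunks.
    chunks = list(ImportArchive.split_bulk(list(range(25)), 10))
    assert [len(chunk) for chunk in chunks] == [10, 10, 5]


def check_files_in_dir():
    # files_in_dir should return only files (as POSIX paths), skipping directories.
    with tempfile.TemporaryDirectory() as tmp:
        (Path(tmp) / "nested").mkdir()
        (Path(tmp) / "nested" / "article-1.txt").write_text("stub", encoding="utf-8")
        (Path(tmp) / "article-2.txt").write_text("stub", encoding="utf-8")
        assert len(ImportArchive.files_in_dir(tmp)) == 2


if __name__ == "__main__":
    check_split_bulk()
    check_files_in_dir()
    print("split_bulk and files_in_dir behave as expected")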