From 04cb8feafac85267dbcea979fcbcb8c6aa474a2e Mon Sep 17 00:00:00 2001
From: Alexey Gladkov
Date: Wed, 11 Oct 2023 17:14:27 +0200
Subject: [PATCH] Synchronize subscriptions in parallel

Group the configured subscriptions by their target mailbox and hand every
mailbox to a pool of worker processes (at most five at a time).  Each
worker opens its own jira connection and logs through the multiprocessing
logger with the pid in the message prefix.

The handler setup is moved from jiramail/command.py to
jiramail.setup_logger() so that the workers can reuse it.

A section without the 'mbox' parameter now aborts the run with EX_FAILURE
before any worker is started instead of being skipped.

Signed-off-by: Alexey Gladkov
---
Review notes:
 - sync_mailbox() configures the per-process logger only once.  Pool
   workers are reused, so calling setup_logger() for every mailbox stacked
   handlers and repeated each message.
 - sync_mailbox() closes the mailbox in a `finally' block and reports a
   failed query through its return code.  An exception leaving the worker
   is re-raised by result.get() and leaving the pool's `with' block
   terminates the remaining workers.
 - Docstrings added; hunk headers and diffstat recomputed.
 - Line breaks and blank context lines were reconstructed from the hunk
   line counts; the index and commit ids are carried over unchanged and
   were not recomputed.

 jiramail/__init__.py |  20 +++++++++
 jiramail/command.py  |   9 +---
 jiramail/subs.py     | 106 +++++++++++++++++++++++++++++++++++---------
 3 files changed, 107 insertions(+), 28 deletions(-)

diff --git a/jiramail/__init__.py b/jiramail/__init__.py
index dd8695a..e3b0800 100644
--- a/jiramail/__init__.py
+++ b/jiramail/__init__.py
@@ -191,3 +191,23 @@ def read_config() -> Dict[str, Any] | Error:
     logger.info("config has been read")
 
     return config
+
+
+def setup_logger(logger: logging.Logger, level: int, fmt: str) -> logging.Logger:
+    """
+    Attach a new stream handler to `logger` and return the same logger.
+
+    Both the logger and the handler are set to `level`; records are formatted
+    with `fmt` and an "%H:%M:%S" timestamp.  Every call adds one more handler,
+    so callers must not configure the same logger twice.
+    """
+    formatter = logging.Formatter(fmt=fmt, datefmt="%H:%M:%S")
+
+    handler = logging.StreamHandler()
+    handler.setLevel(level)
+    handler.setFormatter(formatter)
+
+    logger.setLevel(level)
+    logger.addHandler(handler)
+
+    return logger
diff --git a/jiramail/command.py b/jiramail/command.py
index 6a9ecef..4d5a8c6 100644
--- a/jiramail/command.py
+++ b/jiramail/command.py
@@ -116,14 +116,7 @@ def setup_logger(cmdargs: argparse.Namespace) -> None:
     if cmdargs.quiet:
         level = logging.CRITICAL
 
-    fmt = logging.Formatter(fmt="%(asctime)s %(message)s", datefmt="[%H:%M:%S]")
-
-    handlr = logging.StreamHandler()
-    handlr.setLevel(level)
-    handlr.setFormatter(fmt)
-
-    logger.setLevel(level)
-    logger.addHandler(handlr)
+    jiramail.setup_logger(logger, level=level, fmt="[%(asctime)s] %(message)s")
 
 
 def cmd() -> int:
diff --git a/jiramail/subs.py b/jiramail/subs.py
index 2f5a007..28feee5 100644
--- a/jiramail/subs.py
+++ b/jiramail/subs.py
@@ -5,8 +5,11 @@
 __author__ = 'Alexey Gladkov '
 
 import argparse
+import multiprocessing
 import re
 
+from typing import Dict, List, Any
+
 import jiramail
 import jiramail.mbox
 
@@ -14,6 +17,62 @@
 config_section = "sub"
 
 
+def sync_mailbox(config: Dict[str, Any], mailbox: str, queries: Dict[str, List[str]]) -> int:
+    """
+    Synchronize every subscription that is stored in one mailbox.
+
+    Runs in a worker process of the pool created by main().  `queries` maps a
+    subscription name to the list of queries whose results go to `mailbox`.
+    Returns jiramail.EX_SUCCESS, or jiramail.EX_FAILURE if the jira connection
+    or the mailbox cannot be opened or a query raises.
+    """
+    logger = multiprocessing.get_logger()
+
+    # A pool worker may be handed several mailboxes one after another.  The
+    # logger is per-process, so add the handler only on the first call or every
+    # message would be printed once per mailbox already processed.
+    if not logger.handlers:
+        jiramail.setup_logger(logger,
+                              level=jiramail.logger.level,
+                              fmt="[%(asctime)s] pid=%(process)d: %(message)s")
+
+    jiramail.logger = logger
+    jiramail.mbox.logger = logger
+
+    logger.critical("process started for `%s'", mailbox)
+
+    try:
+        jiramail.jserv = jiramail.Connection(config.get("jira", {}))
+    except Exception as e:
+        logger.critical("unable to connect to jira: %s", e)
+        return jiramail.EX_FAILURE
+
+    try:
+        mbox = jiramail.Mailbox(mailbox)
+    except Exception as e:
+        logger.critical("unable to open mailbox: %s", e)
+        return jiramail.EX_FAILURE
+
+    try:
+        for target, target_queries in queries.items():
+            logger.info("syncing subscription `%s' to `%s' ...", target, mailbox)
+
+            for query in target_queries:
+                jiramail.mbox.process_query(query, mbox)
+
+            logger.critical("section `%s' synced", target)
+    except Exception as e:
+        # An exception escaping the worker would be re-raised by result.get()
+        # in main() and leaving the pool's `with' block terminates the other
+        # workers, so report the failure through the return code instead.
+        logger.critical("unable to sync mailbox `%s': %s", mailbox, e)
+        return jiramail.EX_FAILURE
+    finally:
+        mbox.close()
+
+    return jiramail.EX_SUCCESS
+
+
 # pylint: disable-next=unused-argument
 def main(cmdargs: argparse.Namespace) -> int:
     config = jiramail.read_config()
@@ -25,13 +84,7 @@ def main(cmdargs: argparse.Namespace) -> int:
     if config_section not in config:
         return jiramail.EX_SUCCESS
 
-    try:
-        jiramail.jserv = jiramail.Connection(config.get("jira", {}))
-    except Exception as e:
-        logger.critical("unable to connect to jira: %s", e)
-        return jiramail.EX_FAILURE
-
-    ret = jiramail.EX_SUCCESS
+    mailboxes: Dict[str, Dict[str, List[str]]] = {}
 
     for target in config[config_section]:
         section = config[config_section][target]
@@ -43,29 +96,42 @@ def main(cmdargs: argparse.Namespace) -> int:
         if "mbox" not in section:
             logger.critical("section `%s.%s' does not contain the 'mbox' parameter which specifies the output mbox file.",
                             config_section, target)
-            ret = jiramail.EX_FAILURE
-            continue
+            return jiramail.EX_FAILURE
 
         mailbox = section["mbox"]
 
-        logger.info("syncing section `%s' to `%s' ...", target, mailbox)
+        if mailbox not in mailboxes:
+            mailboxes[mailbox] = {}
 
-        try:
-            mbox = jiramail.Mailbox(mailbox)
-        except Exception as e:
-            logger.critical("unable to open mailbox: %s", e)
-            ret = jiramail.EX_FAILURE
-            continue
+        if target not in mailboxes[mailbox]:
+            mailboxes[mailbox][target] = []
 
         if "assignee" in section:
             username = section["assignee"]
-            jiramail.mbox.process_query(f"assignee = '{username}'", mbox)
+            mailboxes[mailbox][target].append(f"assignee = '{username}'")
 
         if "query" in section:
-            jiramail.mbox.process_query(section["query"], mbox)
+            mailboxes[mailbox][target].append(section["query"])
 
-        mbox.close()
+    # Each mailbox is synced by exactly one task; at most five worker
+    # processes run at the same time.
+    nprocs = min(5, len(mailboxes))
 
-        logger.critical("section `%s' synced", target)
+    if nprocs == 0:
+        return jiramail.EX_SUCCESS
+
+    ret = jiramail.EX_SUCCESS
+
+    with multiprocessing.Pool(processes=nprocs) as pool:
+        results = []
+
+        for mailbox, queries in mailboxes.items():
+            results.append(pool.apply_async(sync_mailbox, (config, mailbox, queries,)))
+
+        for result in results:
+            rc = result.get()
+
+            if rc != jiramail.EX_SUCCESS:
+                ret = rc
 
     return ret