Skip to content

Commit

Permalink
Synchronize subscriptions in parallel
Browse files Browse the repository at this point in the history
Signed-off-by: Alexey Gladkov <[email protected]>
  • Loading branch information
legionus committed Oct 11, 2023
1 parent 0cad95d commit 04cb8fe
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 28 deletions.
13 changes: 13 additions & 0 deletions jiramail/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,16 @@ def read_config() -> Dict[str, Any] | Error:

logger.info("config has been read")
return config


def setup_logger(logger: logging.Logger, level: int, fmt: str) -> logging.Logger:
formatter = logging.Formatter(fmt=fmt, datefmt="%H:%M:%S")

handler = logging.StreamHandler()
handler.setLevel(level)
handler.setFormatter(formatter)

logger.setLevel(level)
logger.addHandler(handler)

return logger
9 changes: 1 addition & 8 deletions jiramail/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,7 @@ def setup_logger(cmdargs: argparse.Namespace) -> None:
if cmdargs.quiet:
level = logging.CRITICAL

fmt = logging.Formatter(fmt="%(asctime)s %(message)s", datefmt="[%H:%M:%S]")

handlr = logging.StreamHandler()
handlr.setLevel(level)
handlr.setFormatter(fmt)

logger.setLevel(level)
logger.addHandler(handlr)
jiramail.setup_logger(logger, level=level, fmt="[%(asctime)s] %(message)s")


def cmd() -> int:
Expand Down
82 changes: 62 additions & 20 deletions jiramail/subs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,52 @@
__author__ = 'Alexey Gladkov <[email protected]>'

import argparse
import multiprocessing
import re

from typing import Dict, List, Any

import jiramail
import jiramail.mbox

logger = jiramail.logger
config_section = "sub"


def sync_mailbox(config: Dict[str, Any], mailbox: str, queries: Dict[str, List[str]]) -> int:
logger = jiramail.setup_logger(multiprocessing.get_logger(),
level=jiramail.logger.level,
fmt="[%(asctime)s] pid=%(process)d: %(message)s")
jiramail.logger = logger
jiramail.mbox.logger = logger

logger.critical("process started for `%s'", mailbox)

try:
jiramail.jserv = jiramail.Connection(config.get("jira", {}))
except Exception as e:
logger.critical("unable to connect to jira: %s", e)
return jiramail.EX_FAILURE

try:
mbox = jiramail.Mailbox(mailbox)
except Exception as e:
logger.critical("unable to open mailbox: %s", e)
return jiramail.EX_FAILURE

for target in queries.keys():
logger.info("syncing subscription `%s' to `%s' ...", target, mailbox)

for query in queries[target]:
jiramail.mbox.process_query(query, mbox)

logger.critical("section `%s' synced", target)

mbox.close()

return jiramail.EX_SUCCESS


# pylint: disable-next=unused-argument
def main(cmdargs: argparse.Namespace) -> int:
config = jiramail.read_config()
Expand All @@ -25,13 +62,7 @@ def main(cmdargs: argparse.Namespace) -> int:
if config_section not in config:
return jiramail.EX_SUCCESS

try:
jiramail.jserv = jiramail.Connection(config.get("jira", {}))
except Exception as e:
logger.critical("unable to connect to jira: %s", e)
return jiramail.EX_FAILURE

ret = jiramail.EX_SUCCESS
mailboxes: Dict[str, Dict[str, List[str]]] = {}

for target in config[config_section]:
section = config[config_section][target]
Expand All @@ -43,29 +74,40 @@ def main(cmdargs: argparse.Namespace) -> int:
if "mbox" not in section:
logger.critical("section `%s.%s' does not contain the 'mbox' parameter which specifies the output mbox file.",
config_section, target)
ret = jiramail.EX_FAILURE
continue
return jiramail.EX_FAILURE

mailbox = section["mbox"]

logger.info("syncing section `%s' to `%s' ...", target, mailbox)
if mailbox not in mailboxes:
mailboxes[mailbox] = {}

try:
mbox = jiramail.Mailbox(mailbox)
except Exception as e:
logger.critical("unable to open mailbox: %s", e)
ret = jiramail.EX_FAILURE
continue
if target not in mailboxes[mailbox]:
mailboxes[mailbox][target] = []

if "assignee" in section:
username = section["assignee"]
jiramail.mbox.process_query(f"assignee = '{username}'", mbox)
mailboxes[mailbox][target].append(f"assignee = '{username}'")

if "query" in section:
jiramail.mbox.process_query(section["query"], mbox)
mailboxes[mailbox][target].append(section["query"])

mbox.close()
nprocs = min(5, len(mailboxes.keys()))

logger.critical("section `%s' synced", target)
if nprocs == 0:
return jiramail.EX_SUCCESS

ret = jiramail.EX_SUCCESS

with multiprocessing.Pool(processes=nprocs) as pool:
results = []

for mailbox, queries in mailboxes.items():
results.append(pool.apply_async(sync_mailbox, (config, mailbox, queries,)))

for result in results:
rc = result.get()

if rc != jiramail.EX_SUCCESS:
ret = rc

return ret

0 comments on commit 04cb8fe

Please sign in to comment.