diff --git a/deploy/dev/huey_events.sh b/deploy/dev/huey_events.sh new file mode 100755 index 0000000000..8296525df3 --- /dev/null +++ b/deploy/dev/huey_events.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +# ~~LongRunning:Queue->Huey:Technology~~ +huey_consumer.py portality.tasks.consumer_events_queue.events_queue >> ~/huey_events_queue.log 2>&1 diff --git a/deploy/dev/huey_scheduled_long.sh b/deploy/dev/huey_scheduled_long.sh new file mode 100755 index 0000000000..3a3cfc0b0c --- /dev/null +++ b/deploy/dev/huey_scheduled_long.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +# ~~LongRunning:Queue->Huey:Technology~~ +huey_consumer.py portality.tasks.consumer_scheduled_long_queue.scheduled_long_queue >> ~/huey_scheduled_long_queue.log 2>&1 diff --git a/deploy/dev/huey_scheduled_short.sh b/deploy/dev/huey_scheduled_short.sh new file mode 100755 index 0000000000..532519cdea --- /dev/null +++ b/deploy/dev/huey_scheduled_short.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +# ~~LongRunning:Queue->Huey:Technology~~ +huey_consumer.py portality.tasks.consumer_scheduled_short_queue.scheduled_short_queue >> ~/huey_scheduled_short_queue.log 2>&1 diff --git a/deploy/supervisor/production-background/huey-events.conf b/deploy/supervisor/production-background/huey-events.conf new file mode 100644 index 0000000000..f4625a1f26 --- /dev/null +++ b/deploy/supervisor/production-background/huey-events.conf @@ -0,0 +1,9 @@ +[program:huey-events] +command=/home/cloo/doaj/bin/python /home/cloo/doaj/bin/huey_consumer.py -v portality.tasks.consumer_events_queue.events_queue +environment= DOAJENV=production +user=cloo +directory=/home/cloo/doaj/src/doaj +stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log +stderr_logfile=/var/log/supervisor/%(program_name)s-error.log +autostart=true +autorestart=true diff --git a/deploy/supervisor/production-background/huey-scheduled-long.conf b/deploy/supervisor/production-background/huey-scheduled-long.conf new file mode 100644 index 0000000000..6f49379c1c --- /dev/null +++ b/deploy/supervisor/production-background/huey-scheduled-long.conf @@ -0,0 +1,9 @@ +[program:huey-scheduled-long] +command=/home/cloo/doaj/bin/python /home/cloo/doaj/bin/huey_consumer.py -v portality.tasks.consumer_scheduled_long_queue.scheduled_long_queue +environment= DOAJENV=production +user=cloo +directory=/home/cloo/doaj/src/doaj +stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log +stderr_logfile=/var/log/supervisor/%(program_name)s-error.log +autostart=true +autorestart=true diff --git a/deploy/supervisor/production-background/huey-scheduled-short.conf b/deploy/supervisor/production-background/huey-scheduled-short.conf new file mode 100644 index 0000000000..ba6eac7525 --- /dev/null +++ b/deploy/supervisor/production-background/huey-scheduled-short.conf @@ -0,0 +1,9 @@ +[program:huey-scheduled-short] +command=/home/cloo/doaj/bin/python /home/cloo/doaj/bin/huey_consumer.py -v portality.tasks.consumer_scheduled_short_queue.scheduled_short_queue +environment= DOAJENV=production +user=cloo +directory=/home/cloo/doaj/src/doaj +stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log +stderr_logfile=/var/log/supervisor/%(program_name)s-error.log +autostart=true +autorestart=true diff --git a/deploy/supervisor/test/huey-events.conf b/deploy/supervisor/test/huey-events.conf new file mode 100644 index 0000000000..8a6f9de104 --- /dev/null +++ b/deploy/supervisor/test/huey-events.conf @@ -0,0 +1,9 @@ +[program:huey-events] +command=/home/cloo/doaj/bin/python /home/cloo/doaj/bin/huey_consumer.py -v 
portality.tasks.consumer_events_queue.events_queue +environment= DOAJENV=test +user=cloo +directory=/home/cloo/doaj/src/doaj +stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log +stderr_logfile=/var/log/supervisor/%(program_name)s-error.log +autostart=true +autorestart=true diff --git a/deploy/supervisor/test/huey-scheduled-long.conf b/deploy/supervisor/test/huey-scheduled-long.conf new file mode 100644 index 0000000000..fa10f8f997 --- /dev/null +++ b/deploy/supervisor/test/huey-scheduled-long.conf @@ -0,0 +1,9 @@ +[program:huey-scheduled-long] +command=/home/cloo/doaj/bin/python /home/cloo/doaj/bin/huey_consumer.py -v portality.tasks.consumer_scheduled_long_queue.scheduled_long_queue +environment= DOAJENV=test +user=cloo +directory=/home/cloo/doaj/src/doaj +stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log +stderr_logfile=/var/log/supervisor/%(program_name)s-error.log +autostart=true +autorestart=true diff --git a/deploy/supervisor/test/huey-scheduled-short.conf b/deploy/supervisor/test/huey-scheduled-short.conf new file mode 100644 index 0000000000..758b60a2c6 --- /dev/null +++ b/deploy/supervisor/test/huey-scheduled-short.conf @@ -0,0 +1,9 @@ +[program:huey-scheduled-short] +command=/home/cloo/doaj/bin/python /home/cloo/doaj/bin/huey_consumer.py -v portality.tasks.consumer_scheduled_short_queue.scheduled_short_queue +environment= DOAJENV=test +user=cloo +directory=/home/cloo/doaj/src/doaj +stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log +stderr_logfile=/var/log/supervisor/%(program_name)s-error.log +autostart=true +autorestart=true diff --git a/doajtest/fixtures/background.py b/doajtest/fixtures/background.py index 9f89581c7d..4733c9b5a1 100644 --- a/doajtest/fixtures/background.py +++ b/doajtest/fixtures/background.py @@ -52,12 +52,14 @@ def cleanup(self): def save_mock_bgjob(action=None, status=None, created_before_sec=0, is_save=True, - queue_id=None): + blocking=True, queue_id=None): bgjob = BackgroundJob() - if action: + if not action: from portality.tasks.journal_csv import JournalCSVBackgroundTask bgjob.action = JournalCSVBackgroundTask.__action__ + else: + bgjob.action = action if status: bgjob._set_with_struct("status", status) @@ -69,6 +71,6 @@ def save_mock_bgjob(action=None, status=None, created_before_sec=0, is_save=True bgjob.queue_id = queue_id if is_save: - bgjob.save(blocking=True) + bgjob.save(blocking=blocking) return bgjob diff --git a/doajtest/helpers.py b/doajtest/helpers.py index 1fcf47eba3..c5072eb531 100644 --- a/doajtest/helpers.py +++ b/doajtest/helpers.py @@ -19,7 +19,7 @@ from portality.lib import paths, dates from portality.lib.dates import FMT_DATE_STD from portality.lib.thread_utils import wait_until -from portality.tasks.redis_huey import main_queue, long_running +from portality.tasks.redis_huey import events_queue, scheduled_short_queue, scheduled_long_queue from portality.util import url_for @@ -163,10 +163,13 @@ def setUpClass(cls) -> None: # always_eager has been replaced by immediate # for huey version > 2 # https://huey.readthedocs.io/en/latest/guide.html - main_queue.always_eager = True - long_running.always_eager = True - main_queue.immediate = True - long_running.immediate = True + events_queue.always_eager = True + scheduled_short_queue.always_eager = True + scheduled_long_queue.always_eager = True + + events_queue.immediate = True + scheduled_short_queue.immediate = True + scheduled_long_queue.immediate = True dao.DomainObject.save = dao_proxy(dao.DomainObject.save, type="instance") 
dao.DomainObject.delete = dao_proxy(dao.DomainObject.delete, type="instance") diff --git a/doajtest/matrices/background_task_status/background_task_status.matrix.csv b/doajtest/matrices/background_task_status/background_task_status.matrix.csv new file mode 100644 index 0000000000..ec93701d08 --- /dev/null +++ b/doajtest/matrices/background_task_status/background_task_status.matrix.csv @@ -0,0 +1,36 @@ +test_id,in_queue,oldest_queued,error_count,error_age,lrs_success_or_error,queued,errors,lrs +1,0,young,0,out_of_period,complete,stable,stable,stable +2,0,young,0,out_of_period,error,stable,stable,unstable +3,0,young,0,out_of_period,empty,stable,stable,unstable +4,0,young,1,in_period,complete,stable,unstable,stable +5,0,young,1,in_period,error,stable,unstable,unstable +6,0,young,1,out_of_period,complete,stable,stable,stable +7,0,young,1,out_of_period,error,stable,stable,unstable +8,1,old,0,out_of_period,complete,unstable,stable,stable +9,1,old,0,out_of_period,error,unstable,stable,unstable +10,1,old,0,out_of_period,empty,unstable,stable,unstable +11,1,old,1,in_period,complete,unstable,unstable,stable +12,1,old,1,in_period,error,unstable,unstable,unstable +13,1,old,1,out_of_period,complete,unstable,stable,stable +14,1,old,1,out_of_period,error,unstable,stable,unstable +15,1,young,0,out_of_period,complete,stable,stable,stable +16,1,young,0,out_of_period,error,stable,stable,unstable +17,1,young,0,out_of_period,empty,stable,stable,unstable +18,1,young,1,in_period,complete,stable,unstable,stable +19,1,young,1,in_period,error,stable,unstable,unstable +20,1,young,1,out_of_period,complete,stable,stable,stable +21,1,young,1,out_of_period,error,stable,stable,unstable +22,2,old,0,out_of_period,complete,unstable,stable,stable +23,2,old,0,out_of_period,error,unstable,stable,unstable +24,2,old,0,out_of_period,empty,unstable,stable,unstable +25,2,old,1,in_period,complete,unstable,unstable,stable +26,2,old,1,in_period,error,unstable,unstable,unstable +27,2,old,1,out_of_period,complete,unstable,stable,stable +28,2,old,1,out_of_period,error,unstable,stable,unstable +29,2,young,0,out_of_period,complete,unstable,stable,stable +30,2,young,0,out_of_period,error,unstable,stable,unstable +31,2,young,0,out_of_period,empty,unstable,stable,unstable +32,2,young,1,in_period,complete,unstable,unstable,stable +33,2,young,1,in_period,error,unstable,unstable,unstable +34,2,young,1,out_of_period,complete,unstable,stable,stable +35,2,young,1,out_of_period,error,unstable,stable,unstable diff --git a/doajtest/matrices/background_task_status/background_task_status.settings.csv b/doajtest/matrices/background_task_status/background_task_status.settings.csv new file mode 100644 index 0000000000..755877f2bb --- /dev/null +++ b/doajtest/matrices/background_task_status/background_task_status.settings.csv @@ -0,0 +1,17 @@ +field,test_id,in_queue,oldest_queued,error_count,error_age,lrs_success_or_error,queued,errors,lrs +type,index,generated,generated,generated,generated,generated,conditional,conditional,conditional +default,,,,,,,stable,stable,stable +,,,,,,,,, +values,,0,old,0,in_period,complete,stable,stable,stable +values,,1,young,1,out_of_period,error,unstable,unstable,unstable +values,,2,,,,empty,,, +,,,,,,,,, +constraint in_queue,,0,young,,,,,, +constraint error_count,,,,0,out_of_period,,,, +constraint error_count,,,,1,,!empty,,, +,,,,,,,,, +conditional queued,,2,,,,,unstable,, +conditional queued,,,old,,,,unstable,, +conditional errors,,,,1,in_period,,,unstable, +conditional lrs,,,,,,error,,,unstable +conditional 
lrs,,,,,,empty,,,unstable \ No newline at end of file diff --git a/doajtest/matrices/background_task_status/background_task_status.settings.json b/doajtest/matrices/background_task_status/background_task_status.settings.json new file mode 100644 index 0000000000..f103b658bc --- /dev/null +++ b/doajtest/matrices/background_task_status/background_task_status.settings.json @@ -0,0 +1,150 @@ +{ + "parameters": [ + { + "name": "test_id", + "type": "index" + }, + { + "name": "in_queue", + "type": "generated", + "values": { + "0": { + "constraints": { + "oldest_queued": { + "or": [ + "young" + ] + } + } + }, + "1": {}, + "2": {} + } + }, + { + "name": "oldest_queued", + "type": "generated", + "values": { + "old": {}, + "young": {} + } + }, + { + "name": "error_count", + "type": "generated", + "values": { + "0": { + "constraints": { + "error_age": { + "or": [ + "out_of_period" + ] + } + } + }, + "1": { + "constraints": { + "lrs_success_or_error": { + "nor": [ + "empty" + ] + } + } + } + } + }, + { + "name": "error_age", + "type": "generated", + "values": { + "in_period": {}, + "out_of_period": {} + } + }, + { + "name": "lrs_success_or_error", + "type": "generated", + "values": { + "complete": {}, + "error": {}, + "empty": {} + } + }, + { + "name": "queued", + "type": "conditional", + "default": "stable", + "values": { + "stable": {}, + "unstable": { + "conditions": [ + { + "in_queue": { + "or": [ + "2" + ] + } + }, + { + "oldest_queued": { + "or": [ + "old" + ] + } + } + ] + } + } + }, + { + "name": "errors", + "type": "conditional", + "default": "stable", + "values": { + "stable": {}, + "unstable": { + "conditions": [ + { + "error_count": { + "or": [ + "1" + ] + }, + "error_age": { + "or": [ + "in_period" + ] + } + } + ] + } + } + }, + { + "name": "lrs", + "type": "conditional", + "default": "stable", + "values": { + "stable": {}, + "unstable": { + "conditions": [ + { + "lrs_success_or_error": { + "or": [ + "error" + ] + } + }, + { + "lrs_success_or_error": { + "or": [ + "empty" + ] + } + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/doajtest/unit/test_background_helper.py b/doajtest/unit/test_background_helper.py index fe8f4b3e0d..d2c46ef7de 100644 --- a/doajtest/unit/test_background_helper.py +++ b/doajtest/unit/test_background_helper.py @@ -7,15 +7,16 @@ from portality.background import BackgroundTask from portality.core import app from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import long_running, main_queue +from portality.tasks.redis_huey import events_queue, scheduled_short_queue, scheduled_long_queue class TestBackgroundHelper(TestCase): def test_get_queue_id_by_task_queue(self): cases = [ - (long_running, constants.BGJOB_QUEUE_ID_LONG), - (main_queue, constants.BGJOB_QUEUE_ID_MAIN), + (events_queue, constants.BGJOB_QUEUE_ID_EVENTS), + (scheduled_long_queue, constants.BGJOB_QUEUE_ID_SCHEDULED_LONG), + (scheduled_short_queue, constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT), (None, constants.BGJOB_QUEUE_ID_UNKNOWN), ] @@ -57,7 +58,7 @@ def tearDownClass(cls) -> None: helpers.patch_config(app, cls.org_config) def test_register_schedule(self): - helper = background_helper.RedisHueyTaskHelper(main_queue, fixture_bgtask_class(self.task_name_a)) + helper = background_helper.RedisHueyTaskHelper(scheduled_short_queue, fixture_bgtask_class(self.task_name_a)) @helper.register_schedule def _fn(): @@ -66,7 +67,7 @@ def _fn(): assert isinstance(_fn, huey.api.TaskWrapper) def test_register_schedule__schedule_not_found(self): - helper = 
background_helper.RedisHueyTaskHelper(main_queue, + helper = background_helper.RedisHueyTaskHelper(scheduled_short_queue, fixture_bgtask_class(self.task_name_schedule_not_exist)) with self.assertRaises(RuntimeError): @helper.register_schedule @@ -74,7 +75,7 @@ def _fn(): print('fake fn') def test_register_execute(self): - helper = background_helper.RedisHueyTaskHelper(main_queue, fixture_bgtask_class(self.task_name_b)) + helper = background_helper.RedisHueyTaskHelper(scheduled_short_queue, fixture_bgtask_class(self.task_name_b)) @helper.register_execute(is_load_config=True) def _fn(): @@ -84,7 +85,7 @@ def _fn(): assert _fn.retries == self.expected_retries def test_register_execute__config_not_found(self): - helper = background_helper.RedisHueyTaskHelper(main_queue, + helper = background_helper.RedisHueyTaskHelper(events_queue, fixture_bgtask_class(self.task_name_schedule_not_exist)) with self.assertRaises(RuntimeError): @@ -93,7 +94,7 @@ def _fn(): print('fake fn') def test_register_execute__without_load_config(self): - helper = background_helper.RedisHueyTaskHelper(main_queue, + helper = background_helper.RedisHueyTaskHelper(events_queue, fixture_bgtask_class(self.task_name_schedule_not_exist)) @helper.register_execute(is_load_config=False) diff --git a/doajtest/unit/test_background_task_status.py b/doajtest/unit/test_background_task_status.py deleted file mode 100644 index 05b163a356..0000000000 --- a/doajtest/unit/test_background_task_status.py +++ /dev/null @@ -1,235 +0,0 @@ -import json - -from doajtest.fixtures.background import save_mock_bgjob -from doajtest.helpers import DoajTestCase, apply_test_case_config, patch_config -from portality import constants -from portality.bll import DOAJ -from portality.tasks.anon_export import AnonExportBackgroundTask -from portality.tasks.journal_csv import JournalCSVBackgroundTask - -background_task_status = DOAJ.backgroundTaskStatusService() -is_stable = background_task_status.is_stable - -# config BG_MONITOR_ERRORS_CONFIG -bg_monitor_errors_config__empty = { - 'BG_MONITOR_ERRORS_CONFIG': {} -} - -bg_monitor_errors_config__a = { - 'BG_MONITOR_ERRORS_CONFIG': { - JournalCSVBackgroundTask.__action__: { - 'check_sec': 3600, - 'allowed_num_err': 0, - } - } -} - -bg_monitor_errors_config__b = { - 'BG_MONITOR_ERRORS_CONFIG': { - 'kajdlaksj': { - 'check_sec': 3600, - 'allowed_num_err': 0, - } - } -} - -# config BG_MONITOR_QUEUED_CONFIG -bg_monitor_queued_config__a = { - 'BG_MONITOR_QUEUED_CONFIG': { - 'journal_csv': { - 'total': 99999999, - 'oldest': 1200, - } - } -} - -bg_monitor_queued_config__zero_total = { - 'BG_MONITOR_QUEUED_CONFIG': { - 'journal_csv': { - 'total': 0, - 'oldest': 1200, - } - } -} - -# config BG_MONITOR_LAST_COMPLETED -bg_monitor_last_completed__now = { - 'BG_MONITOR_LAST_COMPLETED': { - 'main_queue': 0, - 'long_running': 0, - } -} - -bg_monitor_last_completed__a = { - 'BG_MONITOR_LAST_COMPLETED': { - 'main_queue': 10000, - 'long_running': 10000, - } -} - - -class TestBackgroundTaskStatus(DoajTestCase): - @classmethod - def setUpClass(cls) -> None: - super().setUpClass() - cls.org_config = patch_config(cls.app_test, { - 'HUEY_SCHEDULE': { - JournalCSVBackgroundTask.__action__: constants.CRON_NEVER, - AnonExportBackgroundTask.__action__: constants.CRON_NEVER, - }, - }) - - @classmethod - def tearDownClass(cls) -> None: - super().tearDownClass() - patch_config(cls.app_test, cls.org_config) - - @staticmethod - def assert_stable_dict(val: dict): - assert is_stable(val.get('status')) - assert len(val.get('err_msgs')) == 0 - - 
@staticmethod - def assert_unstable_dict(val): - assert not is_stable(val.get('status')) - assert len(val.get('err_msgs')) - - @apply_test_case_config(bg_monitor_last_completed__now) - def test_create_background_status__invalid_last_completed__main_queue(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - queue_id=constants.BGJOB_QUEUE_ID_MAIN, - status=constants.BGJOB_STATUS_COMPLETE, ) - - status_dict = background_task_status.create_background_status() - - assert not is_stable(status_dict['status']) - self.assert_unstable_dict(status_dict['queues'].get('main_queue', {})) - self.assert_stable_dict(status_dict['queues'].get('long_running', {})) - - @apply_test_case_config(bg_monitor_last_completed__now) - def test_create_background_status__invalid_last_completed__long_running(self): - save_mock_bgjob(AnonExportBackgroundTask.__action__, - queue_id=constants.BGJOB_QUEUE_ID_LONG, - status=constants.BGJOB_STATUS_COMPLETE, ) - - status_dict = background_task_status.create_background_status() - - assert not is_stable(status_dict['status']) - self.assert_stable_dict(status_dict['queues'].get('main_queue', {})) - self.assert_unstable_dict(status_dict['queues'].get('long_running', {})) - - @apply_test_case_config(bg_monitor_last_completed__a) - def test_create_background_status__valid_last_completed(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - queue_id=constants.BGJOB_QUEUE_ID_MAIN, - status=constants.BGJOB_STATUS_COMPLETE, ) - save_mock_bgjob(AnonExportBackgroundTask.__action__, - queue_id=constants.BGJOB_QUEUE_ID_LONG, - status=constants.BGJOB_STATUS_COMPLETE, ) - - status_dict = background_task_status.create_background_status() - - assert is_stable(status_dict['status']) - self.assert_stable_dict(status_dict['queues'].get('main_queue', {})) - self.assert_stable_dict(status_dict['queues'].get('long_running', {})) - - @apply_test_case_config(bg_monitor_last_completed__now) - def test_create_background_status__valid_last_completed__no_record(self): - status_dict = background_task_status.create_background_status() - assert is_stable(status_dict['status']) - - @apply_test_case_config(bg_monitor_errors_config__empty) - def test_create_background_status__empty_errors_config(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - status=constants.BGJOB_STATUS_ERROR, ) - - status_dict = background_task_status.create_background_status() - - journal_csv_dict = status_dict['queues']['main_queue']['errors'].get(JournalCSVBackgroundTask.__action__, {}) - - assert not is_stable(status_dict['status']) - assert journal_csv_dict - # unstable action should be on top of the list after sorting - first_key = next(iter(status_dict['queues']['main_queue']['errors'])) - assert not is_stable(status_dict['queues']['main_queue']['errors'][first_key]['status']) - - @apply_test_case_config(bg_monitor_errors_config__a) - def test_create_background_status__error_in_period_found(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - status=constants.BGJOB_STATUS_ERROR, ) - - status_dict = background_task_status.create_background_status() - - journal_csv_dict = status_dict['queues']['main_queue']['errors'].get(JournalCSVBackgroundTask.__action__, {}) - - assert not is_stable(status_dict['status']) - self.assert_unstable_dict(journal_csv_dict) - assert journal_csv_dict.get('in_monitoring_period', 0) > 0 - - @apply_test_case_config(bg_monitor_errors_config__a) - def test_create_background_status__error_in_period_not_found(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, 
- status=constants.BGJOB_STATUS_ERROR, - created_before_sec=1000000000) - - status_dict = background_task_status.create_background_status() - - journal_csv_dict = status_dict['queues']['main_queue']['errors'].get(JournalCSVBackgroundTask.__action__, {}) - - assert is_stable(status_dict['status']) - self.assert_stable_dict(journal_csv_dict) - assert journal_csv_dict.get('in_monitoring_period', 0) == 0 - - @apply_test_case_config(bg_monitor_queued_config__zero_total) - def test_create_background_status__queued_invalid_total(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - status=constants.BGJOB_STATUS_QUEUED, ) - - status_dict = background_task_status.create_background_status() - - journal_csv_dict = status_dict['queues']['main_queue']['queued'].get(JournalCSVBackgroundTask.__action__, {}) - - assert not is_stable(status_dict['status']) - assert journal_csv_dict.get('total', 0) - self.assert_unstable_dict(journal_csv_dict) - - @apply_test_case_config(bg_monitor_queued_config__zero_total) - def test_create_background_status__queued_valid_total(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - status=constants.BGJOB_STATUS_COMPLETE, ) - - status_dict = background_task_status.create_background_status() - - journal_csv_dict = status_dict['queues']['main_queue']['queued'].get(JournalCSVBackgroundTask.__action__, {}) - - assert is_stable(status_dict['status']) - assert journal_csv_dict.get('total', 0) == 0 - self.assert_stable_dict(journal_csv_dict) - - @apply_test_case_config(bg_monitor_queued_config__a) - def test_create_background_status__queued_invalid_oldest(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - status=constants.BGJOB_STATUS_QUEUED, - created_before_sec=1000000000) - - status_dict = background_task_status.create_background_status() - - journal_csv_dict = status_dict['queues']['main_queue']['queued'].get(JournalCSVBackgroundTask.__action__, {}) - - assert not is_stable(status_dict['status']) - self.assert_unstable_dict(journal_csv_dict) - assert journal_csv_dict.get('oldest') is not None - - @apply_test_case_config(bg_monitor_queued_config__a) - def test_create_background_status__queued_valid_oldest(self): - save_mock_bgjob(JournalCSVBackgroundTask.__action__, - status=constants.BGJOB_STATUS_QUEUED, ) - - status_dict = background_task_status.create_background_status() - print(json.dumps(status_dict, indent=4)) - - journal_csv_dict = status_dict['queues']['main_queue']['queued'].get(JournalCSVBackgroundTask.__action__, {}) - - assert is_stable(status_dict['status']) - self.assert_stable_dict(journal_csv_dict) - assert journal_csv_dict.get('oldest') is not None diff --git a/doajtest/unit/test_background_task_status_parameterised.py b/doajtest/unit/test_background_task_status_parameterised.py new file mode 100644 index 0000000000..7b78538f90 --- /dev/null +++ b/doajtest/unit/test_background_task_status_parameterised.py @@ -0,0 +1,159 @@ +from parameterized import parameterized +from combinatrix.testintegration import load_parameter_sets + +from portality.models.background import BackgroundJob +from portality.lib.paths import rel2abs + +def load_cases(): + return load_parameter_sets(rel2abs(__file__, "..", "matrices", "background_task_status"), + "background_task_status", + "test_id", + {"test_id" : []}) + +import json + +from doajtest.fixtures.background import save_mock_bgjob +from doajtest.helpers import DoajTestCase, apply_test_case_config, patch_config +from portality import constants +from portality.bll import DOAJ +from 
portality.tasks.anon_export import AnonExportBackgroundTask +from portality.tasks.journal_csv import JournalCSVBackgroundTask + +background_task_status = DOAJ.backgroundTaskStatusService() + +# Configures the monitoring period and the allowed number of errors in that period before a queue is marked +# as unstable +BG_MONITOR_ERRORS_CONFIG = { + 'set_in_doaj': { + 'check_sec': 3000, + 'allowed_num_err': 0 + }, + 'anon_export': { + 'check_sec': 3000, + 'allowed_num_err': 0, + }, + 'journal_csv': { + 'check_sec': 3000, + 'allowed_num_err': 0 + } +} + +# Configures the total number of queued items and the age of the oldest of those queued items allowed +# before the queue is marked as unstable. This is provided by type, so we can monitor all types separately +BG_MONITOR_QUEUED_CONFIG = { + 'set_in_doaj': { + 'total': 1, + 'oldest': 3000, + }, + 'anon_export': { + 'total': 1, + 'oldest': 3000, + }, + 'journal_csv': { + 'total': 1, + 'oldest': 3000, + } +} + +BG_MONITOR_LAST_SUCCESSFULLY_RUN_CONFIG = { + 'anon_export': { + 'last_run_successful_in': 5000 + }, + 'set_in_doaj': { + 'last_run_successful_in': 5000 + }, + 'journal_csv': { + 'last_run_successful_in': 5000 + } +} + + +class TestBackgroundTaskStatus(DoajTestCase): + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + cls.org_config = patch_config(cls.app_test, { + 'HUEY_SCHEDULE': { + JournalCSVBackgroundTask.__action__: constants.CRON_NEVER, + AnonExportBackgroundTask.__action__: constants.CRON_NEVER, + }, + "BG_MONITOR_ERRORS_CONFIG": BG_MONITOR_ERRORS_CONFIG, + "BG_MONITOR_QUEUED_CONFIG": BG_MONITOR_QUEUED_CONFIG, + "BG_MONITOR_LAST_SUCCESSFULLY_RUN_CONFIG": BG_MONITOR_LAST_SUCCESSFULLY_RUN_CONFIG + }) + + @classmethod + def tearDownClass(cls) -> None: + super().tearDownClass() + patch_config(cls.app_test, cls.org_config) + + @parameterized.expand(load_cases) + def test_01_background_task_status(self, name, kwargs): + in_queue_arg = kwargs.get("in_queue") + oldest_queued_arg = kwargs.get("oldest_queued") + error_count_arg = kwargs.get("error_count") + error_age_arg = kwargs.get("error_age") + lrs_success_or_error_arg = kwargs.get("lrs_success_or_error") + queued_arg = kwargs.get("queued") + errors_arg = kwargs.get("errors") + lrs_arg = kwargs.get("lrs") + + in_queue = int(in_queue_arg) + oldest_queued = 3600 if oldest_queued_arg == "old" else 600 + error_count = int(error_count_arg) + error_age = 600 if error_age_arg == "in_period" else 3600 + + queues = [("events", "set_in_doaj"), + ("scheduled_long", "anon_export"), + ("scheduled_short", "journal_csv")] + + # set up + ########################################### + blocks = [] + for q, a in queues: + if in_queue > 0: + # create the number of jobs that should be in status "queued" + # their ages are set to the oldest allowed age + the index number for the purposes of disambiguation + for i in range(in_queue): + job = save_mock_bgjob(action=a, status="queued", created_before_sec=oldest_queued + i, is_save=True, blocking=False, queue_id=q) + blocks.append((job.id, job.last_updated)) + + if error_count > 0: + # create a single error job of the requested age + job = save_mock_bgjob(action=a, status="error", created_before_sec=error_age, is_save=True, blocking=False, queue_id=q) + blocks.append((job.id, job.last_updated)) + + if lrs_success_or_error_arg != "empty": + age = 4000 + # if an error job has also been created (see above) and the lrs test is supposed to be "stable", + # make the "complete" job more recent than the error so the last run still counts as successful.
+ if error_count > 0 and lrs_success_or_error_arg == "complete": + age = 100 + job = save_mock_bgjob(action=a, status=lrs_success_or_error_arg, created_before_sec=age, is_save=True, blocking=False, queue_id=q) + blocks.append((job.id, job.last_updated)) + + BackgroundJob.blockall(blocks) + + # Execute + ########################################### + status = background_task_status.create_background_status() + + + # Assert + ########################################### + + for q, a in queues: + assert status['queues'][q]["errors"][a]["status"] == errors_arg + assert status['queues'][q]["queued"][a]["status"] == queued_arg + assert status['queues'][q]["last_run_successful"][a]["status"] == lrs_arg + + if "unstable" in [errors_arg, queued_arg, lrs_arg]: + assert status['queues'][q]["status"] == "unstable" + else: + assert status['queues'][q]["status"] == "stable" + + if "unstable" in [errors_arg, queued_arg, lrs_arg]: + assert status['status'] == "unstable" + else: + assert status['status'] == "stable" + diff --git a/doajtest/unit_tester/bgtask_tester.py b/doajtest/unit_tester/bgtask_tester.py index c5a02df971..3b3251cbb7 100644 --- a/doajtest/unit_tester/bgtask_tester.py +++ b/doajtest/unit_tester/bgtask_tester.py @@ -6,5 +6,6 @@ def test_queue_id_assigned(bgtask_class: Type[BackgroundTask]): job = bgtask_class.prepare('just a username') - assert job.queue_id in {constants.BGJOB_QUEUE_ID_MAIN, - constants.BGJOB_QUEUE_ID_LONG} + assert job.queue_id in {constants.BGJOB_QUEUE_ID_EVENTS, + constants.BGJOB_QUEUE_ID_SCHEDULED_LONG, + constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT} diff --git a/docs/dev/how-to-implement.md b/docs/dev/how-to-implement.md index b76f849d10..e468cc7c69 100644 --- a/docs/dev/how-to-implement.md +++ b/docs/dev/how-to-implement.md @@ -8,13 +8,13 @@ How to create a background job * choice a task queue, details of task queue can have find in `portality/tasks/redis_huey.py` ```python -huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(main_queue) +huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(queue) ``` * add execute function below BackgroundTask class ```python -huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(main_queue) +huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=False) @@ -76,5 +76,5 @@ HUEY_SCHEDULE = { ### Register your task -* add your execute and schedule function in `portality/tasks/consumer_long_running.py` - or `portality/tasks/consumer_main_queue.py` +* add your execute and/or schedule function in `portality/tasks/consumer_scheduled_long_queue.py`, + `portality/tasks/consumer_scheduled_short_queue.py` or `portality/tasks/consumer_events_queue.py` diff --git a/docs/dev/user-guide/user-guide.md b/docs/dev/user-guide/user-guide.md index 143c09027b..2c9ba1d3d6 100644 --- a/docs/dev/user-guide/user-guide.md +++ b/docs/dev/user-guide/user-guide.md @@ -30,9 +30,9 @@ HUEY_SCHEDULE = { } ``` -* run your `main` background job consumer +* run your `scheduled_short` background job consumer ``` -~/venv/doaj/bin/huey_consumer.py portality.tasks.consumer_main_queue.main_queue +~/venv/doaj/bin/huey_consumer.py portality.tasks.consumer_scheduled_short_queue.scheduled_short_queue ``` * wait 10 ~ 30 minute for generate some background jobs diff --git a/portality/bll/services/background_task_status.py b/portality/bll/services/background_task_status.py index 3ebe187ff7..c928350287 100644 --- a/portality/bll/services/background_task_status.py +++ 
b/portality/bll/services/background_task_status.py @@ -4,8 +4,9 @@ import itertools from typing import Iterable -from portality.constants import BGJOB_QUEUE_ID_LONG, BGJOB_QUEUE_ID_MAIN, BGJOB_STATUS_ERROR, BGJOB_STATUS_QUEUED, \ - BG_STATUS_STABLE, BG_STATUS_UNSTABLE +from portality import constants +# from portality.constants import BGJOB_QUEUE_ID_LONG, BGJOB_QUEUE_ID_MAIN, BGJOB_QUEUE_ID_EVENTS, BGJOB_QUEUE_ID_SCHEDULED_LONG, BGJOB_QUEUE_ID_SCHEDULED_SHORT, BGJOB_STATUS_ERROR, BGJOB_STATUS_QUEUED, \ +# BG_STATUS_STABLE, BG_STATUS_UNSTABLE, BGJOB_STATUS_COMPLETE from portality.core import app from portality.lib import dates from portality.models.background import BackgroundJobQueryBuilder, BackgroundJob, SimpleBgjobQueue, \ @@ -21,11 +22,11 @@ class BackgroundTaskStatusService: @staticmethod def is_stable(val): - return val == BG_STATUS_STABLE + return val == constants.BG_STATUS_STABLE @staticmethod def to_bg_status_str(stable_val: bool) -> str: - return BG_STATUS_STABLE if stable_val else BG_STATUS_UNSTABLE + return constants.BG_STATUS_STABLE if stable_val else constants.BG_STATUS_UNSTABLE def all_stable(self, items: Iterable, field_name='status') -> bool: return all(self.is_stable(q.get(field_name)) for q in items) @@ -33,8 +34,48 @@ def all_stable(self, items: Iterable, field_name='status') -> bool: def all_stable_str(self, items: Iterable, field_name='status') -> str: return self.to_bg_status_str(self.all_stable(items, field_name)) + def create_last_successfully_run_status(self, action, last_run_successful_in=0, **_) -> dict: + if last_run_successful_in == 0: + return dict( + status=constants.BG_STATUS_STABLE, + last_run=None, + last_run_status=None, + err_msgs=[] + ) + + lr_query = (BackgroundJobQueryBuilder().action(action) + .since(dates.before_now(last_run_successful_in)) + .status_includes([constants.BGJOB_STATUS_COMPLETE, constants.BGJOB_STATUS_ERROR]) + .size(1) + .order_by('created_date', 'desc') + .build_query_dict()) + + lr_results = BackgroundJob.q2obj(q=lr_query) + lr_job = lr_results[0] if len(lr_results) > 0 else None + + status = constants.BG_STATUS_UNSTABLE + lr = None + last_run_status = None + msg = ["No background jobs completed or errored in the time period"] + + if lr_job is not None: + lr = lr_job.created_date + last_run_status = lr_job.status + if lr_job.status == constants.BGJOB_STATUS_COMPLETE: + status = constants.BG_STATUS_STABLE + msg = [] + else: + msg = ["Last job did not complete successfully"] + + return dict( + status=status, + last_run=lr, + last_run_status=last_run_status, + err_msgs=msg + ) + def create_errors_status(self, action, check_sec=3600, allowed_num_err=0, **_) -> dict: - in_monitoring_query = SimpleBgjobQueue(action, status=BGJOB_STATUS_ERROR, since=dates.before_now(check_sec)) + in_monitoring_query = SimpleBgjobQueue(action, status=constants.BGJOB_STATUS_ERROR, since=dates.before_now(check_sec)) num_err_in_monitoring = BackgroundJob.hit_count(query=in_monitoring_query.query()) # prepare errors messages @@ -44,15 +85,15 @@ def create_errors_status(self, action, check_sec=3600, allowed_num_err=0, **_) - return dict( status=self.to_bg_status_str(not err_msgs), - total=BackgroundJob.hit_count(query=SimpleBgjobQueue(action, status=BGJOB_STATUS_ERROR).query()), + total=BackgroundJob.hit_count(query=SimpleBgjobQueue(action, status=constants.BGJOB_STATUS_ERROR).query()), in_monitoring_period=num_err_in_monitoring, err_msgs=err_msgs, ) def create_queued_status(self, action, total=2, oldest=1200, **_) -> dict: - total_queued = 
BackgroundJob.hit_count(query=SimpleBgjobQueue(action, status=BGJOB_STATUS_QUEUED).query()) + total_queued = BackgroundJob.hit_count(query=SimpleBgjobQueue(action, status=constants.BGJOB_STATUS_QUEUED).query()) oldest_query = (BackgroundJobQueryBuilder().action(action) - .status_includes(BGJOB_STATUS_QUEUED).size(1) + .status_includes(constants.BGJOB_STATUS_QUEUED).size(1) .order_by('created_date', 'asc') .build_query_dict()) oldest_jobs = list(BackgroundJob.q2obj(q=oldest_query)) @@ -92,6 +133,9 @@ def create_queues_status(self, queue_name) -> dict: queued = {action: self.create_queued_status(action, **config) for action, config in self.get_config_dict_by_queue_name('BG_MONITOR_QUEUED_CONFIG', queue_name).items()} + last_run = {action: self.create_last_successfully_run_status(action, **config) for action, config + in self.get_config_dict_by_queue_name('BG_MONITOR_LAST_SUCCESSFULLY_RUN_CONFIG', queue_name).items()} + # prepare for err_msgs limited_sec = app.config.get('BG_MONITOR_LAST_COMPLETED', {}).get(queue_name) if limited_sec is None: @@ -107,10 +151,11 @@ def create_queues_status(self, queue_name) -> dict: result_dict = dict( status=self.to_bg_status_str( - not err_msgs and self.all_stable(itertools.chain(errors.values(), queued.values()))), + not err_msgs and self.all_stable(itertools.chain(errors.values(), queued.values(), last_run.values()))), last_completed_job=last_completed_date and dates.format(last_completed_date), errors=errors, queued=queued, + last_run_successful=last_run, err_msgs=err_msgs, ) return result_dict @@ -129,7 +174,11 @@ def get_config_dict_by_queue_name(config_name, queue_name): def create_background_status(self) -> dict: queues = { queue_name: self.create_queues_status(queue_name) - for queue_name in [BGJOB_QUEUE_ID_LONG, BGJOB_QUEUE_ID_MAIN] + for queue_name in [constants.BGJOB_QUEUE_ID_LONG, + constants.BGJOB_QUEUE_ID_MAIN, + constants.BGJOB_QUEUE_ID_EVENTS, + constants.BGJOB_QUEUE_ID_SCHEDULED_LONG, + constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT] } result_dict = dict( diff --git a/portality/constants.py b/portality/constants.py index 4372aa503c..df0ed428d9 100644 --- a/portality/constants.py +++ b/portality/constants.py @@ -105,6 +105,9 @@ BGJOB_QUEUE_ID_LONG = 'long_running' BGJOB_QUEUE_ID_MAIN = 'main_queue' BGJOB_QUEUE_ID_UNKNOWN = 'unknown' +BGJOB_QUEUE_ID_EVENTS = "events" +BGJOB_QUEUE_ID_SCHEDULED_SHORT = "scheduled_short" +BGJOB_QUEUE_ID_SCHEDULED_LONG = "scheduled_long" # Background monitor status BG_STATUS_STABLE = 'stable' diff --git a/portality/scripts/priorities.csv b/portality/scripts/priorities.csv index f5e5cdcc59..bae43cd738 100644 --- a/portality/scripts/priorities.csv +++ b/portality/scripts/priorities.csv @@ -10,7 +10,7 @@ HP/PfT,"Priority: High, Workflow: Pending for Test",Review HP/rev,Priority: High,Review PfT,Workflow: Pending for Test,Review PfL,Workflow: Pending for Live,Review -Inv,Workflow: Initial Investigation,"Review, In Progress, To Do" +Inv,Workflow: Initial Investigation,"Review, In progress, To Do" Rev,,Review Near,Scale: Nearly Finished, Sch,Priority: Scheduled, diff --git a/portality/settings.py b/portality/settings.py index 9d6e313e6b..bc3282d6ad 100644 --- a/portality/settings.py +++ b/portality/settings.py @@ -442,8 +442,7 @@ "old_data_cleanup": {"month": "*", "day": "12", "day_of_week": "*", "hour": "6", "minute": "30"}, "monitor_bgjobs": {"month": "*", "day": "*/6", "day_of_week": "*", "hour": "10", "minute": "0"}, "find_discontinued_soon": {"month": "*", "day": "*", "day_of_week": "*", "hour": "0", "minute": "3"}, - 
"datalog_journal_added_update": {"month": "*", "day": "*", "day_of_week": "*", "hour": "*", "minute": "*/30"}, - "article_bulk_create": {"month": "*", "day": "*", "day_of_week": "*", "hour": "*", "minute": "20"}, + "datalog_journal_added_update": {"month": "*", "day": "*", "day_of_week": "*", "hour": "*", "minute": "*/30"} } @@ -1389,39 +1388,74 @@ # Background monitor # ~~->BackgroundMonitor:Feature~~ +# some time period for convenience +_MIN = 60 +_HOUR = 3600 +_DAY = 24 * _HOUR +_WEEK = 7 * _DAY + # Configures the age of the last completed job on the queue before the queue is marked as unstable # (in seconds) BG_MONITOR_LAST_COMPLETED = { - 'main_queue': 7200, # 2 hours - 'long_running': 93600, # 26 hours + 'events': 2 * _HOUR, # 2 hours + 'scheduled_short': 2 * _HOUR, # 2 hours + 'scheduled_long': _DAY + 2 * _HOUR, # 26 hours } # Default monitoring config for background job types which are not enumerated in BG_MONITOR_ERRORS_CONFIG below BG_MONITOR_DEFAULT_CONFIG = { + ## default values for queued config + + # the total number of items that are allowed to be in `queued` state at the same time. + # Any more than this and the result is flagged 'total': 2, - 'oldest': 1200, + + # The age of the oldest record allowed to be in the `queued` state. + # If the oldest queued item was created before this, the result is flagged + 'oldest': 20 * _MIN, + + ## default values for error config + + # The time period over which to check for errors, from now to now - check_sec + 'check_sec': _HOUR, + + # The number of errors allowed in the check period before the result is flagged + 'allowed_num_err': 0, + + # The last time this job ran within the specified time period, was it successful. + # If the most recent job in the timeframe is an error, this will trigger an "unstable" state (0 turns this off) + 'last_run_successful_in': 0 } # Configures the monitoring period and the allowed number of errors in that period before a queue is marked # as unstable BG_MONITOR_ERRORS_CONFIG = { - # Main queue - 'journal_csv': { - 'check_sec': 3600, # 1 hour, time period between scheduled runs + 'anon_export': { + 'check_sec': _WEEK, # a week + 'allowed_num_err': 0 + }, + 'article_bulk_create': { + 'check_sec': _DAY, # 1 day 'allowed_num_err': 0, }, - 'ingest_articles': { - 'check_sec': 86400, + 'article_cleanup_sync': { + 'check_sec': 2 * _DAY, # 2 days 'allowed_num_err': 0 }, - - # Long running 'harvest': { - 'check_sec': 86400, + 'check_sec': _DAY, 'allowed_num_err': 0, }, + 'ingest_articles': { + 'check_sec': _DAY, + 'allowed_num_err': 0 + }, + 'journal_csv': { + 'check_sec': 3 * _HOUR, + 'allowed_num_err': 1, + }, 'public_data_dump': { - 'check_sec': 86400 * 7, + 'check_sec': 2 * _HOUR, 'allowed_num_err': 0 } } @@ -1429,24 +1463,92 @@ # Configures the total number of queued items and the age of the oldest of those queued items allowed # before the queue is marked as unstable. 
This is provided by type, so we can monitor all types separately BG_MONITOR_QUEUED_CONFIG = { - # Main queue - 'journal_csv': { - 'total': 2, - 'oldest': 1200, # 20 mins + 'anon_export': { + 'total': 1, + 'oldest': 20 * _MIN }, - 'ingest_articles': { - 'total': 250, - 'oldest': 86400 + 'article_bulk_create': { + 'total': 3, + 'oldest': 10 * _MIN }, - - # Long running 'harvest': { 'total': 1, - 'oldest': 86400 + 'oldest': _DAY + }, + 'ingest_articles': { + 'total': 10, + 'oldest': 10 * _MIN + }, + 'journal_bulk_edit': { + 'total': 2, + 'oldest': 10 * _MIN + }, + 'journal_csv': { + 'total': 1, + 'oldest': 20 * _MIN }, 'public_data_dump': { 'total': 1, - 'oldest': 86400 + 'oldest': _DAY + }, + 'set_in_doaj': { + 'total': 2, + 'oldest': 10 * _MIN + }, + 'suggestion_bulk_edit': { + 'total': 2, + 'oldest': 10 * _MIN + } +} + +BG_MONITOR_LAST_SUCCESSFULLY_RUN_CONFIG = { + 'anon_export': { + 'last_run_successful_in': 32 * _DAY + }, + 'article_cleanup_sync': { + 'last_run_successful_in': 33 * _DAY + }, + 'async_workflow_notifications': { + 'last_run_successful_in': _WEEK + _DAY + }, + 'check_latest_es_backup': { + 'last_run_successful_in': _DAY + _HOUR + }, + 'datalog_journal_added_update': { + 'last_run_successful_in': _HOUR + }, + 'find_discontinued_soon': { + 'last_run_successful_in': _DAY + _HOUR + }, + 'harvest': { + 'last_run_successful_in': _DAY + _HOUR + }, + 'journal_csv': { + 'last_run_successful_in': 2 * _HOUR + }, + 'monitor_bgjobs': { + 'last_run_successful_in': _WEEK + _DAY + }, + 'old_data_cleanup': { + 'last_run_successful_in': 32 * _DAY + }, + 'prune_es_backups': { + 'last_run_successful_in': _DAY + _HOUR + }, + 'public_data_dump': { + 'last_run_successful_in': 32 * _DAY + }, + 'read_news': { + 'last_run_successful_in': 2 * _HOUR + }, + 'reporting': { + 'last_run_successful_in': 32 * _DAY + }, + 'request_es_backup': { + 'last_run_successful_in': _DAY + _HOUR + }, + 'sitemap': { + 'last_run_successful_in': _DAY + _HOUR } } diff --git a/portality/tasks/anon_export.py b/portality/tasks/anon_export.py index 234fc7727f..f9765c381d 100644 --- a/portality/tasks/anon_export.py +++ b/portality/tasks/anon_export.py @@ -13,7 +13,7 @@ from portality.lib.dataobj import DataStructureException from portality.store import StoreFactory from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import long_running +from portality.tasks.redis_huey import scheduled_long_queue as queue def _anonymise_email(record): @@ -182,7 +182,7 @@ def submit(cls, background_job): anon_export.schedule(args=(background_job.id,), delay=10) -huey_helper = AnonExportBackgroundTask.create_huey_helper(long_running) +huey_helper = AnonExportBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/application_autochecks.py b/portality/tasks/application_autochecks.py index 8566f31365..7cfbe5b838 100644 --- a/portality/tasks/application_autochecks.py +++ b/portality/tasks/application_autochecks.py @@ -2,7 +2,7 @@ from portality import models from portality.background import BackgroundTask, BackgroundApi, BackgroundException from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import events_queue as queue from portality.bll import DOAJ @@ -88,7 +88,7 @@ def submit(cls, background_job): application_autochecks.schedule(args=(background_job.id,), delay=10) -huey_helper = ApplicationAutochecks.create_huey_helper(main_queue) +huey_helper = 
ApplicationAutochecks.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=True) diff --git a/portality/tasks/article_bulk_create.py b/portality/tasks/article_bulk_create.py index 3066ecaefe..8ffaf7d84e 100644 --- a/portality/tasks/article_bulk_create.py +++ b/portality/tasks/article_bulk_create.py @@ -9,7 +9,7 @@ from portality.lib import dataobj from portality.models.uploads import BulkArticles from portality.tasks.helpers import background_helper, articles_upload_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import events_queue as queue def get_upload_dir_path() -> Path: @@ -78,7 +78,7 @@ def submit(cls, background_job): background_helper.submit_by_background_job(background_job, article_bulk_create) -huey_helper = ArticleBulkCreateBackgroundTask.create_huey_helper(main_queue) +huey_helper = ArticleBulkCreateBackgroundTask.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=False) diff --git a/portality/tasks/article_bulk_delete.py b/portality/tasks/article_bulk_delete.py index 1237c44759..28059bd741 100644 --- a/portality/tasks/article_bulk_delete.py +++ b/portality/tasks/article_bulk_delete.py @@ -5,7 +5,7 @@ from portality import models from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import events_queue as queue from portality.util import batch_up @@ -126,7 +126,7 @@ def submit(cls, background_job): article_bulk_delete.schedule(args=(background_job.id,), delay=10) -huey_helper = ArticleBulkDeleteBackgroundTask.create_huey_helper(main_queue) +huey_helper = ArticleBulkDeleteBackgroundTask.create_huey_helper(queue) @huey_helper.register_execute(is_load_config=False) diff --git a/portality/tasks/article_cleanup_sync.py b/portality/tasks/article_cleanup_sync.py index cc519a7ea2..ccd9d31ff6 100644 --- a/portality/tasks/article_cleanup_sync.py +++ b/portality/tasks/article_cleanup_sync.py @@ -11,7 +11,7 @@ from portality.background import BackgroundTask, BackgroundApi, BackgroundException from portality.core import app from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import long_running +from portality.tasks.redis_huey import scheduled_long_queue as queue class ArticleCleanupSyncBackgroundTask(BackgroundTask): @@ -216,7 +216,7 @@ def submit(cls, background_job): article_cleanup_sync.schedule(args=(background_job.id,), delay=10) -huey_helper = ArticleCleanupSyncBackgroundTask.create_huey_helper(long_running) +huey_helper = ArticleCleanupSyncBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/article_duplicate_report.py b/portality/tasks/article_duplicate_report.py index 04419674e6..e6624ee81a 100644 --- a/portality/tasks/article_duplicate_report.py +++ b/portality/tasks/article_duplicate_report.py @@ -12,7 +12,7 @@ from portality.bll.doaj import DOAJ from portality.core import app, es_connection from portality.lib import dates -from portality.tasks.redis_huey import long_running +from portality.tasks.redis_huey import scheduled_long_queue as queue class ArticleDuplicateReportBackgroundTask(BackgroundTask): @@ -286,7 +286,7 @@ def submit(cls, background_job): article_duplicate_report.schedule(args=(background_job.id,), delay=10) -huey_helper = ArticleDuplicateReportBackgroundTask.create_huey_helper(long_running) +huey_helper = 
ArticleDuplicateReportBackgroundTask.create_huey_helper(queue) ''' @long_running.periodic_task(schedule("article_duplicate_report")) diff --git a/portality/tasks/async_workflow_notifications.py b/portality/tasks/async_workflow_notifications.py index 70de3eb70e..07da446bd7 100644 --- a/portality/tasks/async_workflow_notifications.py +++ b/portality/tasks/async_workflow_notifications.py @@ -10,10 +10,9 @@ from portality.lib import dates from portality.lib.dates import FMT_DATETIME_STD from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue, schedule +from portality.tasks.redis_huey import scheduled_short_queue as queue, schedule from portality.ui import templates - class AgeQuery(object): def __init__(self, newest_date, status_filters): self._newest_date = newest_date @@ -430,7 +429,7 @@ def submit(cls, background_job): async_workflow_notifications.schedule(args=(background_job.id,), delay=10) -huey_helper = AsyncWorkflowBackgroundTask.create_huey_helper(main_queue) +huey_helper = AsyncWorkflowBackgroundTask.create_huey_helper(queue) @huey_helper.task_queue.periodic_task(schedule("async_workflow_notifications")) diff --git a/portality/tasks/check_latest_es_backup.py b/portality/tasks/check_latest_es_backup.py index 890d914e96..cfd2ff277d 100644 --- a/portality/tasks/check_latest_es_backup.py +++ b/portality/tasks/check_latest_es_backup.py @@ -5,7 +5,7 @@ from portality.background import BackgroundTask, BackgroundApi from portality.core import app from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import scheduled_short_queue as queue class CheckLatestESBackupBackgroundTask(BackgroundTask): @@ -65,7 +65,7 @@ def submit(cls, background_job): check_latest_es_backup.schedule(args=(background_job.id,), delay=10) -huey_helper = CheckLatestESBackupBackgroundTask.create_huey_helper(main_queue) +huey_helper = CheckLatestESBackupBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/consumer_events_queue.py b/portality/tasks/consumer_events_queue.py new file mode 100644 index 0000000000..0308bc3661 --- /dev/null +++ b/portality/tasks/consumer_events_queue.py @@ -0,0 +1,26 @@ +# NOTE: this file is designed to be imported by Huey, the background job processor +# It changes the logging configuration. If it's imported anywhere else in the app, +# it will change the logging configuration for the entire app. 
+import logging +from portality.core import app + +logging.getLogger("requests").setLevel(logging.WARNING) +logging.getLogger("urllib3").setLevel(logging.WARNING) + +# import the queues +from portality.tasks.redis_huey import events_queue # noqa + +# now import the tasks that will bind to those queues + +from portality.tasks.article_bulk_create import article_bulk_create # noqa +from portality.tasks.article_bulk_delete import article_bulk_delete # noqa +from portality.tasks.ingestarticles import ingest_articles # noqa +from portality.tasks.journal_bulk_delete import journal_bulk_delete # noqa +from portality.tasks.journal_bulk_edit import journal_bulk_edit # noqa +from portality.tasks.preservation import preserve # noqa +from portality.tasks.journal_in_out_doaj import set_in_doaj # noqa +from portality.tasks.suggestion_bulk_edit import suggestion_bulk_edit # noqa + +# Conditionally enable new application autochecking +if app.config.get("AUTOCHECK_INCOMING", False): + from portality.tasks.application_autochecks import application_autochecks diff --git a/portality/tasks/consumer_scheduled_long_queue.py b/portality/tasks/consumer_scheduled_long_queue.py new file mode 100644 index 0000000000..f04b4a11c2 --- /dev/null +++ b/portality/tasks/consumer_scheduled_long_queue.py @@ -0,0 +1,17 @@ +# NOTE: this file is designed to be imported by Huey, the background job processor +# It changes the logging configuration. If it's imported anywhere else in the app, +# it will change the logging configuration for the entire app. +import logging + +logging.getLogger("requests").setLevel(logging.WARNING) +logging.getLogger("urllib3").setLevel(logging.WARNING) + +# import the queues +from portality.tasks.redis_huey import scheduled_long_queue # noqa + + +from portality.tasks.anon_export import scheduled_anon_export, anon_export # noqa +from portality.tasks.article_cleanup_sync import scheduled_article_cleanup_sync, article_cleanup_sync # noqa +from portality.tasks.harvester import scheduled_harvest # noqa +from portality.tasks.public_data_dump import scheduled_public_data_dump, public_data_dump # noqa +from portality.tasks.sitemap import scheduled_sitemap, generate_sitemap # noqa diff --git a/portality/tasks/consumer_scheduled_short_queue.py b/portality/tasks/consumer_scheduled_short_queue.py new file mode 100644 index 0000000000..bae97df34e --- /dev/null +++ b/portality/tasks/consumer_scheduled_short_queue.py @@ -0,0 +1,24 @@ +# NOTE: this file is designed to be imported by Huey, the background job processor +# It changes the logging configuration. If it's imported anywhere else in the app, +# it will change the logging configuration for the entire app. 
+import logging +from portality.core import app + +logging.getLogger("requests").setLevel(logging.WARNING) +logging.getLogger("urllib3").setLevel(logging.WARNING) + +# import the queues +from portality.tasks.redis_huey import scheduled_short_queue # noqa + +# now import the tasks that will bind to those queues +from portality.tasks.async_workflow_notifications import async_workflow_notifications # noqa +from portality.tasks.check_latest_es_backup import scheduled_check_latest_es_backup, check_latest_es_backup # noqa +from portality.tasks.datalog_journal_added_update import scheduled_datalog_journal_added_update, datalog_journal_added_update # noqa +from portality.tasks.find_discontinued_soon import scheduled_find_discontinued_soon, find_discontinued_soon # noqa +from portality.tasks.journal_csv import scheduled_journal_csv, journal_csv # noqa +from portality.tasks.monitor_bgjobs import scheduled_monitor_bgjobs, monitor_bgjobs # noqa +from portality.tasks.old_data_cleanup import scheduled_old_data_cleanup, old_data_cleanup # noqa +from portality.tasks.prune_es_backups import scheduled_prune_es_backups, prune_es_backups # noqa +from portality.tasks.read_news import scheduled_read_news, read_news # noqa +from portality.tasks.reporting import scheduled_reports, run_reports # noqa +from portality.tasks.request_es_backup import scheduled_request_es_backup, request_es_backup # noqa diff --git a/portality/tasks/datalog_journal_added_update.py b/portality/tasks/datalog_journal_added_update.py index 1d967b4ee3..a56259d509 100644 --- a/portality/tasks/datalog_journal_added_update.py +++ b/portality/tasks/datalog_journal_added_update.py @@ -25,7 +25,7 @@ from portality.models import Journal from portality.models.datalog_journal_added import DatalogJournalAdded from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import scheduled_short_queue as queue log = logging.getLogger(__name__) @@ -242,7 +242,7 @@ def submit(cls, background_job): background_helper.submit_by_background_job(background_job, datalog_journal_added_update) -huey_helper = DatalogJournalAddedUpdate.create_huey_helper(main_queue) +huey_helper = DatalogJournalAddedUpdate.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/find_discontinued_soon.py b/portality/tasks/find_discontinued_soon.py index 379618dc0a..2f7491221f 100644 --- a/portality/tasks/find_discontinued_soon.py +++ b/portality/tasks/find_discontinued_soon.py @@ -3,7 +3,7 @@ from portality.lib import dates from portality import models -from portality.tasks.redis_huey import main_queue +from portality.tasks.redis_huey import scheduled_short_queue as queue from portality.background import BackgroundTask, BackgroundApi from portality.tasks.helpers import background_helper @@ -102,7 +102,7 @@ def submit(cls, background_job): find_discontinued_soon.schedule(args=(background_job.id,), delay=10) -huey_helper = FindDiscontinuedSoonBackgroundTask.create_huey_helper(main_queue) +huey_helper = FindDiscontinuedSoonBackgroundTask.create_huey_helper(queue) @huey_helper.register_schedule diff --git a/portality/tasks/harvester.py b/portality/tasks/harvester.py index 730f12a427..bda25db3da 100644 --- a/portality/tasks/harvester.py +++ b/portality/tasks/harvester.py @@ -8,7 +8,7 @@ from portality.store import StoreFactory from portality.tasks.harvester_helpers import workflow from portality.tasks.helpers import background_helper -from portality.tasks.redis_huey import 
diff --git a/portality/tasks/datalog_journal_added_update.py b/portality/tasks/datalog_journal_added_update.py
index 1d967b4ee3..a56259d509 100644
--- a/portality/tasks/datalog_journal_added_update.py
+++ b/portality/tasks/datalog_journal_added_update.py
@@ -25,7 +25,7 @@
 from portality.models import Journal
 from portality.models.datalog_journal_added import DatalogJournalAdded
 from portality.tasks.helpers import background_helper
-from portality.tasks.redis_huey import main_queue
+from portality.tasks.redis_huey import scheduled_short_queue as queue
 
 log = logging.getLogger(__name__)
 
@@ -242,7 +242,7 @@ def submit(cls, background_job):
         background_helper.submit_by_background_job(background_job, datalog_journal_added_update)
 
 
-huey_helper = DatalogJournalAddedUpdate.create_huey_helper(main_queue)
+huey_helper = DatalogJournalAddedUpdate.create_huey_helper(queue)
 
 
 @huey_helper.register_schedule
diff --git a/portality/tasks/find_discontinued_soon.py b/portality/tasks/find_discontinued_soon.py
index 379618dc0a..2f7491221f 100644
--- a/portality/tasks/find_discontinued_soon.py
+++ b/portality/tasks/find_discontinued_soon.py
@@ -3,7 +3,7 @@
 from portality.lib import dates
 from portality import models
-from portality.tasks.redis_huey import main_queue
+from portality.tasks.redis_huey import scheduled_short_queue as queue
 from portality.background import BackgroundTask, BackgroundApi
 from portality.tasks.helpers import background_helper
 
@@ -102,7 +102,7 @@ def submit(cls, background_job):
         find_discontinued_soon.schedule(args=(background_job.id,), delay=10)
 
 
-huey_helper = FindDiscontinuedSoonBackgroundTask.create_huey_helper(main_queue)
+huey_helper = FindDiscontinuedSoonBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_schedule
diff --git a/portality/tasks/harvester.py b/portality/tasks/harvester.py
index 730f12a427..bda25db3da 100644
--- a/portality/tasks/harvester.py
+++ b/portality/tasks/harvester.py
@@ -8,7 +8,7 @@
 from portality.store import StoreFactory
 from portality.tasks.harvester_helpers import workflow
 from portality.tasks.helpers import background_helper
-from portality.tasks.redis_huey import long_running
+from portality.tasks.redis_huey import scheduled_long_queue as queue
 
 
 class BGHarvesterLogger(object):
@@ -111,7 +111,7 @@ def only_me(self):
         return False
 
 
-huey_helper = HarvesterBackgroundTask.create_huey_helper(long_running)
+huey_helper = HarvesterBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_schedule
diff --git a/portality/tasks/helpers/background_helper.py b/portality/tasks/helpers/background_helper.py
index 5af71afe5b..451ec73f68 100644
--- a/portality/tasks/helpers/background_helper.py
+++ b/portality/tasks/helpers/background_helper.py
@@ -12,7 +12,7 @@
 from portality.background import BackgroundApi, BackgroundTask
 from portality.core import app
 from portality.decorators import write_required
-from portality.tasks.redis_huey import long_running, main_queue, configure, schedule
+from portality.tasks.redis_huey import long_running, main_queue, events_queue, scheduled_long_queue, scheduled_short_queue, configure, schedule
 
 TaskFactory = Callable[[models.BackgroundJob], BackgroundTask]
 _queue_for_action = None
 
@@ -25,6 +25,12 @@ def get_queue_id_by_task_queue(task_queue: RedisHuey):
         return constants.BGJOB_QUEUE_ID_LONG
     elif task_queue.name == main_queue.name:
         return constants.BGJOB_QUEUE_ID_MAIN
+    elif task_queue.name == events_queue.name:
+        return constants.BGJOB_QUEUE_ID_EVENTS
+    elif task_queue.name == scheduled_long_queue.name:
+        return constants.BGJOB_QUEUE_ID_SCHEDULED_LONG
+    elif task_queue.name == scheduled_short_queue.name:
+        return constants.BGJOB_QUEUE_ID_SCHEDULED_SHORT
     else:
         app.logger.warning(f'unknown task_queue[{task_queue}]')
         return constants.BGJOB_QUEUE_ID_UNKNOWN
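The extended `get_queue_id_by_task_queue` keeps the `queue_id` bookkeeping on background jobs in step with the new queues. A usage sketch, assuming the new `BGJOB_QUEUE_ID_*` constants are defined alongside the existing `LONG`/`MAIN` ones in portality's constants module:

```python
# Sketch: the helper maps a RedisHuey instance to the queue id stored on
# BackgroundJob records, keyed by the queue's name.
from portality import constants
from portality.tasks.helpers.background_helper import get_queue_id_by_task_queue
from portality.tasks.redis_huey import events_queue

assert get_queue_id_by_task_queue(events_queue) == constants.BGJOB_QUEUE_ID_EVENTS
```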
diff --git a/portality/tasks/ingestarticles.py b/portality/tasks/ingestarticles.py
index ec7d9551d0..b01fa6caa2 100644
--- a/portality/tasks/ingestarticles.py
+++ b/portality/tasks/ingestarticles.py
@@ -12,7 +12,7 @@
 from portality.core import app
 from portality.lib import plugin
 from portality.tasks.helpers import background_helper, articles_upload_helper
-from portality.tasks.redis_huey import main_queue
+from portality.tasks.redis_huey import events_queue as queue
 from portality.ui.messages import Messages
 
 DEFAULT_MAX_REMOTE_SIZE = 262144000
@@ -504,7 +504,7 @@ def __fail(record, previous, error):
         return __fail(record, previous, error="please check it before submitting again; " + str(e))
 
 
-huey_helper = IngestArticlesBackgroundTask.create_huey_helper(main_queue)
+huey_helper = IngestArticlesBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_execute(is_load_config=True)
diff --git a/portality/tasks/journal_autochecks.py b/portality/tasks/journal_autochecks.py
index 5403ac6b83..422c0189cc 100644
--- a/portality/tasks/journal_autochecks.py
+++ b/portality/tasks/journal_autochecks.py
@@ -1,7 +1,7 @@
 from portality import models
 from portality.background import BackgroundTask, BackgroundApi, BackgroundException
 from portality.tasks.helpers import background_helper
-from portality.tasks.redis_huey import long_running
+from portality.tasks.redis_huey import events_queue as queue
 from portality.bll import DOAJ
 
 #######################################
@@ -81,7 +81,7 @@ def submit(cls, background_job):
         journal_autochecks.schedule(args=(background_job.id,), delay=10)
 
 
-huey_helper = JournalAutochecks.create_huey_helper(long_running)
+huey_helper = JournalAutochecks.create_huey_helper(queue)
 
 
 @huey_helper.register_execute(is_load_config=True)
diff --git a/portality/tasks/journal_bulk_delete.py b/portality/tasks/journal_bulk_delete.py
index a237c44a9b..d485a1a6f5 100644
--- a/portality/tasks/journal_bulk_delete.py
+++ b/portality/tasks/journal_bulk_delete.py
@@ -9,7 +9,7 @@
 from portality.bll import DOAJ
 from portality.core import app
 from portality.lib import dates
-from portality.tasks.redis_huey import main_queue
+from portality.tasks.redis_huey import events_queue as queue
 from portality.ui.messages import Messages
 
 
@@ -154,7 +154,7 @@ def submit(cls, background_job):
         journal_bulk_delete.schedule(args=(background_job.id,), delay=10)
 
 
-huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(main_queue)
+huey_helper = JournalBulkDeleteBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_execute(is_load_config=False)
diff --git a/portality/tasks/journal_bulk_edit.py b/portality/tasks/journal_bulk_edit.py
index 3e39d4388e..355f8c0ce0 100644
--- a/portality/tasks/journal_bulk_edit.py
+++ b/portality/tasks/journal_bulk_edit.py
@@ -10,7 +10,7 @@
 from portality.forms.application_forms import JournalFormFactory
 from portality.lib import dates
 
-from portality.tasks.redis_huey import main_queue
+from portality.tasks.redis_huey import events_queue as queue
 
 from portality.background import AdminBackgroundTask, BackgroundApi, BackgroundException, BackgroundSummary
 
@@ -238,7 +238,7 @@ def submit(cls, background_job):
         journal_bulk_edit.schedule(args=(background_job.id,), delay=10)
 
 
-huey_helper = JournalBulkEditBackgroundTask.create_huey_helper(main_queue)
+huey_helper = JournalBulkEditBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_execute(is_load_config=False)
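Note the convention running through all of these edits: each task module imports its queue under the local alias `queue`, so re-homing a task later is a one-line change at the top of the module. Illustrative:

```python
# Before: task bound to the events queue
from portality.tasks.redis_huey import events_queue as queue

# After: the same task re-homed to the short scheduled queue; nothing else
# in the module changes, because every other reference uses the alias.
# from portality.tasks.redis_huey import scheduled_short_queue as queue
```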
diff --git a/portality/tasks/journal_csv.py b/portality/tasks/journal_csv.py
index 153a13735e..af9e6dd4e8 100644
--- a/portality/tasks/journal_csv.py
+++ b/portality/tasks/journal_csv.py
@@ -3,7 +3,7 @@
 from portality.bll.doaj import DOAJ
 from portality.core import app
 from portality.tasks.helpers import background_helper
-from portality.tasks.redis_huey import main_queue
+from portality.tasks.redis_huey import scheduled_short_queue as queue
 
 
 class JournalCSVBackgroundTask(BackgroundTask):
@@ -65,7 +65,7 @@ def submit(cls, background_job):
         journal_csv.schedule(args=(background_job.id,), delay=10)
 
 
-huey_helper = JournalCSVBackgroundTask.create_huey_helper(main_queue)
+huey_helper = JournalCSVBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_schedule
diff --git a/portality/tasks/journal_in_out_doaj.py b/portality/tasks/journal_in_out_doaj.py
index 81a5bfbe59..8abdf02bb8 100644
--- a/portality/tasks/journal_in_out_doaj.py
+++ b/portality/tasks/journal_in_out_doaj.py
@@ -3,7 +3,7 @@
 from portality.bll import DOAJ
 from portality.core import app
-from portality.tasks.redis_huey import main_queue
+from portality.tasks.redis_huey import events_queue as queue
 from portality.decorators import write_required
 from portality.background import BackgroundTask, BackgroundApi, BackgroundSummary
 
@@ -143,7 +143,7 @@ def submit(cls, background_job):
         set_in_doaj.schedule(args=(background_job.id,), delay=10)
 
 
-huey_helper = SetInDOAJBackgroundTask.create_huey_helper(main_queue)
+huey_helper = SetInDOAJBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_execute(is_load_config=False)
diff --git a/portality/tasks/monitor_bgjobs.py b/portality/tasks/monitor_bgjobs.py
index 2220faf2de..ee4f474d50 100644
--- a/portality/tasks/monitor_bgjobs.py
+++ b/portality/tasks/monitor_bgjobs.py
@@ -4,7 +4,7 @@
 from portality.background import BackgroundTask
 from portality.core import app
 from portality.tasks.helpers import background_helper
-from portality.tasks.redis_huey import long_running
+from portality.tasks.redis_huey import scheduled_short_queue as queue
 
 
 def get_system_email():
@@ -78,7 +78,7 @@ def submit(cls, background_job):
         background_helper.submit_by_background_job(background_job, monitor_bgjobs)
 
 
-huey_helper = MonitorBgjobsBackgroundTask.create_huey_helper(long_running)
+huey_helper = MonitorBgjobsBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_schedule
diff --git a/portality/tasks/old_data_cleanup.py b/portality/tasks/old_data_cleanup.py
index 219feffcd5..38a1cb0833 100644
--- a/portality/tasks/old_data_cleanup.py
+++ b/portality/tasks/old_data_cleanup.py
@@ -8,7 +8,7 @@
 from portality.lib.es_queries import ES_DATETIME_FMT
 from portality.models import Notification, BackgroundJob
 from portality.tasks.helpers import background_helper
-from portality.tasks.redis_huey import long_running
+from portality.tasks.redis_huey import scheduled_short_queue as queue
 
 
 class RetentionQuery:
@@ -83,7 +83,7 @@ def submit(cls, background_job):
         )
 
 
-huey_helper = OldDataCleanupBackgroundTask.create_huey_helper(long_running)
+huey_helper = OldDataCleanupBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_schedule
diff --git a/portality/tasks/preservation.py b/portality/tasks/preservation.py
index 4fc1cf09b9..3508413e9e 100644
--- a/portality/tasks/preservation.py
+++ b/portality/tasks/preservation.py
@@ -17,7 +17,7 @@
 from portality.models import Account, Article, BackgroundJob, PreservationState
 from portality.regex import DOI_COMPILED, HTTP_URL_COMPILED
 from portality.tasks.helpers import background_helper
-from portality.tasks.redis_huey import main_queue
+from portality.tasks.redis_huey import events_queue as queue
 
 
 class PreservationException(Exception):
@@ -425,7 +425,7 @@ def submit(cls, background_job):
         preserve.schedule(args=(background_job.id,), delay=10)
 
 
-huey_helper = PreservationBackgroundTask.create_huey_helper(main_queue)
+huey_helper = PreservationBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_execute(is_load_config=True)
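All of these call sites follow the same shape: `create_huey_helper(queue)` binds the task class to a queue, and the helper's decorators register the execute (and, where relevant, schedule) functions on it. A deliberately simplified sketch of that shape, not DOAJ's actual implementation:

```python
# Simplified sketch of the huey_helper pattern (NOT the real DOAJ code):
# the helper captures the target queue once, which is why changing the
# queue argument at the create_huey_helper() call site migrates the task.
class HueyHelperSketch:
    def __init__(self, task_cls, queue):
        self.task_cls = task_cls
        self.queue = queue

    def register_execute(self, is_load_config=False):
        def decorator(fn):
            # bind the function as a Huey task on the captured queue
            return self.queue.task()(fn)
        return decorator
```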
diff --git a/portality/tasks/prune_es_backups.py b/portality/tasks/prune_es_backups.py
index 2015cc8849..5352dc19cb 100644
--- a/portality/tasks/prune_es_backups.py
+++ b/portality/tasks/prune_es_backups.py
@@ -8,7 +8,7 @@
 from portality.core import app
 from portality.lib import dates
 from portality.tasks.helpers import background_helper
-from portality.tasks.redis_huey import long_running
+from portality.tasks.redis_huey import scheduled_short_queue as queue
 
 
 # FIXME: update for index-per-type
@@ -78,7 +78,7 @@ def submit(cls, background_job):
         prune_es_backups.schedule(args=(background_job.id,), delay=10)
 
 
-huey_helper = PruneESBackupsBackgroundTask.create_huey_helper(long_running)
+huey_helper = PruneESBackupsBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_schedule
diff --git a/portality/tasks/public_data_dump.py b/portality/tasks/public_data_dump.py
index a67bf17cf6..5d6e4d6203 100644
--- a/portality/tasks/public_data_dump.py
+++ b/portality/tasks/public_data_dump.py
@@ -10,7 +10,7 @@
 from portality.models import cache
 from portality.store import StoreFactory
 from portality.tasks.helpers import background_helper
-from portality.tasks.redis_huey import long_running
+from portality.tasks.redis_huey import scheduled_long_queue as queue
 
 
 class PublicDataDumpBackgroundTask(BackgroundTask):
@@ -247,7 +247,7 @@ def submit(cls, background_job):
         public_data_dump.schedule(args=(background_job.id,), delay=10)
 
 
-huey_helper = PublicDataDumpBackgroundTask.create_huey_helper(long_running)
+huey_helper = PublicDataDumpBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_schedule
diff --git a/portality/tasks/read_news.py b/portality/tasks/read_news.py
index 35a2c314da..20f92d8645 100644
--- a/portality/tasks/read_news.py
+++ b/portality/tasks/read_news.py
@@ -4,7 +4,7 @@
 from portality.background import BackgroundTask, BackgroundApi
 from portality.core import app
 from portality.tasks.helpers import background_helper
-from portality.tasks.redis_huey import main_queue
+from portality.tasks.redis_huey import scheduled_short_queue as queue
 
 
 class FeedError(Exception):
@@ -95,7 +95,7 @@ def save_entry(entry):
     news.save()
 
 
-huey_helper = ReadNewsBackgroundTask.create_huey_helper(main_queue)
+huey_helper = ReadNewsBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_schedule
diff --git a/portality/tasks/redis_huey.py b/portality/tasks/redis_huey.py
index b3dc3ba5bf..5522e1d66b 100644
--- a/portality/tasks/redis_huey.py
+++ b/portality/tasks/redis_huey.py
@@ -2,15 +2,33 @@
 from portality.core import app
 
 # every-day background jobs that take a few minutes each (like, bulk deletes and anything else requested by the user)
+# DEPRECATED
 main_queue = RedisHuey('doaj_main_queue',
                        host=app.config['REDIS_HOST'], port=app.config['REDIS_PORT'],
                        always_eager=app.config.get("HUEY_EAGER", False))
 
 # jobs that might take a long time, like the harvester or the anon export, which can run for several hours
+# DEPRECATED
 long_running = RedisHuey('doaj_long_running',
                          host=app.config['REDIS_HOST'], port=app.config['REDIS_PORT'],
                          always_eager=app.config.get("HUEY_EAGER", False))
+
+# short jobs to be run on demand from within the application
+events_queue = RedisHuey('doaj_events_queue',
+                         host=app.config['REDIS_HOST'], port=app.config['REDIS_PORT'],
+                         always_eager=app.config.get("HUEY_EAGER", False))
+
+# scheduled jobs that can run for several hours each
+scheduled_long_queue = RedisHuey('doaj_scheduled_long_queue',
+                                 host=app.config['REDIS_HOST'], port=app.config['REDIS_PORT'],
+                                 always_eager=app.config.get("HUEY_EAGER", False))
+
+# scheduled jobs that will typically run within a few minutes
+scheduled_short_queue = RedisHuey('doaj_scheduled_short_queue',
+                                  host=app.config['REDIS_HOST'], port=app.config['REDIS_PORT'],
+                                  always_eager=app.config.get("HUEY_EAGER", False))
+
 """
 we put everything we want to be responsive onto the main_queue, and anything that would disrupt the main_queue by taking too
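The five `RedisHuey` instances above can share a single Redis server because Huey namespaces its storage by queue name, so a consumer started for one queue never sees another queue's work. Illustrative:

```python
# Two queues on the same Redis server remain fully independent: tasks
# registered or enqueued on one are invisible to the other's consumer,
# because each queue's keys are derived from its name.
from huey import RedisHuey

events = RedisHuey('doaj_events_queue', host='localhost', port=6379)
short = RedisHuey('doaj_scheduled_short_queue', host='localhost', port=6379)
```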
diff --git a/portality/tasks/reporting.py b/portality/tasks/reporting.py
index cb479347a2..5c127bcc82 100644
--- a/portality/tasks/reporting.py
+++ b/portality/tasks/reporting.py
@@ -11,7 +11,7 @@
 from portality.lib import dates
 from portality.lib.dates import DEFAULT_TIMESTAMP_VAL, FMT_DATE_STD, FMT_DATE_YM, FMT_YEAR, FMT_DATETIME_STD
 from portality.tasks.helpers import background_helper
-from portality.tasks.redis_huey import main_queue
+from portality.tasks.redis_huey import events_queue as queue
 
 
 def provenance_reports(fr, to, outdir):
@@ -411,7 +411,7 @@ def submit(cls, background_job):
         run_reports.schedule(args=(background_job.id,), delay=10)
 
 
-huey_helper = ReportingBackgroundTask.create_huey_helper(main_queue)
+huey_helper = ReportingBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_schedule
diff --git a/portality/tasks/request_es_backup.py b/portality/tasks/request_es_backup.py
index 57a077a1fd..00c27af0c0 100644
--- a/portality/tasks/request_es_backup.py
+++ b/portality/tasks/request_es_backup.py
@@ -5,7 +5,7 @@
 from portality.background import BackgroundTask, BackgroundApi
 from portality.core import app
 from portality.tasks.helpers import background_helper
-from portality.tasks.redis_huey import main_queue
+from portality.tasks.redis_huey import scheduled_short_queue as queue
 
 
 # FIXME: update for index-per-type
@@ -76,7 +76,7 @@ def submit(cls, background_job):
         request_es_backup.schedule(args=(background_job.id,), delay=10)
 
 
-huey_helper = RequestESBackupBackgroundTask.create_huey_helper(main_queue)
+huey_helper = RequestESBackupBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_schedule
diff --git a/portality/tasks/sitemap.py b/portality/tasks/sitemap.py
index c888f55036..177dcea909 100644
--- a/portality/tasks/sitemap.py
+++ b/portality/tasks/sitemap.py
@@ -3,7 +3,7 @@
 from portality.bll.doaj import DOAJ
 from portality.core import app
 from portality.tasks.helpers import background_helper
-from portality.tasks.redis_huey import main_queue
+from portality.tasks.redis_huey import scheduled_long_queue as queue
 
 
 class SitemapBackgroundTask(BackgroundTask):
@@ -64,7 +64,7 @@ def submit(cls, background_job):
         generate_sitemap.schedule(args=(background_job.id,), delay=10)
 
 
-huey_helper = SitemapBackgroundTask.create_huey_helper(main_queue)
+huey_helper = SitemapBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_schedule
diff --git a/portality/tasks/suggestion_bulk_edit.py b/portality/tasks/suggestion_bulk_edit.py
index 78ba37c632..1924091704 100644
--- a/portality/tasks/suggestion_bulk_edit.py
+++ b/portality/tasks/suggestion_bulk_edit.py
@@ -9,7 +9,7 @@
 from portality.forms.application_forms import ApplicationFormFactory
 from portality.lib import dates
 from portality.lib.formulaic import FormulaicException
-from portality.tasks.redis_huey import main_queue
+from portality.tasks.redis_huey import events_queue as queue
 
 
 def suggestion_manage(selection_query, dry_run=True, editor_group='', note='', application_status=''):
@@ -204,7 +204,7 @@ def submit(cls, background_job):
         suggestion_bulk_edit.schedule(args=(background_job.id,), delay=10)
 
 
-huey_helper = SuggestionBulkEditBackgroundTask.create_huey_helper(main_queue)
+huey_helper = SuggestionBulkEditBackgroundTask.create_huey_helper(queue)
 
 
 @huey_helper.register_execute(is_load_config=False)
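Finally, the `submit` pattern repeated across these tasks: the Huey task is handed only the `BackgroundJob` id, with a 10-second delay, which presumably gives the freshly saved job record time to become visible before a consumer loads it by id. Sketched standalone:

```python
# The recurring submit() shape in these modules, as a sketch: the Huey task
# receives only the job id; the consumer re-loads the job from storage when
# it runs.
from portality.models import BackgroundJob
from portality.tasks.suggestion_bulk_edit import suggestion_bulk_edit


def submit(background_job: BackgroundJob):
    background_job.save()
    suggestion_bulk_edit.schedule(args=(background_job.id,), delay=10)
```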