diff --git a/django_project/core/celery.py b/django_project/core/celery.py index 1fc56d7..ca0ae73 100644 --- a/django_project/core/celery.py +++ b/django_project/core/celery.py @@ -52,6 +52,10 @@ 'task': 'remove_layers', 'schedule': crontab(minute='0', hour='1'), # Run everyday at 1am }, + 'check-scenario-task': { + 'task': 'check_scenario_task', + 'schedule': crontab(hour='*'), # Run every hour + }, 'clean-multipart-upload': { 'task': 'clean_multipart_upload', 'schedule': crontab(minute='0', hour='2'), # Run everyday at 2am diff --git a/django_project/core/migrations/0005_sitepreferences_layer_days_to_keep.py b/django_project/core/migrations/0005_sitepreferences_layer_days_to_keep.py index 5170f8f..c5aeec1 100644 --- a/django_project/core/migrations/0005_sitepreferences_layer_days_to_keep.py +++ b/django_project/core/migrations/0005_sitepreferences_layer_days_to_keep.py @@ -15,4 +15,4 @@ class Migration(migrations.Migration): name='layer_days_to_keep', field=models.IntegerField(default=14, help_text='Keep input/output layers until X days.'), ), - ] + ] \ No newline at end of file diff --git a/django_project/core/migrations/0006_sitepreferences_task_runtime_threshold.py b/django_project/core/migrations/0006_sitepreferences_task_runtime_threshold.py new file mode 100644 index 0000000..3c259e8 --- /dev/null +++ b/django_project/core/migrations/0006_sitepreferences_task_runtime_threshold.py @@ -0,0 +1,18 @@ +# Generated by Django 4.2.7 on 2024-06-03 06:00 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0005_sitepreferences_layer_days_to_keep'), + ] + + operations = [ + migrations.AddField( + model_name='sitepreferences', + name='task_runtime_threshold', + field=models.PositiveIntegerField(default=120, help_text="Max minutes to pass before a task is marked 'Stopped with error'"), + ), + ] diff --git a/django_project/core/models/base_task_request.py b/django_project/core/models/base_task_request.py index 8b3a713..1c14c32 100644 --- a/django_project/core/models/base_task_request.py +++ b/django_project/core/models/base_task_request.py @@ -215,6 +215,7 @@ def task_on_errors(self, exception=None, traceback=None): self.errors = ex_msg self.add_log('Task is stopped with errors.', logging.ERROR) self.add_log(str(exception), logging.ERROR) + self.save( update_fields=['last_update', 'status', 'errors'] ) diff --git a/django_project/core/models/preferences.py b/django_project/core/models/preferences.py index f445997..1b3bb83 100644 --- a/django_project/core/models/preferences.py +++ b/django_project/core/models/preferences.py @@ -47,6 +47,13 @@ class SitePreferences(SingletonModel): blank=True, help_text='Output group to keep from automatic removal.' ) + task_runtime_threshold = models.PositiveIntegerField( + default=120, + help_text=( + "Max minutes to pass before a task " + "is marked 'Stopped with error'" + ) + ) layer_days_to_keep = models.IntegerField( default=14, help_text='Keep input/output layers until X days.' diff --git a/django_project/cplus_api/tasks/__init__.py b/django_project/cplus_api/tasks/__init__.py index 1aa6d21..f821caa 100644 --- a/django_project/cplus_api/tasks/__init__.py +++ b/django_project/cplus_api/tasks/__init__.py @@ -1,3 +1,3 @@ -from .cleaner import * # noqa +from .check_scenario_task import * # noqa from .runner import * # noqa from .remove_layers import * # noqa diff --git a/django_project/cplus_api/tasks/check_scenario_task.py b/django_project/cplus_api/tasks/check_scenario_task.py new file mode 100644 index 0000000..4ae3ad8 --- /dev/null +++ b/django_project/cplus_api/tasks/check_scenario_task.py @@ -0,0 +1,71 @@ +"""Task to remove resources.""" + +import logging + +from celery import shared_task +from django.contrib.contenttypes.models import ContentType +from django.utils import timezone + +from core.models.base_task_request import TaskStatus +from core.models.preferences import SitePreferences +from core.models.task_log import TaskLog +from cplus_api.models.scenario import ScenarioTask + +logger = logging.getLogger(__name__) + + +@shared_task(name="check_scenario_task") +def check_scenario_task(): + """ + Check running Scenario Task and mark them as 'Stopped with error' + if they don't have new log after certain threshold. + + Running task without new log after certain threshold could indicate + they were stopped/interrupted halfway. + """ + elapsed_time_threshold = ( + SitePreferences.preferences().task_runtime_threshold + ) + + running_scenarios = ScenarioTask.objects.filter(status=TaskStatus.RUNNING) + for scenario in running_scenarios: + # we check whether the last logs have passed the threshold, + # instead of comparing to scenario's start time. + try: + last_log = TaskLog.objects.filter( + object_id=scenario.id, + content_type=ContentType.objects.get_for_model(scenario).id + ).latest('date_time') + elapsed_seconds = ( + timezone.now() - last_log.date_time + ).total_seconds() + except TaskLog.DoesNotExist: + # check started_at if the task does not have log + # Task without log, that has been running for more + # than task_runtime_threshold, could indicate + # issues during execution. + if scenario.started_at: + elapsed_seconds = ( + timezone.now() - scenario.started_at + ).total_seconds() + # check submitted_on if the task does not have start_at value + # If a task status is running, but it does not have any log + # and start_at value, there must something wrong and the + # task needs to be marked as error + elif scenario.submitted_on: + elapsed_seconds = ( + timezone.now() - scenario.submitted_on + ).total_seconds() + else: + continue + + # if elapsed seconds is more than threshold in seconds, + # mark as having errors. + if elapsed_seconds > (elapsed_time_threshold * 60): + err_msg = ( + f"Timeout error: Task execution time ({elapsed_seconds}s) " + f"exceeded {elapsed_time_threshold * 60}s." + ) + scenario.task_on_errors( + exception=err_msg + ) diff --git a/django_project/cplus_api/tasks/cleaner.py b/django_project/cplus_api/tasks/cleaner.py deleted file mode 100644 index b9bb1ee..0000000 --- a/django_project/cplus_api/tasks/cleaner.py +++ /dev/null @@ -1,14 +0,0 @@ -"""Task to remove resources.""" - -from celery import shared_task -import logging - - -logger = logging.getLogger(__name__) -# remove tasks with two months old -REMOVE_AFTER_DAYS = 60 - - -@shared_task(name="check_celery_background_tasks") -def check_celery_background_tasks(): - logger.info('Triggered check_celery_background_tasks') diff --git a/django_project/cplus_api/tasks/runner.py b/django_project/cplus_api/tasks/runner.py index d0e4bf6..b1d20fd 100644 --- a/django_project/cplus_api/tasks/runner.py +++ b/django_project/cplus_api/tasks/runner.py @@ -26,7 +26,7 @@ def create_scenario_task_runner(scenario_task: ScenarioTask): return analysis_task -@shared_task(name="run_scenario_analysis_task") +@shared_task(name="run_scenario_analysis_task", track_started=True) def run_scenario_analysis_task(scenario_task_id): # pragma: no cover scenario_task = ScenarioTask.objects.get(id=scenario_task_id) scenario_task.task_on_started() diff --git a/django_project/cplus_api/tests/test_check_scenario_task.py b/django_project/cplus_api/tests/test_check_scenario_task.py new file mode 100644 index 0000000..60afe63 --- /dev/null +++ b/django_project/cplus_api/tests/test_check_scenario_task.py @@ -0,0 +1,82 @@ +from datetime import timedelta +from unittest import mock + +from django.utils import timezone + +from core.models.base_task_request import TaskStatus +from cplus_api.tasks.check_scenario_task import check_scenario_task +from cplus_api.tests.common import BaseAPIViewTransactionTest +from cplus_api.tests.factories import ( + ScenarioTaskF +) + + +@mock.patch('core.models.base_task_request.timezone') +class TestCheckScenarioTask(BaseAPIViewTransactionTest): + """ + Test checking scenario task if they have run for too long. + """ + + def test_non_running_task_not_checked(self, mock_tz): + """ + Test that only running tasks are checked. + In this case, the running task's last log was added 150 minutes ago, + which is more than the threshold (120 minutes), so it will be + marked as 'Stopped with errors' + """ + mock_tz.now.return_value = timezone.now() - timedelta(minutes=150) + task_running = ScenarioTaskF.create( + status=TaskStatus.RUNNING + ) + task_running.add_log('Test Log') + non_running_tasks = [] + non_running_statuses = [ + TaskStatus.PENDING, + TaskStatus.QUEUED, + TaskStatus.COMPLETED, + TaskStatus.CANCELLED, + TaskStatus.INVALIDATED + ] + for status in non_running_statuses: + task = ScenarioTaskF.create( + status=status + ) + task.add_log('Test Log') + non_running_tasks.append(task) + + check_scenario_task() + + task_running.refresh_from_db() + self.assertEquals(task_running.status, TaskStatus.STOPPED) + + for task in non_running_tasks: + task.refresh_from_db() + self.assertNotEquals(task.status, TaskStatus.STOPPED) + + def test_running_task_without_log(self, mock_tz): + """ + Running task without log will have its started_at checked + to be compared against threshold + """ + mock_tz.now.return_value = timezone.now() - timedelta(minutes=150) + task_running = ScenarioTaskF.create( + status=TaskStatus.RUNNING, + started_at=timezone.now() - timedelta(minutes=150) + ) + check_scenario_task() + task_running.refresh_from_db() + self.assertEquals(task_running.status, TaskStatus.STOPPED) + + def test_running_task_without_log_started_at(self, mock_tz): + """ + Running task without log and started_at will have its + submitted_on checked to be compared against threshold + """ + mock_tz.now.return_value = timezone.now() - timedelta(minutes=150) + task_running = ScenarioTaskF.create( + status=TaskStatus.RUNNING, + submitted_on=timezone.now() - timedelta(minutes=150) + ) + check_scenario_task() + task_running.refresh_from_db() + self.assertEquals(task_running.status, TaskStatus.STOPPED)