Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/392 cron check task #27

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions django_project/core/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@
'task': 'remove_layers',
'schedule': crontab(minute='0', hour='1'), # Run everyday at 1am
},
'check-scenario-task': {
'task': 'check_scenario_task',
'schedule': crontab(hour='*'), # Run every hour
},
'clean-multipart-upload': {
'task': 'clean_multipart_upload',
'schedule': crontab(minute='0', hour='2'), # Run everyday at 2am
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ class Migration(migrations.Migration):
name='layer_days_to_keep',
field=models.IntegerField(default=14, help_text='Keep input/output layers until X days.'),
),
]
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.7 on 2024-06-03 06:00

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the ``task_runtime_threshold`` field to ``sitepreferences``."""

    dependencies = [
        ('core', '0005_sitepreferences_layer_days_to_keep'),
    ]

    operations = [
        migrations.AddField(
            model_name='sitepreferences',
            name='task_runtime_threshold',
            field=models.PositiveIntegerField(
                default=120,
                # Adjacent literals concatenate to the same help text
                # as the single-line form.
                help_text=(
                    "Max minutes to pass before a task "
                    "is marked 'Stopped with error'"
                ),
            ),
        ),
    ]
1 change: 1 addition & 0 deletions django_project/core/models/base_task_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def task_on_errors(self, exception=None, traceback=None):
self.errors = ex_msg
self.add_log('Task is stopped with errors.', logging.ERROR)
self.add_log(str(exception), logging.ERROR)

self.save(
update_fields=['last_update', 'status', 'errors']
)
Expand Down
7 changes: 7 additions & 0 deletions django_project/core/models/preferences.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ class SitePreferences(SingletonModel):
blank=True,
help_text='Output group to keep from automatic removal.'
)
task_runtime_threshold = models.PositiveIntegerField(
default=120,
help_text=(
"Max minutes to pass before a task "
"is marked 'Stopped with error'"
)
)
layer_days_to_keep = models.IntegerField(
default=14,
help_text='Keep input/output layers until X days.'
Expand Down
2 changes: 1 addition & 1 deletion django_project/cplus_api/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .cleaner import * # noqa
from .check_scenario_task import * # noqa
from .runner import * # noqa
from .remove_layers import * # noqa
71 changes: 71 additions & 0 deletions django_project/cplus_api/tasks/check_scenario_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Task to remove resources."""

import logging

from celery import shared_task
from django.contrib.contenttypes.models import ContentType
from django.utils import timezone

from core.models.base_task_request import TaskStatus
from core.models.preferences import SitePreferences
from core.models.task_log import TaskLog
from cplus_api.models.scenario import ScenarioTask

logger = logging.getLogger(__name__)


def _last_activity_time(scenario, content_type_id):
    """
    Return the timestamp of the scenario's most recent known activity.

    The newest TaskLog entry attached to the scenario is preferred. When
    the scenario has no log at all, fall back to ``started_at`` and then
    to ``submitted_on``. Returns None when none of them is available.
    """
    try:
        last_log = TaskLog.objects.filter(
            object_id=scenario.id,
            content_type=content_type_id
        ).latest('date_time')
    except TaskLog.DoesNotExist:
        # A task without log that has been running for longer than the
        # threshold could indicate issues during execution, so use
        # started_at. If the status is running but there is neither a log
        # nor a started_at value, something must be wrong, so use
        # submitted_on as the last resort.
        return scenario.started_at or scenario.submitted_on
    return last_log.date_time


@shared_task(name="check_scenario_task")
def check_scenario_task():
    """
    Check running Scenario Tasks and mark them as 'Stopped with error'
    if they don't have a new log after a certain threshold.

    A running task without a new log after the threshold could indicate
    it was stopped/interrupted halfway. The threshold is read, in
    minutes, from ``SitePreferences.task_runtime_threshold``.
    """
    elapsed_time_threshold = (
        SitePreferences.preferences().task_runtime_threshold
    )
    threshold_seconds = elapsed_time_threshold * 60
    # Every row of the queryset below is a ScenarioTask, so the content
    # type is loop-invariant and only needs to be resolved once.
    content_type_id = ContentType.objects.get_for_model(ScenarioTask).id

    running_scenarios = ScenarioTask.objects.filter(status=TaskStatus.RUNNING)
    for scenario in running_scenarios:
        # We check whether the last log has passed the threshold,
        # instead of comparing to the scenario's start time.
        last_activity = _last_activity_time(scenario, content_type_id)
        if last_activity is None:
            # Nothing to measure against; leave the task untouched.
            continue

        elapsed_seconds = (timezone.now() - last_activity).total_seconds()

        # If elapsed seconds is more than the threshold in seconds,
        # mark the task as having errors.
        if elapsed_seconds > threshold_seconds:
            err_msg = (
                f"Timeout error: Task execution time ({elapsed_seconds}s) "
                f"exceeded {threshold_seconds}s."
            )
            scenario.task_on_errors(
                exception=err_msg
            )
14 changes: 0 additions & 14 deletions django_project/cplus_api/tasks/cleaner.py

This file was deleted.

2 changes: 1 addition & 1 deletion django_project/cplus_api/tasks/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def create_scenario_task_runner(scenario_task: ScenarioTask):
return analysis_task


@shared_task(name="run_scenario_analysis_task")
@shared_task(name="run_scenario_analysis_task", track_started=True)
def run_scenario_analysis_task(scenario_task_id): # pragma: no cover
scenario_task = ScenarioTask.objects.get(id=scenario_task_id)
scenario_task.task_on_started()
Expand Down
82 changes: 82 additions & 0 deletions django_project/cplus_api/tests/test_check_scenario_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from datetime import timedelta
from unittest import mock

from django.utils import timezone

from core.models.base_task_request import TaskStatus
from cplus_api.tasks.check_scenario_task import check_scenario_task
from cplus_api.tests.common import BaseAPIViewTransactionTest
from cplus_api.tests.factories import (
ScenarioTaskF
)


# NOTE(review): patching timezone in base_task_request presumably backdates
# the timestamps written by the model helpers (e.g. add_log) so that they
# look 150 minutes old to check_scenario_task -- confirm against the model.
@mock.patch('core.models.base_task_request.timezone')
class TestCheckScenarioTask(BaseAPIViewTransactionTest):
    """
    Test checking scenario task if they have run for too long.
    """

    def test_non_running_task_not_checked(self, mock_tz):
        """
        Test that only running tasks are checked.
        In this case, the running task's last log was added 150 minutes ago,
        which is more than the threshold (120 minutes), so it will be
        marked as 'Stopped with errors'
        """
        mock_tz.now.return_value = timezone.now() - timedelta(minutes=150)
        task_running = ScenarioTaskF.create(
            status=TaskStatus.RUNNING
        )
        task_running.add_log('Test Log')
        non_running_tasks = []
        non_running_statuses = [
            TaskStatus.PENDING,
            TaskStatus.QUEUED,
            TaskStatus.COMPLETED,
            TaskStatus.CANCELLED,
            TaskStatus.INVALIDATED
        ]
        for status in non_running_statuses:
            task = ScenarioTaskF.create(
                status=status
            )
            task.add_log('Test Log')
            non_running_tasks.append(task)

        check_scenario_task()

        task_running.refresh_from_db()
        # assertEqual/assertNotEqual replace the deprecated
        # assertEquals/assertNotEquals aliases removed in Python 3.12.
        self.assertEqual(task_running.status, TaskStatus.STOPPED)

        for task in non_running_tasks:
            task.refresh_from_db()
            self.assertNotEqual(task.status, TaskStatus.STOPPED)

    def test_running_task_without_log(self, mock_tz):
        """
        Running task without log will have its started_at checked
        to be compared against threshold
        """
        mock_tz.now.return_value = timezone.now() - timedelta(minutes=150)
        task_running = ScenarioTaskF.create(
            status=TaskStatus.RUNNING,
            started_at=timezone.now() - timedelta(minutes=150)
        )
        check_scenario_task()
        task_running.refresh_from_db()
        self.assertEqual(task_running.status, TaskStatus.STOPPED)

    def test_running_task_without_log_started_at(self, mock_tz):
        """
        Running task without log and started_at will have its
        submitted_on checked to be compared against threshold
        """
        mock_tz.now.return_value = timezone.now() - timedelta(minutes=150)
        task_running = ScenarioTaskF.create(
            status=TaskStatus.RUNNING,
            submitted_on=timezone.now() - timedelta(minutes=150)
        )
        check_scenario_task()
        task_running.refresh_from_db()
        self.assertEqual(task_running.status, TaskStatus.STOPPED)
Loading