Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closes #16971: Add system jobs #17716

Open
wants to merge 13 commits into
base: feature
Choose a base branch
from
53 changes: 41 additions & 12 deletions docs/plugins/development/background-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class MyTestJob(JobRunner):

You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes through all arguments to `Job.enqueue()`. However, no `name` argument must be passed, as the background job name will be used instead.

!!! tip
A set of predefined intervals is available at `core.choices.JobIntervalChoices` for convenience.

### Attributes

`JobRunner` attributes are defined under a class named `Meta` within the job. These are optional, but encouraged.
Expand All @@ -46,27 +49,53 @@ As described above, jobs can be scheduled for immediate execution or at any late

#### Example

```python title="jobs.py"
from netbox.jobs import JobRunner
```python title="models.py"
from django.db import models
from core.choices import JobIntervalChoices
from netbox.models import NetBoxModel
from .jobs import MyTestJob

class MyModel(NetBoxModel):
foo = models.CharField()

def save(self, *args, **kwargs):
MyTestJob.enqueue_once(instance=self, interval=JobIntervalChoices.INTERVAL_HOURLY)
return super().save(*args, **kwargs)

def sync(self):
MyTestJob.enqueue(instance=self)
```


### System Jobs

Some plugins may implement background jobs that are decoupled from the request/response cycle. Typical use cases would be housekeeping tasks or synchronization jobs. These can be registered as _system jobs_ using the `system_job()` decorator. The job interval must be passed as an integer (in minutes) when registering a system job. System jobs are scheduled automatically when the RQ worker (`manage.py rqworker`) is run.

#### Example

```python title="jobs.py"
from core.choices import JobIntervalChoices
from netbox.jobs import JobRunner, system_job
from .models import MyModel

# Specify a predefined choice or an integer indicating
# the number of minutes between job executions
@system_job(interval=JobIntervalChoices.INTERVAL_HOURLY)
class MyHousekeepingJob(JobRunner):
alehaa marked this conversation as resolved.
Show resolved Hide resolved
class Meta:
name = "Housekeeping"
name = "My Housekeeping Job"

def run(self, *args, **kwargs):
# your logic goes here
```

```python title="__init__.py"
from netbox.plugins import PluginConfig
MyModel.objects.filter(foo='bar').delete()

class MyPluginConfig(PluginConfig):
def ready(self):
from .jobs import MyHousekeepingJob
MyHousekeepingJob.setup(interval=60)
system_jobs = (
MyHousekeepingJob,
)
```

!!! note
Ensure that any system jobs are imported on initialization. Otherwise, they won't be registered. This can be achieved by extending the PluginConfig's `ready()` method.

## Task queues

Three task queues of differing priority are defined by default:
Expand Down
2 changes: 1 addition & 1 deletion docs/plugins/development/data-backends.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ backends = [MyDataBackend]
```

!!! tip
The path to the list of search indexes can be modified by setting `data_backends` in the PluginConfig instance.
The path to the list of data backends can be modified by setting `data_backends` in the PluginConfig instance.

::: netbox.data_backends.DataBackend
14 changes: 14 additions & 0 deletions netbox/core/choices.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,20 @@ class JobStatusChoices(ChoiceSet):
)


class JobIntervalChoices(ChoiceSet):
INTERVAL_MINUTELY = 1
INTERVAL_HOURLY = 60
INTERVAL_DAILY = 60 * 24
INTERVAL_WEEKLY = 60 * 24 * 7

CHOICES = (
(INTERVAL_MINUTELY, _('Minutely')),
(INTERVAL_HOURLY, _('Hourly')),
(INTERVAL_DAILY, _('Daily')),
(INTERVAL_WEEKLY, _('Weekly')),
)


#
# ObjectChanges
#
Expand Down
11 changes: 11 additions & 0 deletions netbox/core/management/commands/rqworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from django_rq.management.commands.rqworker import Command as _Command

from netbox.registry import registry


DEFAULT_QUEUES = ('high', 'default', 'low')

Expand All @@ -14,6 +16,15 @@ class Command(_Command):
of only the 'default' queue).
"""
def handle(self, *args, **options):
# Setup system jobs.
for job, kwargs in registry['system_jobs'].items():
try:
interval = kwargs['interval']
except KeyError:
raise TypeError("System job must specify an interval (in minutes).")
logger.debug(f"Scheduling system job {job.name} (interval={interval})")
job.enqueue_once(**kwargs)
Comment on lines +25 to +26
Copy link
Contributor Author

@alehaa alehaa Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From 6b192c9 there is a missing if interval:.

The reason for this if-statement is to selectively ignore job registration, e.g. depending on configuration parameters. An example use case would be not to register the changelog cleanup job if the CHANGELOG_RETENTION setting is zero (disabled). If there are other ways to accomplish this (maybe by another decorator before system_jon()?) I'm open to that solution as well.


# Run the worker with scheduler functionality
options['with_scheduler'] = True

Expand Down
21 changes: 20 additions & 1 deletion netbox/netbox/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,38 @@
from abc import ABC, abstractmethod
from datetime import timedelta

from django.core.exceptions import ImproperlyConfigured
from django.utils.functional import classproperty
from django_pglocks import advisory_lock
from rq.timeouts import JobTimeoutException

from core.choices import JobStatusChoices
from core.models import Job, ObjectType
from netbox.constants import ADVISORY_LOCK_KEYS
from netbox.registry import registry

__all__ = (
'JobRunner',
'system_job',
)


def system_job(interval):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adding *args, **kwargs as arguments, storing them in the registry and passing them to enqueue_once(). This would allow plugins to reuse jobs for different actions. Practical example: Changelog and journal cleanup could use the same job and pass model/configuration parameter names as arguments.

"""
Decorator for registering a `JobRunner` class as system background job.
"""
if type(interval) is not int:
raise ImproperlyConfigured("System job interval must be an integer (minutes).")

def _wrapper(cls):
registry['system_jobs'][cls] = {
'interval': interval
}
return cls

return _wrapper


class JobRunner(ABC):
"""
Background Job helper class.
Expand Down Expand Up @@ -129,7 +148,7 @@ class scheduled for `instance`, the existing job will be updated if necessary. T
if job:
# If the job parameters haven't changed, don't schedule a new job and keep the current schedule. Otherwise,
# delete the existing job and schedule a new job instead.
if (schedule_at and job.scheduled == schedule_at) and (job.interval == interval):
if (not schedule_at or job.scheduled == schedule_at) and (job.interval == interval):
return job
job.delete()

Expand Down
1 change: 1 addition & 0 deletions netbox/netbox/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __delitem__(self, key):
'models': collections.defaultdict(set),
'plugins': dict(),
'search': dict(),
'system_jobs': dict(),
'tables': collections.defaultdict(dict),
'views': collections.defaultdict(dict),
'widgets': dict(),
Expand Down
5 changes: 5 additions & 0 deletions netbox/netbox/tests/dummy_plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,10 @@ class DummyPluginConfig(PluginConfig):
'netbox.tests.dummy_plugin.events.process_events_queue'
]

def ready(self):
super().ready()

from . import jobs # noqa: F401


config = DummyPluginConfig
9 changes: 9 additions & 0 deletions netbox/netbox/tests/dummy_plugin/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from core.choices import JobIntervalChoices
from netbox.jobs import JobRunner, system_job


@system_job(interval=JobIntervalChoices.INTERVAL_HOURLY)
class DummySystemJob(JobRunner):

def run(self, *args, **kwargs):
pass
36 changes: 36 additions & 0 deletions netbox/netbox/tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ def test_enqueue_once_twice_same(self):
self.assertEqual(job1, job2)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)

def test_enqueue_once_twice_same_no_schedule_at(self):
instance = DataSource()
schedule_at = self.get_schedule_at()
job1 = TestJobRunner.enqueue_once(instance, schedule_at=schedule_at)
job2 = TestJobRunner.enqueue_once(instance)

self.assertEqual(job1, job2)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)

def test_enqueue_once_twice_different_schedule_at(self):
instance = DataSource()
job1 = TestJobRunner.enqueue_once(instance, schedule_at=self.get_schedule_at())
Expand Down Expand Up @@ -127,3 +136,30 @@ def test_enqueue_once_after_enqueue(self):
self.assertNotEqual(job1, job2)
self.assertRaises(Job.DoesNotExist, job1.refresh_from_db)
self.assertEqual(TestJobRunner.get_jobs(instance).count(), 1)


class SystemJobTest(JobRunnerTestCase):
"""
Test that system jobs can be scheduled.

General functionality already tested by `JobRunnerTest` and `EnqueueTest`.
"""

def test_scheduling(self):
# Can job be enqueued?
job = TestJobRunner.enqueue(schedule_at=self.get_schedule_at())
self.assertIsInstance(job, Job)
self.assertEqual(TestJobRunner.get_jobs().count(), 1)

# Can job be deleted again?
job.delete()
self.assertRaises(Job.DoesNotExist, job.refresh_from_db)
self.assertEqual(TestJobRunner.get_jobs().count(), 0)

def test_enqueue_once(self):
schedule_at = self.get_schedule_at()
job1 = TestJobRunner.enqueue_once(schedule_at=schedule_at)
job2 = TestJobRunner.enqueue_once(schedule_at=schedule_at)

self.assertEqual(job1, job2)
self.assertEqual(TestJobRunner.get_jobs().count(), 1)
9 changes: 9 additions & 0 deletions netbox/netbox/tests/test_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from django.test import Client, TestCase, override_settings
from django.urls import reverse

from core.choices import JobIntervalChoices
from netbox.tests.dummy_plugin import config as dummy_config
from netbox.tests.dummy_plugin.data_backends import DummyBackend
from netbox.tests.dummy_plugin.jobs import DummySystemJob
from netbox.plugins.navigation import PluginMenu
from netbox.plugins.utils import get_plugin_config
from netbox.graphql.schema import Query
Expand Down Expand Up @@ -130,6 +132,13 @@ def test_data_backends(self):
self.assertIn('dummy', registry['data_backends'])
self.assertIs(registry['data_backends']['dummy'], DummyBackend)

def test_system_jobs(self):
"""
Check registered system jobs.
"""
self.assertIn(DummySystemJob, registry['system_jobs'])
self.assertEqual(registry['system_jobs'][DummySystemJob]['interval'], JobIntervalChoices.INTERVAL_HOURLY)

def test_queues(self):
"""
Check that plugin queues are registered with the accurate name.
Expand Down