Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/pip/valkey-6.0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
cunla authored Oct 4, 2024
2 parents aeb87ed + 800dea6 commit dbe25b8
Show file tree
Hide file tree
Showing 16 changed files with 228 additions and 247 deletions.
1 change: 0 additions & 1 deletion .github/actions/test-coverage/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,5 @@ runs:
${{ env.REPORT }}
```
repo-token: ${{ inputs.repoToken }}
repo-token-user-login: 'github-actions[bot]'
allow-repeats: true
update-only: true
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__pycache__/
*.py[cod]
*$py.class

tags
*.so

.Python
Expand Down
2 changes: 1 addition & 1 deletion docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
mkdocs==1.6.1
mkdocs-material==9.5.34
mkdocs-material==9.5.39
260 changes: 120 additions & 140 deletions poetry.lock

Large diffs are not rendered by default.

15 changes: 6 additions & 9 deletions scheduler/admin/ephemeral_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@ def has_change_permission(self, request, obj=None):
return True

def has_module_permission(self, request):
"""
return True if the given request has any permission in the given
app label.
Can be overridden by the user in subclasses. In such case it should
return True if the given request has permission to view the module on
the admin index page and access the module's index page. Overriding it
does not restrict access to the add, change or delete views. Use
`ModelAdmin.has_(add|change|delete)_permission` for that.
"""Returns True if the given request has any permission in the given app label.
Can be overridden by the user in subclasses. In such case, it should return True if the given request has
permission to view the module on the admin index page and access the module's index page. Overriding it does
not restrict access to the add, change or delete views. Use `ModelAdmin.has_(add|change|delete)_permission` for
that.
"""
return request.user.has_module_perms("django-tasks-scheduler")

Expand Down
4 changes: 2 additions & 2 deletions scheduler/admin/task_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from scheduler import tools
from scheduler.models import CronTask, TaskArg, TaskKwarg, RepeatableTask, ScheduledTask
from scheduler.settings import SCHEDULER_CONFIG, logger
from scheduler.tools import get_job_executions
from scheduler.tools import get_job_executions_for_task


class HiddenMixin(object):
Expand Down Expand Up @@ -185,7 +185,7 @@ def change_view(self, request, object_id, form_url="", extra_context=None):
extra = extra_context or {}
obj = self.get_object(request, object_id)
try:
execution_list = get_job_executions(obj.queue, obj)
execution_list = get_job_executions_for_task(obj.queue, obj)
except (redis.ConnectionError, valkey.ConnectionError) as e:
logger.warn(f"Could not get job executions: {e}")
execution_list = list()
Expand Down
2 changes: 1 addition & 1 deletion scheduler/management/commands/import.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def create_job_from_dict(job_dict: Dict[str, Any], update):
target = timezone.make_naive(target)
kwargs["scheduled_time"] = target
model_fields = set(map(lambda field: field.attname, model._meta.get_fields()))
keys_to_ignore = list(filter(lambda k: k not in model_fields, kwargs.keys()))
keys_to_ignore = list(filter(lambda _k: _k not in model_fields, kwargs.keys()))
for k in keys_to_ignore:
del kwargs[k]
scheduled_job = model.objects.create(**kwargs)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/management/commands/rqstats.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def _print_stats_dashboard(self, statistics, prev_stats=None):
# Deal with colors
if prev_stats and len(prev_stats["queues"]) > ind:
prev = prev_stats["queues"][ind]
prev_vals = (prev[k] for k in KEYS)
prev_vals = tuple(prev[k] for k in KEYS)
colors = [
ANSI_LIGHT_GREEN if vals[i] != prev_vals[i] else ANSI_LIGHT_WHITE for i in range(len(prev_vals))
]
Expand Down
31 changes: 24 additions & 7 deletions scheduler/management/commands/rqworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,24 @@
3: logging.DEBUG,
}

WORKER_ARGUMENTS = {
"name",
"default_result_ttl",
"connection",
"exc_handler",
"exception_handlers",
"default_worker_ttl",
"maintenance_interval",
"job_class",
"queue_class",
"log_job_description",
"job_monitoring_interval",
"disable_default_exception_handler",
"prepare_for_work",
"serializer",
"work_horse_killed_handler",
}


def reset_db_connections():
for c in connections.all():
Expand Down Expand Up @@ -80,30 +98,29 @@ def add_arguments(self, parser):
parser.add_argument("--sentry-ca-certs", action="store", dest="sentry_ca_certs", help="Path to CA certs file")

def handle(self, **options):
queues = options.get("queues", [])
queues = options.pop("queues", [])
if not queues:
queues = [
"default",
]
click.echo(f"Starting worker for queues {queues}")
pidfile = options.get("pidfile")
pidfile = options.pop("pidfile")
if pidfile:
with open(os.path.expanduser(pidfile), "w") as fp:
fp.write(str(os.getpid()))

# Verbosity is defined by default in BaseCommand for all commands
verbosity = options.get("verbosity", 1)
verbosity = options.pop("verbosity", 1)
log_level = VERBOSITY_TO_LOG_LEVEL.get(verbosity, logging.INFO)
setup_loghandlers(log_level)

init_options = {k: v for k, v in options.items() if k in WORKER_ARGUMENTS}

try:
# Instantiate a worker
w = create_worker(
*queues,
name=options["name"],
job_class=options.get("job_class"),
default_worker_ttl=options["worker_ttl"],
fork_job_execution=options["fork_job_execution"],
**init_options
)

# Close any opened DB connection before any fork
Expand Down
2 changes: 1 addition & 1 deletion scheduler/models/scheduled_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def __str__(self):
def save(self, **kwargs):
schedule_job = kwargs.pop("schedule_job", True)
update_fields = kwargs.get("update_fields", None)
if update_fields:
if update_fields is not None:
kwargs["update_fields"] = set(update_fields).union({"modified"})
super(BaseTask, self).save(**kwargs)
if schedule_job:
Expand Down
14 changes: 7 additions & 7 deletions scheduler/queues.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Dict
from typing import List, Dict, Set

import redis
import valkey
Expand Down Expand Up @@ -87,7 +87,7 @@ def get_connection(queue_settings, use_strict_redis=False):


def get_queue(
name="default", default_timeout=None, is_async=None, autocommit=None, connection=None, **kwargs
name="default", default_timeout=None, is_async=None, autocommit=None, connection=None, **kwargs
) -> DjangoQueue:
"""Returns an DjangoQueue using parameters defined in `SCHEDULER_QUEUES`"""
from .settings import QUEUES
Expand All @@ -107,18 +107,18 @@ def get_queue(
)


def get_all_workers():
def get_all_workers() -> Set[DjangoWorker]:
from .settings import QUEUES

workers = set()
workers_set: Set[DjangoWorker] = set()
for queue_name in QUEUES:
connection = get_connection(QUEUES[queue_name])
try:
curr_workers = set(DjangoWorker.all(connection=connection))
workers.update(curr_workers)
curr_workers: Set[DjangoWorker] = set(DjangoWorker.all(connection=connection))
workers_set.update(curr_workers)
except (redis.ConnectionError, valkey.ConnectionError) as e:
logger.error(f"Could not connect for queue {queue_name}: {e}")
return workers
return workers_set


def _queues_share_connection_params(q1_params: Dict, q2_params: Dict):
Expand Down
Loading

0 comments on commit dbe25b8

Please sign in to comment.