Skip to content

Commit

Permalink
feat: 订阅巡检支持根据业务分发任务到不同队列 (closed TencentBlueKing#2061)
Browse files Browse the repository at this point in the history
# Reviewed, transaction id: 6122
  • Loading branch information
Huayeaaa authored and ZhuoZhuoCrayon committed Apr 17, 2024
1 parent 013a530 commit baccd69
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 5 deletions.
20 changes: 16 additions & 4 deletions apps/backend/periodic_tasks/update_subscription_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@
import logging

from celery.task import periodic_task
from django.db.models import Value

from apps.backend.subscription.constants import SUBSCRIPTION_UPDATE_INTERVAL
from apps.backend.subscription.tasks import update_subscription_instances_chunk
from apps.backend.subscription.tools import (
by_biz_dispatch_task_queue,
get_biz_ids_gby_queue,
)
from apps.node_man import models
from apps.utils.periodic_task import calculate_countdown

Expand All @@ -24,12 +29,19 @@ def update_subscription_instances():
# 关闭订阅自动巡检
return

subscription_ids = list(
models.Subscription.objects.filter(enable=True, is_deleted=False).values_list("id", flat=True)
subscriptions = models.Subscription.objects.filter(enable=Value(1), is_deleted=Value(0)).values(
"id", "bk_biz_id", "bk_biz_scope"
)
subscription_ids = [subscription["id"] for subscription in subscriptions]
subscription_id__biz_ids_map = {
subscription["id"]: subscription["bk_biz_scope"] + [subscription["bk_biz_id"]] for subscription in subscriptions
}
biz_ids_gby_queue = get_biz_ids_gby_queue()

count = len(subscription_ids)
for index, subscription_id in enumerate(subscription_ids):
# 把订阅平均分布到10分钟内执行,用于削峰
countdown = calculate_countdown(count=count, index=index, duration=SUBSCRIPTION_UPDATE_INTERVAL)
logger.info(f"subscription({subscription_id}) will be run after {countdown} seconds.")
update_subscription_instances_chunk.apply_async(([subscription_id],), countdown=countdown)
task_queue = by_biz_dispatch_task_queue(biz_ids_gby_queue, subscription_id__biz_ids_map[subscription_id])
logger.info(f"subscription({subscription_id}) will be run after {countdown} seconds in queue ({task_queue}).")
update_subscription_instances_chunk.apply_async(([subscription_id],), countdown=countdown, queue=task_queue)
18 changes: 18 additions & 0 deletions apps/backend/subscription/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -1512,3 +1512,21 @@ def check_subscription_is_disabled(

logger.info(f"[check_subscription_is_disabled] {subscription_identity}: not in the disable list, skipping")
return False


def get_biz_ids_gby_queue() -> Dict[str, List[int]]:
"""返回任务队列与业务ID列表的映射"""
biz_ids_gby_queue: Dict[str, List[int]] = models.GlobalSettings.get_config(
key=models.GlobalSettings.KeyEnum.SUBSCRIPTION_UPDATE_TASK_QUEUE.value, default={}
)
return biz_ids_gby_queue


def by_biz_dispatch_task_queue(biz_ids_gby_queue: Dict[str, List[int]], bk_biz_ids: List[Union[int, None]]) -> str:
"""通过业务ID列表分配任务队列"""
default_task_queue: str = "backend_additional_task"
for task_queue, partial_biz_ids in biz_ids_gby_queue.items():
if set(partial_biz_ids) & set(bk_biz_ids):
return task_queue

return default_task_queue
2 changes: 2 additions & 0 deletions apps/node_man/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ class KeyEnum(Enum):
ENABLE_NOTICE_CENTER = "ENABLE_NOTICE_CENTER"
# 禁用已停用插件
DISABLE_STOPPED_PLUGIN = "DISABLE_STOPPED_PLUGIN"
# 根据订阅分配任务队列
SUBSCRIPTION_UPDATE_TASK_QUEUE = "SUBSCRIPTION_UPDATE_TASK_QUEUE"

key = models.CharField(_("键"), max_length=255, db_index=True, primary_key=True)
v_json = JSONField(_("值"))
Expand Down
10 changes: 9 additions & 1 deletion apps/node_man/periodic_tasks/resource_watch_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
from django.db.models import Q
from django.db.utils import IntegrityError

from apps.backend.subscription.tools import (
by_biz_dispatch_task_queue,
get_biz_ids_gby_queue,
)
from apps.component.esbclient import client_v2
from apps.node_man import constants
from apps.node_man.models import GlobalSettings, Host, ResourceWatchEvent, Subscription
Expand Down Expand Up @@ -427,11 +431,15 @@ def trigger_nodeman_subscription(bk_biz_id, debounce_time=0):
method="subscription", bk_biz_id=bk_biz_id, debounce_time=debounce_time
).inc()

biz_ids_gby_queue = get_biz_ids_gby_queue()
task_queue: str = by_biz_dispatch_task_queue(biz_ids_gby_queue, [bk_biz_id])

update_subscription_instances_chunk.apply_async(
kwargs={"subscription_ids": subscription_ids}, countdown=debounce_time
kwargs={"subscription_ids": subscription_ids}, countdown=debounce_time, queue=task_queue
)

logger.info(
f"[trigger_nodeman_subscription] following subscriptions "
f"will be run -> ({subscription_ids}) after {debounce_time} s"
f" in queue -> ({task_queue})"
)

0 comments on commit baccd69

Please sign in to comment.