Skip to content

Commit

Permalink
feat(workflow_engine): Add way to hook logic into Detector (#78540)
Browse files Browse the repository at this point in the history
This adds hooks for attaching custom logic to a Detector. After
considering the options, splitting this logic into separate handlers
seems cleanest. A handler doesn't have to be a function — we can
implement handler classes so that logic can be shared between detectors.

<!-- Describe your PR here. -->
  • Loading branch information
wedamija authored Oct 16, 2024
1 parent ca4f5d1 commit 939be86
Show file tree
Hide file tree
Showing 8 changed files with 319 additions and 9 deletions.
32 changes: 32 additions & 0 deletions src/sentry/incidents/grouptype.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from dataclasses import dataclass

from sentry.incidents.utils.types import QuerySubscriptionUpdate
from sentry.issues.grouptype import GroupCategory, GroupType
from sentry.ratelimits.sliding_windows import Quota
from sentry.types.group import PriorityLevel
from sentry.workflow_engine.models import DataPacket
from sentry.workflow_engine.models.detector import DetectorEvaluationResult, DetectorHandler


# TODO: This will be a stateful detector when we build that abstraction
class MetricAlertDetectorHandler(DetectorHandler[QuerySubscriptionUpdate]):
    """Handler for metric-alert detectors fed by query subscription updates.

    Currently a stub: evaluation logic is not implemented yet, so every
    data packet evaluates to no results.
    """

    def evaluate(
        self, data_packet: DataPacket[QuerySubscriptionUpdate]
    ) -> list[DetectorEvaluationResult]:
        # TODO: Implement real evaluation of the subscription update.
        results: list[DetectorEvaluationResult] = []
        return results


# Example GroupType and detector handler for metric alerts. We don't create these issues yet, but we'll use something
# like these when we're sending issues as alerts
@dataclass(frozen=True)
class MetricAlertFire(GroupType):
    """Example group type for issues created when a metric alert fires.

    These issues are not created yet; this exists to exercise the
    detector-handler plumbing.
    """

    # NOTE(review): assignments are deliberately unannotated — adding type
    # annotations would redeclare them as dataclass fields instead of simply
    # overriding the inherited GroupType defaults.
    type_id = 8001
    slug = "metric_alert_fire"
    description = "Metric alert fired"
    category = GroupCategory.METRIC_ALERT.value
    # At most 100 per project per hour (3600s window, 60s sliding granularity) —
    # see the default Quota(3600, 60, 5) on GroupType.
    creation_quota = Quota(3600, 60, 100)
    default_priority = PriorityLevel.HIGH
    enable_auto_resolve = False
    enable_escalation_detection = False
    # Hook used by Detector.detector_handler to instantiate evaluation logic.
    detector_handler = MetricAlertDetectorHandler
4 changes: 4 additions & 0 deletions src/sentry/issues/grouptype.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from sentry.models.organization import Organization
from sentry.models.project import Project
from sentry.users.models.user import User
from sentry.workflow_engine.models.detector import DetectorHandler
import logging

logger = logging.getLogger(__name__)
Expand All @@ -35,6 +36,7 @@ class GroupCategory(Enum):
REPLAY = 5
FEEDBACK = 6
UPTIME = 7
METRIC_ALERT = 8


GROUP_CATEGORIES_CUSTOM_EMAIL = (
Expand Down Expand Up @@ -152,8 +154,10 @@ class GroupType:
enable_auto_resolve: bool = True
# Allow escalation forecasts and detection
enable_escalation_detection: bool = True
    # Quota around how many of these issue types can be created per project in a given time window
creation_quota: Quota = Quota(3600, 60, 5) # default 5 per hour, sliding window of 60 seconds
notification_config: NotificationConfig = NotificationConfig()
detector_handler: type[DetectorHandler] | None = None

def __init_subclass__(cls: type[GroupType], **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
Expand Down
6 changes: 5 additions & 1 deletion src/sentry/testutils/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -2139,7 +2139,11 @@ def create_detector(
if name is None:
name = petname.generate(2, " ", letters=10).title()
return Detector.objects.create(
organization=organization, name=name, owner_user_id=owner_user_id, owner_team=owner_team
organization=organization,
name=name,
owner_user_id=owner_user_id,
owner_team=owner_team,
**kwargs,
)

@staticmethod
Expand Down
11 changes: 8 additions & 3 deletions src/sentry/workflow_engine/models/data_source.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Protocol
import dataclasses
from typing import Generic, TypeVar

from django.db import models

Expand All @@ -11,9 +12,13 @@
)
from sentry.workflow_engine.models.data_source_detector import DataSourceDetector

T = TypeVar("T")

class DataPacket(Protocol):
query_id: int

@dataclasses.dataclass
class DataPacket(Generic[T]):
    """A unit of data handed to detectors, tagged with its originating query."""

    # Identifier of the query this packet came from — presumably a
    # subscription/query id; confirm against the producers.
    query_id: str
    # The typed payload; the concrete type depends on the data source.
    packet: T


@region_silo_model
Expand Down
80 changes: 78 additions & 2 deletions src/sentry/workflow_engine/models/detector.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
from __future__ import annotations

import abc
import dataclasses
import logging
from typing import TYPE_CHECKING, Any, Generic, TypeVar

from django.db import models
from django.db.models import UniqueConstraint

from sentry.backup.scopes import RelocationScope
from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model
from sentry.issues import grouptype
from sentry.models.owner_base import OwnerModel
from sentry.workflow_engine.models.data_source_detector import DataSourceDetector
from sentry.types.group import PriorityLevel
from sentry.workflow_engine.models import DataPacket

if TYPE_CHECKING:
from sentry.workflow_engine.models.detector_state import DetectorStatus

logger = logging.getLogger(__name__)


@region_silo_model
Expand All @@ -15,7 +29,9 @@ class Detector(DefaultFieldsModel, OwnerModel):
name = models.CharField(max_length=200)

# The data sources that the detector is watching
data_sources = models.ManyToManyField("workflow_engine.DataSource", through=DataSourceDetector)
data_sources = models.ManyToManyField(
"workflow_engine.DataSource", through="workflow_engine.DataSourceDetector"
)

# The conditions that must be met for the detector to be considered 'active'
# This will emit an event for the workflow to process
Expand All @@ -35,3 +51,63 @@ class Meta(OwnerModel.Meta):
name="workflow_engine_detector_org_name",
)
]

@property
def detector_handler(self) -> DetectorHandler | None:
group_type = grouptype.registry.get_by_slug(self.type)
if not group_type:
logger.error(
"No registered grouptype for detector",
extra={
"group_type": str(group_type),
"detector_id": self.id,
"detector_type": self.type,
},
)
return None

if not group_type.detector_handler:
logger.error(
"Registered grouptype for detector has no detector_handler",
extra={
"group_type": str(group_type),
"detector_id": self.id,
"detector_type": self.type,
},
)
return None
return group_type.detector_handler(self)


@dataclasses.dataclass(frozen=True)
class DetectorStateData:
    """Immutable snapshot of per-group state produced by a detector evaluation."""

    # Group-by key linking this state to a specific group within the detector;
    # None when no group-by key applies.
    group_key: str | None
    # Whether the detector is considered active for this group.
    active: bool
    status: DetectorStatus
    # Stateful detectors always process data packets in order. Once we confirm that a data packet has been fully
    # processed and all workflows have been done, this value will be used by the stateful detector to prevent
    # reprocessing
    dedupe_value: int
    # Stateful detectors allow various counts to be tracked. We need to update these after we process workflows, so
    # include the updates in the state
    counter_updates: dict[str, int]


@dataclasses.dataclass(frozen=True)
class DetectorEvaluationResult:
    """Outcome of evaluating one data packet against a detector."""

    # Whether the detector's conditions are met for this result.
    is_active: bool
    priority: PriorityLevel
    # Handler-specific payload; shape depends on the detector type.
    data: Any
    # State to persist after workflows run; None when there is nothing to update.
    state_update_data: DetectorStateData | None = None


T = TypeVar("T")


class DetectorHandler(abc.ABC, Generic[T]):
    """Base class for detector-specific evaluation logic.

    A handler is constructed with the ``Detector`` row it serves; subclasses
    implement :meth:`evaluate` to turn incoming data packets into results.
    """

    def __init__(self, detector: Detector):
        # Keep a reference to the owning detector so subclasses can read
        # its configuration during evaluation.
        self.detector = detector

    @abc.abstractmethod
    def evaluate(self, data_packet: DataPacket[T]) -> list[DetectorEvaluationResult]:
        """Evaluate ``data_packet`` and return any results it produces."""
4 changes: 1 addition & 3 deletions src/sentry/workflow_engine/models/detector_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from sentry.backup.scopes import RelocationScope
from sentry.db.models import DefaultFieldsModel, FlexibleForeignKey, region_silo_model

from .detector import Detector


class DetectorStatus(StrEnum):
OK = "ok"
Expand All @@ -16,7 +14,7 @@ class DetectorStatus(StrEnum):
class DetectorState(DefaultFieldsModel):
__relocation_scope__ = RelocationScope.Organization

detector = FlexibleForeignKey(Detector)
detector = FlexibleForeignKey("workflow_engine.Detector")

# This key is used when a detector is using group-by
# allows us to link to a specific group from a single detector
Expand Down
36 changes: 36 additions & 0 deletions src/sentry/workflow_engine/processors/detector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import logging

from sentry.workflow_engine.models import Detector
from sentry.workflow_engine.models.data_source import DataPacket
from sentry.workflow_engine.models.detector import DetectorEvaluationResult

logger = logging.getLogger(__name__)


def process_detectors(
    data_packet: DataPacket, detectors: list[Detector]
) -> list[tuple[Detector, list[DetectorEvaluationResult]]]:
    """Run ``data_packet`` through each detector's handler.

    Detectors without a registered handler, and detectors whose evaluation
    yields no results, are omitted from the returned list.
    """
    evaluated: list[tuple[Detector, list[DetectorEvaluationResult]]] = []

    for detector in detectors:
        handler = detector.detector_handler
        if not handler:
            # No handler registered for this detector's type; skip it.
            continue

        detector_results = handler.evaluate(data_packet)

        # Sanity-check that no two results from one detector share a group key.
        seen_group_keys: set[str | None] = set()
        for result in detector_results:
            state_data = result.state_update_data
            if not state_data:
                continue
            if state_data.group_key in seen_group_keys:
                # This shouldn't happen - log an error and continue on, but we should investigate this.
                logger.error(
                    "Duplicate detector state group keys found",
                    extra={
                        "detector_id": detector.id,
                        "group_key": state_data.group_key,
                    },
                )
            seen_group_keys.add(state_data.group_key)

        if detector_results:
            evaluated.append((detector, detector_results))

    return evaluated
Loading

0 comments on commit 939be86

Please sign in to comment.