Skip to content

Commit

Permalink
Support for aligning Slack message grouping intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Szefler committed Jun 25, 2024
1 parent 50f7f84 commit d0856f1
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 15 deletions.
58 changes: 51 additions & 7 deletions src/robusta/core/sinks/sink_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import time
from abc import abstractmethod, ABC
from collections import defaultdict
from datetime import datetime
from typing import Any, List, Dict, Tuple, DefaultDict, Optional

from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, root_validator

from robusta.core.model.k8s_operation_type import K8sOperationType
from robusta.core.reporting.base import Finding
Expand Down Expand Up @@ -33,20 +34,63 @@ def register_notification(self, interval: int, threshold: int) -> bool:

class NotificationSummary(BaseModel):
message_id: Optional[str] = None # identifier of the summary message
start_ts: float = Field(default_factory=lambda: time.time()) # Timestamp of the first notification
start_ts: float = None
end_ts: float = None
# Keys for the table are determined by grouping.notification_mode.summary.by
summary_table: DefaultDict[KeyT, List[int]] = None

def register_notification(self, summary_key: KeyT, resolved: bool, interval: int):
now_ts = time.time()
def register_notification(self, summary_key: KeyT, resolved: bool, interval: int, aligned: bool):
now_dt = datetime.now()
now_ts = int(now_dt.timestamp())
idx = 1 if resolved else 0
if now_ts - self.start_ts > interval or not self.summary_table:
# Expired or the first summary ever for this group_key, reset the data
if not self.end_ts or now_ts > self.end_ts:
# Group expired or the first summary ever for this group_key, reset the data
self.summary_table = defaultdict(lambda: [0, 0])
self.start_ts = now_ts
self.start_ts, self.end_ts = self.calculate_interval_boundaries(interval, aligned, now_dt)
self.message_id = None
self.summary_table[summary_key][idx] += 1

@classmethod
def calculate_interval_boundaries(cls, interval: int, aligned: bool, now_dt: datetime) -> Tuple[float, float]:
now_ts = int(now_dt.timestamp())
if aligned:
# This handles leap seconds by adjusting the length of the last interval in the
# day to the actual end of day. Note leap seconds are expected to almost always be +1,
# but it's also expected that some -1's will appear in the (far) future, and it's
# not out of the realm of possibility that somewhat larger adjustments will happen
# before the leap second adjustment is phased out around 2035.

start_of_this_day_ts, end_of_this_day_ts = cls.get_day_boundaries(now_dt)
start_ts = now_ts - (now_ts - start_of_this_day_ts) % interval
end_ts = start_ts + interval
if (
end_ts > end_of_this_day_ts # negative leap seconds
or end_of_this_day_ts - end_ts < interval # positive leap seconds
):
end_ts = end_of_this_day_ts
else:
start_ts = now_ts
end_ts = now_ts + interval
return start_ts, end_ts

@staticmethod
def get_day_boundaries(now_dt: datetime) -> Tuple[int, int]:
# Note: we assume day boundaries according to the timezone configured on the pod
# running Robusta runner. A caveat of this is that Slack will show times according
# to the client's timezone, which may differ.

start_of_this_day = now_dt.replace(hour=0, minute=0, second=0, microsecond=0)
start_of_this_day_ts = int(start_of_this_day.timestamp())
try:
end_of_this_day = start_of_this_day.replace(day=start_of_this_day.day + 1)
except ValueError: # end of month
try:
end_of_this_day = start_of_this_day.replace(month=start_of_this_day.month + 1, day=1)
except ValueError: # end of year
end_of_this_day = start_of_this_day.replace(year=start_of_this_day.year + 1, month=1, day=1)
end_of_this_day_ts = int(end_of_this_day.timestamp())
return start_of_this_day_ts, end_of_this_day_ts


class SinkBase(ABC):
grouping_enabled: bool
Expand Down
15 changes: 12 additions & 3 deletions src/robusta/core/sinks/sink_base_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,21 @@ def validate_exactly_one_defined(cls, values: Dict):
class GroupingParams(BaseModel):
group_by: GroupingAttributeSelectorListT = ["cluster"]
interval: int = 15*60 # in seconds
aligned: bool = False
notification_mode: Optional[NotificationModeParams]

@root_validator
def validate_notification_mode(cls, values: Dict):
if values is None:
return {"summary": SummaryNotificationModeParams()}
def validate_interval_alignment(cls, values: Dict):
if values["aligned"]:
if values["interval"] < 24 * 3600:
if (24 * 3600) % values["interval"]:
raise ValueError(f'Unable to properly align time interval of {values["interval"]} seconds')
else:
# TODO do we also want to support automatically aligning intervals longer than
# a day? Using month/year boundaries? This would require additionally handling
# leap years and daytime saving, just as we handle leap seconds in
# NotificationSummary.register_notification
raise ValueError(f"Automatically aligning time intervals longer than 24 hours is not supported")
return values


Expand Down
8 changes: 3 additions & 5 deletions src/robusta/core/sinks/slack/slack_sink.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from robusta.core.model.env_vars import ROBUSTA_UI_DOMAIN
from robusta.core.reporting.base import Finding, FindingStatus
from robusta.core.sinks.sink_base import NotificationGroup, NotificationSummary, SinkBase
from robusta.core.reporting.base import Finding
from robusta.core.sinks.sink_base import SinkBase
from robusta.core.sinks.slack.slack_sink_params import SlackSinkConfigWrapper, SlackSinkParams
from robusta.integrations import slack as slack_module

Expand Down Expand Up @@ -37,7 +37,6 @@ def handle_notification_grouping(self, finding: Finding, platform_enabled: bool)
finding_data["cluster"] = self.cluster_name
resolved = finding.title.startswith("[RESOLVED]")

# 1. Notification accounting
group_key, group_header = self.get_group_key_and_header(
finding_data, self.params.grouping.group_by
)
Expand All @@ -48,7 +47,7 @@ def handle_notification_grouping(self, finding: Finding, platform_enabled: bool)
)
notification_summary = self.summaries[group_key]
notification_summary.register_notification(
summary_key, resolved, self.params.grouping.interval
summary_key, resolved, self.params.grouping.interval, self.params.grouping.aligned
)
slack_thread_ts = self.slack_sender.send_or_update_summary_message(
group_header,
Expand All @@ -75,6 +74,5 @@ def handle_notification_grouping(self, finding: Finding, platform_enabled: bool)
finding, self.params, platform_enabled, thread_ts=slack_thread_ts
)


def get_timeline_uri(self, account_id: str, cluster_name: str) -> str:
return f"{ROBUSTA_UI_DOMAIN}/graphs?account_id={account_id}&cluster={cluster_name}"
16 changes: 16 additions & 0 deletions tests/test_slack_grouping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import pytest

from datetime import datetime

from robusta.core.sinks.sink_base import NotificationSummary


class TestNotificationSummary:
@pytest.mark.parametrize("input_dt,expected_output", [
(datetime(2024, 6, 25, 12, 15, 33), (1719266400, 1719352800)),
(datetime(2024, 6, 30, 17, 22, 19), (1719698400, 1719784800)),
(datetime(2024, 12, 3, 10, 59, 59), (1733180400, 1733266800)),
(datetime(2024, 12, 31, 16, 42, 28), (1735599600, 1735686000)),
])
def test_get_day_boundaries(self, input_dt, expected_output):
assert NotificationSummary.get_day_boundaries(input_dt) == expected_output

0 comments on commit d0856f1

Please sign in to comment.