Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for aligning Slack message grouping intervals #1473

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 51 additions & 7 deletions src/robusta/core/sinks/sink_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import time
from abc import abstractmethod, ABC
from collections import defaultdict
from datetime import datetime
from typing import Any, List, Dict, Tuple, DefaultDict, Optional

from pydantic import BaseModel, Field
from pydantic import BaseModel

from robusta.core.model.k8s_operation_type import K8sOperationType
from robusta.core.reporting.base import Finding
Expand Down Expand Up @@ -33,20 +34,63 @@ def register_notification(self, interval: int, threshold: int) -> bool:

class NotificationSummary(BaseModel):
message_id: Optional[str] = None # identifier of the summary message
start_ts: float = Field(default_factory=lambda: time.time()) # Timestamp of the first notification
start_ts: float = None
end_ts: float = None
# Keys for the table are determined by grouping.notification_mode.summary.by
summary_table: DefaultDict[KeyT, List[int]] = None

def register_notification(self, summary_key: KeyT, resolved: bool, interval: int):
now_ts = time.time()
def register_notification(self, summary_key: KeyT, resolved: bool, interval: int, aligned: bool):
now_dt = datetime.now()
now_ts = int(now_dt.timestamp())
idx = 1 if resolved else 0
if now_ts - self.start_ts > interval or not self.summary_table:
# Expired or the first summary ever for this group_key, reset the data
if not self.end_ts or now_ts > self.end_ts:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if not self.summary_table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If end_ts is not set it means that the whole object is not initialized, and so summary_table is also not set. I basically simplified the condition here.

# Group expired or the first summary ever for this group_key, reset the data
self.summary_table = defaultdict(lambda: [0, 0])
self.start_ts = now_ts
self.start_ts, self.end_ts = self.calculate_interval_boundaries(interval, aligned, now_dt)
self.message_id = None
self.summary_table[summary_key][idx] += 1

@classmethod
def calculate_interval_boundaries(cls, interval: int, aligned: bool, now_dt: datetime) -> Tuple[float, float]:
now_ts = int(now_dt.timestamp())
if aligned:
# This handles leap seconds by adjusting the length of the last interval in the
# day to the actual end of day. Note leap seconds are expected to almost always be +1,
# but it's also expected that some -1's will appear in the (far) future, and it's
# not out of the realm of possibility that somewhat larger adjustments will happen
# before the leap second adjustment is phased out around 2035.

start_of_this_day_ts, end_of_this_day_ts = cls.get_day_boundaries(now_dt)
start_ts = now_ts - (now_ts - start_of_this_day_ts) % interval
end_ts = start_ts + interval
if (
end_ts > end_of_this_day_ts # negative leap seconds
or end_of_this_day_ts - end_ts < interval # positive leap seconds
):
end_ts = end_of_this_day_ts
else:
start_ts = now_ts
end_ts = now_ts + interval
return start_ts, end_ts

@staticmethod
def get_day_boundaries(now_dt: datetime) -> Tuple[int, int]:
# Note: we assume day boundaries according to the timezone configured on the pod
# running Robusta runner. A caveat of this is that Slack will show times according
# to the client's timezone, which may differ.

start_of_this_day = now_dt.replace(hour=0, minute=0, second=0, microsecond=0)
start_of_this_day_ts = int(start_of_this_day.timestamp())
try:
end_of_this_day = start_of_this_day.replace(day=start_of_this_day.day + 1)
except ValueError: # end of month
try:
end_of_this_day = start_of_this_day.replace(month=start_of_this_day.month + 1, day=1)
except ValueError: # end of year
end_of_this_day = start_of_this_day.replace(year=start_of_this_day.year + 1, month=1, day=1)
end_of_this_day_ts = int(end_of_this_day.timestamp())
return start_of_this_day_ts, end_of_this_day_ts


class SinkBase(ABC):
grouping_enabled: bool
Expand Down
15 changes: 12 additions & 3 deletions src/robusta/core/sinks/sink_base_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,21 @@ def validate_exactly_one_defined(cls, values: Dict):
class GroupingParams(BaseModel):
group_by: GroupingAttributeSelectorListT = ["cluster"]
interval: int = 15*60 # in seconds
aligned: bool = False
notification_mode: Optional[NotificationModeParams]

@root_validator
def validate_notification_mode(cls, values: Dict):
if values is None:
return {"summary": SummaryNotificationModeParams()}
def validate_interval_alignment(cls, values: Dict):
if values["aligned"]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if values is none? wont this throw? i would handle that case first

Copy link
Contributor Author

@RobertSzefler RobertSzefler Jul 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's marked as required, so the only possible values should be True and False. If somehow it's not set (should not be possible but can be forced), I think it should indeed throw an exception. If somehow it's set to None (again, should not be possible..) it will be evaluated to False, which I think is sensible here.

if values["interval"] < 24 * 3600:
if (24 * 3600) % values["interval"]:
Copy link
Contributor

@Avi-Robusta Avi-Robusta Jul 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure i understand, we allow the interval to be seconds but we dont support seconds, shouln't we change it to minutes or have it round up to the nearest minute instead of failing ? (inorder to support backward compatibility)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be no backwards compatibility problems, as prior users don't use the interval aligning mechanism (it's not been released yet).

Also, we support aligning on thresholds shorter than a minutes here without problem. For example, if the user specs interval as 15 seconds, the intervals will be automatically aligned at x:00, x:15, x:30, x:45 (for consecutive minutes x).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it

raise ValueError(f'Unable to properly align time interval of {values["interval"]} seconds')
else:
# TODO do we also want to support automatically aligning intervals longer than
# a day? Using month/year boundaries? This would require additionally handling
# leap years and daytime saving, just as we handle leap seconds in
# NotificationSummary.calculate_interval_boundaries
raise ValueError(f"Automatically aligning time intervals longer than 24 hours is not supported")
return values


Expand Down
8 changes: 3 additions & 5 deletions src/robusta/core/sinks/slack/slack_sink.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from robusta.core.model.env_vars import ROBUSTA_UI_DOMAIN
from robusta.core.reporting.base import Finding, FindingStatus
from robusta.core.sinks.sink_base import NotificationGroup, NotificationSummary, SinkBase
from robusta.core.reporting.base import Finding
from robusta.core.sinks.sink_base import SinkBase
from robusta.core.sinks.slack.slack_sink_params import SlackSinkConfigWrapper, SlackSinkParams
from robusta.integrations import slack as slack_module

Expand Down Expand Up @@ -37,7 +37,6 @@ def handle_notification_grouping(self, finding: Finding, platform_enabled: bool)
finding_data["cluster"] = self.cluster_name
resolved = finding.title.startswith("[RESOLVED]")

# 1. Notification accounting
group_key, group_header = self.get_group_key_and_header(
finding_data, self.params.grouping.group_by
)
Expand All @@ -48,7 +47,7 @@ def handle_notification_grouping(self, finding: Finding, platform_enabled: bool)
)
notification_summary = self.summaries[group_key]
notification_summary.register_notification(
summary_key, resolved, self.params.grouping.interval
summary_key, resolved, self.params.grouping.interval, self.params.grouping.aligned
)
slack_thread_ts = self.slack_sender.send_or_update_summary_message(
group_header,
Expand All @@ -75,6 +74,5 @@ def handle_notification_grouping(self, finding: Finding, platform_enabled: bool)
finding, self.params, platform_enabled, thread_ts=slack_thread_ts
)


def get_timeline_uri(self, account_id: str, cluster_name: str) -> str:
return f"{ROBUSTA_UI_DOMAIN}/graphs?account_id={account_id}&cluster={cluster_name}"
19 changes: 19 additions & 0 deletions tests/test_slack_grouping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import pytest

from datetime import datetime, timezone

from robusta.core.sinks.sink_base import NotificationSummary


utc = timezone.utc


class TestNotificationSummary:
@pytest.mark.parametrize("input_dt,expected_output", [
(datetime(2024, 6, 25, 12, 15, 33, tzinfo=utc), (1719273600, 1719360000)),
(datetime(2024, 6, 30, 17, 22, 19, tzinfo=utc), (1719705600, 1719792000)),
(datetime(2024, 12, 3, 10, 59, 59, tzinfo=utc), (1733184000, 1733270400)),
(datetime(2024, 12, 31, 16, 42, 28, tzinfo=utc), (1735603200, 1735689600)),
])
def test_get_day_boundaries(self, input_dt, expected_output):
assert NotificationSummary.get_day_boundaries(input_dt) == expected_output
Loading