Skip to content

Commit

Permalink
Notification for Kafka persistence failures. (#2568)
Browse files Browse the repository at this point in the history
* Notification for Kafka persistence failures.

* Remove use of legacy route helpers and endpoint argument

* Use retry in favour of dynamically_absorb_delay

---------

Co-authored-by: Stuart Corbishley <[email protected]>
  • Loading branch information
rorymckinley and stuartc authored Oct 14, 2024
1 parent c0dea80 commit 8c9b9b0
Show file tree
Hide file tree
Showing 20 changed files with 714 additions and 38 deletions.
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,9 @@
# the cost of increased database storage and eventually increased processing
# time.
# KAFKA_DUPLICATE_TRACKING_RETENTION_SECONDS=3600
#
# Under certain failure conditions, the Kafka pipeline will send an email
# notification. To prevent flooding the recipients, it will wait for a period
# before it sends the next email (assuming the failure condition persists).
# Changing this setting will affect the frequency of sending.
# KAFKA_NOTIFICATION_EMBARGO_SECONDS=3600
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ and this project adheres to

### Added

- Notify users when a Kafka trigger can not persist a message to the database.
[#2386](https://github.com/OpenFn/lightning/issues/2386)

### Changed

### Fixed
Expand Down
8 changes: 8 additions & 0 deletions DEPLOYMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,14 @@ It is recommended that you check the value of the `offsets.retention.minutes` fo
the Kafka cluster to determine what the cluster's retention period is, and
consider this when disabling a Kafka trigger for an extended period.

#### Failure notifications

Under certain failure conditions, a Kafka trigger will send an email to certain
users that are associated with a project. After each email an embargo is applied
to ensure that Lightning does not flood the recipients with email. The length
of the embargo is controlled by the `KAFKA_NOTIFICATION_EMBARGO_SECONDS` ENV
variable.

### Google Oauth2

Using your Google Cloud account, provision a new OAuth 2.0 Client with the 'Web
Expand Down
37 changes: 22 additions & 15 deletions lib/lightning/accounts/user_notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ defmodule Lightning.Accounts.UserNotifier do
Welcome to OpenFn. Please confirm your account by visiting the URL below:
#{url(LightningWeb.Endpoint, ~p"/users/confirm/#{token}")}
#{url(~p"/users/confirm/#{token}")}
If you didn't create an account with us, please ignore this.
Expand All @@ -80,7 +80,7 @@ defmodule Lightning.Accounts.UserNotifier do
#{enroller.first_name} has just created an OpenFn account for you. You can complete your registration by visiting the URL below:
#{url(LightningWeb.Endpoint, ~p"/users/confirm/#{token}")}
#{url(~p"/users/confirm/#{token}")}
If you think this account was created by mistake, you can contact #{enroller.first_name} (#{enroller.email}) or ignore this email.
Expand All @@ -94,7 +94,7 @@ defmodule Lightning.Accounts.UserNotifier do
Please confirm your OpenFn account by clicking on the URL below:
#{url(LightningWeb.Endpoint, ~p"/users/confirm/#{token}")}
#{url(~p"/users/confirm/#{token}")}
If you have not requested an account confirmation email, please ignore this.
Expand All @@ -107,14 +107,15 @@ defmodule Lightning.Accounts.UserNotifier do
"""
def deliver_project_addition_notification(user, project) do
role = Projects.get_project_user_role(user, project) |> Atom.to_string()
url = LightningWeb.RouteHelpers.project_dashboard_url(project.id)

deliver(user, "You now have access to \"#{project.name}\"", """
Hi #{user.first_name},
You've been granted "#{role}" access to the "#{project.name}" project on OpenFn.
Visit the URL below to check it out:\n\n#{url}
Visit the URL below to check it out:
#{url(~p"/projects/#{project}/w")}
OpenFn
""")
Expand Down Expand Up @@ -145,12 +146,6 @@ defmodule Lightning.Accounts.UserNotifier do

io_data_saved = updated_project.retention_policy != :erase_all

settings_page_url =
url(
LightningWeb.Endpoint,
~p"/projects/#{updated_project.id}/settings#data-storage"
)

deliver(
user,
"The data retention policy for #{updated_project.name} has been modified",
Expand All @@ -165,7 +160,7 @@ defmodule Lightning.Accounts.UserNotifier do
This policy can be changed by owners and administrators. If you haven't approved this change, please reset the policy by visiting the URL below:
#{settings_page_url}
#{url(~p"/projects/#{updated_project.id}/settings#data-storage")}
OpenFn
"""
Expand Down Expand Up @@ -279,7 +274,7 @@ defmodule Lightning.Accounts.UserNotifier do
You history export started requested on #{Helpers.format_date(project_file.inserted_at)} is completed. Please visit this URL to download the file:acceptor
#{url(LightningWeb.Endpoint, ~p"/project_files/#{project_file.id}/download")}
#{url(~p"/project_files/#{project_file.id}/download")}
OpenFn
""")
Expand All @@ -294,7 +289,6 @@ defmodule Lightning.Accounts.UserNotifier do
})

url(
LightningWeb.Endpoint,
~p"/projects/#{workflow.project_id}/history?#{%{"filters" => uri_params}}"
)
end
Expand Down Expand Up @@ -387,14 +381,27 @@ defmodule Lightning.Accounts.UserNotifier do
#{inviter.first_name} has invited you to join project "#{project.name}" and granted you "#{role}" access. Since you don't have an OpenFn account yet, we've set one up for you.
Please click the link below to complete your account setup: #{url(LightningWeb.Endpoint, ~p"/users/reset_password/#{token}")}
Please click the link below to complete your account setup: #{url(~p"/users/reset_password/#{token}")}
If you did not request to join this project, please ignore this email.
OpenFn
""")
end

@doc """
Emails `user` to report that the Kafka trigger associated with `workflow`
failed to persist at least one message.

`timestamp` is truncated to whole seconds and rendered as ISO 8601 in the
message body. The body also contains a URL built from the workflow's
`project_id` and `id`, and points the reader at log entries containing
'Kafka Pipeline Error'.

Returns whatever `deliver/3` returns (it is defined elsewhere in this module).
"""
def send_trigger_failure_mail(user, workflow, timestamp) do
display_timestamp =
timestamp |> DateTime.truncate(:second) |> DateTime.to_iso8601()

deliver(user, "Kafka trigger failure on #{workflow.name}", """
As of #{display_timestamp}, the Kafka trigger associated with the workflow `#{workflow.name}` (#{url(~p"/projects/#{workflow.project_id}/w/#{workflow.id}")}) has failed to persist at least one message.
If you have access to the system logs, please look for entries containing 'Kafka Pipeline Error'.
OpenFn
""")
end

# Returns `string` unchanged when the count is exactly the integer 1,
# otherwise returns it with an "s" appended.
defp pluralize_with_s(count, string) do
  case count do
    1 -> string
    _other -> "#{string}s"
  end
end
end
10 changes: 10 additions & 0 deletions lib/lightning/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ defmodule Lightning.Config do
|> Keyword.get(:duplicate_tracking_retention_seconds)
end

# Reads the `:notification_embargo_seconds` entry of the Kafka trigger
# configuration; yields `nil` when the key is absent.
@impl true
def kafka_notification_embargo_seconds do
  Keyword.get(kafka_trigger_config(), :notification_embargo_seconds)
end

@impl true
def kafka_number_of_consumers do
kafka_trigger_config() |> Keyword.get(:number_of_consumers)
Expand Down Expand Up @@ -184,6 +189,7 @@ defmodule Lightning.Config do
@callback grace_period() :: integer()
@callback instance_admin_email() :: String.t()
@callback kafka_duplicate_tracking_retention_seconds() :: integer()
@callback kafka_notification_embargo_seconds() :: integer()
@callback kafka_number_of_consumers() :: integer()
@callback kafka_number_of_messages_per_second() :: float()
@callback kafka_number_of_processors() :: integer()
Expand Down Expand Up @@ -310,6 +316,10 @@ defmodule Lightning.Config do
impl().kafka_number_of_consumers()
end

@doc """
Returns the Kafka failure-notification embargo (in seconds) as reported by
the configured implementation module.
"""
def kafka_notification_embargo_seconds,
  do: impl().kafka_notification_embargo_seconds()

@doc """
Returns the Kafka messages-per-second setting as reported by the configured
implementation module.
"""
def kafka_number_of_messages_per_second,
  do: impl().kafka_number_of_messages_per_second()
Expand Down
2 changes: 2 additions & 0 deletions lib/lightning/config/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,8 @@ defmodule Lightning.Config.Bootstrap do
duplicate_tracking_retention_seconds:
env!("KAFKA_DUPLICATE_TRACKING_RETENTION_SECONDS", :integer, 3600),
enabled: env!("KAFKA_TRIGGERS_ENABLED", &Utils.ensure_boolean/1, false),
notification_embargo_seconds:
env!("KAFKA_NOTIFICATION_EMBARGO_SECONDS", :integer, 3600),
number_of_consumers: env!("KAFKA_NUMBER_OF_CONSUMERS", :integer, 1),
number_of_messages_per_second:
env!("KAFKA_NUMBER_OF_MESSAGES_PER_SECOND", :float, 1),
Expand Down
56 changes: 56 additions & 0 deletions lib/lightning/kafka_triggers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ defmodule Lightning.KafkaTriggers do
import Ecto.Query

alias Ecto.Changeset
alias Lightning.Accounts.UserNotifier
alias Lightning.Projects
alias Lightning.Repo
alias Lightning.Workflows.Trigger
alias Lightning.Workflows.Triggers.Events

def start_triggers do
if supervisor = GenServer.whereis(:kafka_pipeline_supervisor) do
Expand Down Expand Up @@ -191,4 +194,57 @@ defmodule Lightning.KafkaTriggers do

%{interval: 10_000, messages_per_interval: messages_per_interval}
end

@doc """
Sends a failure notification for `trigger_id` unless one was sent too recently.

The current UTC time is compared with the last recorded send time via
`send_notification?/2`. When a notification is due, the project's users are
emailed, the send time is recorded locally, and an event carrying that time
is broadcast. Returns `nil` when no notification is due.
"""
def notify_users_of_trigger_failure(trigger_id) do
  sending_at = DateTime.utc_now()
  previously_sent_at = last_notification_sent_at(trigger_id)

  if send_notification?(sending_at, previously_sent_at) do
    notify_users(trigger_id, sending_at)
    track_notification_sent(trigger_id, sending_at)
    notify_any_other_nodes(trigger_id, sending_at)
  end
end

# Looks up the time the last failure notification was sent for `trigger_id`
# in `:persistent_term`; returns `nil` when nothing has been recorded.
defp last_notification_sent_at(trigger_id) do
  trigger_id
  |> failure_notification_tracking_key()
  |> :persistent_term.get(nil)
end

@doc """
Builds the `:persistent_term` key under which the last notification time for
`trigger_id` is stored.
"""
def failure_notification_tracking_key(trigger_id),
  do: {:kafka_trigger_failure_notification_sent_at, trigger_id}

# Emails every user returned by
# `Projects.find_users_to_notify_of_trigger_failure/1` for the project that
# owns the trigger's workflow.
#
# `Repo.get/2` returns `nil` when the trigger no longer exists (e.g. it was
# deleted while its pipeline was still running) and `Repo.preload/2` passes
# that `nil` through. Matching explicitly here avoids a `MatchError` in the
# failure-handling path; in that case there is nobody to notify and `:ok` is
# returned, the same value `Enum.each/2` yields on the normal path.
defp notify_users(trigger_id, timestamp) do
  case Trigger |> Repo.get(trigger_id) |> Repo.preload(:workflow) do
    %{workflow: %{project_id: project_id} = workflow} ->
      project_id
      |> Projects.find_users_to_notify_of_trigger_failure()
      |> Enum.each(fn user ->
        UserNotifier.send_trigger_failure_mail(user, workflow, timestamp)
      end)

    _missing_trigger_or_workflow ->
      :ok
  end
end

# Records `sent_at` in `:persistent_term` as the last notification time for
# `trigger_id`.
defp track_notification_sent(trigger_id, sent_at) do
  tracking_key = failure_notification_tracking_key(trigger_id)
  :persistent_term.put(tracking_key, sent_at)
end

# Broadcasts a "notification sent" event for `trigger_id` carrying `sent_at`.
defp notify_any_other_nodes(trigger_id, sent_at),
  do: Events.kafka_trigger_notification_sent(trigger_id, sent_at)

@doc """
Decides whether a failure notification may be sent at `sending_at`.

Returns `true` when `last_sent_at` is `nil` (nothing has been sent yet).
Otherwise returns `true` only when strictly more whole seconds than
`Lightning.Config.kafka_notification_embargo_seconds/0` have elapsed between
`last_sent_at` and `sending_at`.
"""
def send_notification?(sending_at, last_sent_at) do
  if is_nil(last_sent_at) do
    true
  else
    embargo_seconds = Lightning.Config.kafka_notification_embargo_seconds()
    elapsed_seconds = DateTime.diff(sending_at, last_sent_at, :second)

    elapsed_seconds > embargo_seconds
  end
end
end
16 changes: 15 additions & 1 deletion lib/lightning/kafka_triggers/event_listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ defmodule Lightning.KafkaTriggers.EventListener do

alias Lightning.KafkaTriggers
alias Lightning.Workflows.Triggers.Events
alias Lightning.Workflows.Triggers.Events.KafkaTriggerNotificationSent
alias Lightning.Workflows.Triggers.Events.KafkaTriggerUpdated

def start_link(_opts) do
GenServer.start_link(__MODULE__, [], name: :kafka_event_listener)
Expand All @@ -20,14 +22,26 @@ defmodule Lightning.KafkaTriggers.EventListener do
end

@impl true
def handle_info(%Events.KafkaTriggerUpdated{trigger_id: trigger_id}, state) do
def handle_info(%KafkaTriggerUpdated{trigger_id: trigger_id}, state) do
if supervisor = GenServer.whereis(:kafka_pipeline_supervisor) do
supervisor |> KafkaTriggers.update_pipeline(trigger_id)
end

{:noreply, state}
end

# Handles a broadcast "notification sent" event by storing its timestamp under
# the trigger's tracking key in `:persistent_term`.
@impl true
def handle_info(
      %KafkaTriggerNotificationSent{trigger_id: trigger_id, sent_at: sent_at},
      state
    ) do
  trigger_id
  |> KafkaTriggers.failure_notification_tracking_key()
  |> :persistent_term.put(sent_at)

  {:noreply, state}
end

@impl true
def handle_info(_msg, state) do
{:noreply, state}
Expand Down
10 changes: 10 additions & 0 deletions lib/lightning/kafka_triggers/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,16 @@ defmodule Lightning.KafkaTriggers.Pipeline do

@impl true
def handle_failed(messages, context) do
trigger_id = context.trigger_id |> Atom.to_string()

messages
|> Enum.each(fn message ->
create_log_entry(message, context)
notify_sentry(message, context)
end)

maybe_notify_users(messages, trigger_id)

messages
end

Expand Down Expand Up @@ -232,4 +236,10 @@ defmodule Lightning.KafkaTriggers.Pipeline do
[{:sasl, sasl} | base_config]
end
end

@doc """
Triggers a user notification for `trigger_id` when at least one of `messages`
has the status `{:failed, :persistence}`.

Returns `nil` when no message has that status.
"""
def maybe_notify_users(messages, trigger_id) do
  persistence_failure? =
    Enum.any?(messages, fn message ->
      message.status == {:failed, :persistence}
    end)

  if persistence_failure? do
    KafkaTriggers.notify_users_of_trigger_failure(trigger_id)
  end
end
end
11 changes: 11 additions & 0 deletions lib/lightning/projects.ex
Original file line number Diff line number Diff line change
Expand Up @@ -897,4 +897,15 @@ defmodule Lightning.Projects do
)
|> Repo.all()
end

@doc """
Lists the users to email when a Kafka trigger in `project_id` fails.

A user qualifies when they have a `project_users` record for the project and
either that record's role is `:admin` or `:owner`, or the user's own role is
`:superuser`. Because of the join, superusers without a record for this
project are not returned.
"""
def find_users_to_notify_of_trigger_failure(project_id) do
  notifiable_roles = [:admin, :owner]

  users_to_notify =
    from user in User,
      join: membership in assoc(user, :project_users),
      where: membership.project_id == ^project_id,
      where: membership.role in ^notifiable_roles or user.role == ^:superuser

  Repo.all(users_to_notify)
end
end
12 changes: 12 additions & 0 deletions lib/lightning/workflows/triggers/events.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ defmodule Lightning.Workflows.Triggers.Events do
defstruct trigger_id: nil
end

defmodule KafkaTriggerNotificationSent do
  @moduledoc false
  # Event payload: the trigger a failure notification was sent for, and when.
  # Both fields default to `nil`.
  defstruct [:trigger_id, :sent_at]
end

def kafka_trigger_updated(trigger_id) do
Lightning.broadcast(
kafka_trigger_updated_topic(),
Expand All @@ -19,4 +24,11 @@ defmodule Lightning.Workflows.Triggers.Events do
end

@doc """
Name of the topic on which Kafka trigger events are broadcast.
"""
def kafka_trigger_updated_topic do
  "kafka_trigger_updated"
end

@doc """
Broadcasts a `KafkaTriggerNotificationSent` event for `trigger_id` with the
given `sent_at`.

The event goes out on the topic returned by `kafka_trigger_updated_topic/0`.
"""
def kafka_trigger_notification_sent(trigger_id, sent_at) do
  event = %KafkaTriggerNotificationSent{trigger_id: trigger_id, sent_at: sent_at}

  Lightning.broadcast(kafka_trigger_updated_topic(), event)
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ defmodule Lightning.MixProject do
{:postgrex, ">= 0.0.0"},
{:prom_ex, "~> 1.8.0"},
{:rambo, "~> 0.3.4"},
{:retry, "~> 0.18"},
{:scrivener, "~> 2.7"},
{:sentry, "~> 10.5.0"},
{:sobelow, "~> 0.13.0", only: [:test, :dev]},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
"rambo": {:hex, :rambo, "0.3.4", "8962ac3bd1a633ee9d0e8b44373c7913e3ce3d875b4151dcd060886092d2dce7", [:mix], [], "hexpm", "0cc54ed089fbbc84b65f4b8a774224ebfe60e5c80186fafc7910b3e379ad58f1"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
"replug": {:hex, :replug, "0.1.0", "61d35f8c873c0078a23c49579a48f36e45789414b1ec0daee3fd5f4e34221f23", [:mix], [{:plug, "~> 1.8", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "f71f7a57e944e854fe4946060c6964098e53958074c69fb844b96e0bd58cfa60"},
"retry": {:hex, :retry, "0.18.0", "dc58ebe22c95aa00bc2459f9e0c5400e6005541cf8539925af0aa027dc860543", [:mix], [], "hexpm", "9483959cc7bf69c9e576d9dfb2b678b71c045d3e6f39ab7c9aa1489df4492d73"},
"scrivener": {:hex, :scrivener, "2.7.2", "1d913c965ec352650a7f864ad7fd8d80462f76a32f33d57d1e48bc5e9d40aba2", [:mix], [], "hexpm", "7866a0ec4d40274efbee1db8bead13a995ea4926ecd8203345af8f90d2b620d9"},
"sentry": {:hex, :sentry, "10.5.0", "d64a13a4f7564b3447afdd54fddbbf3ea0dc30aa6266706dff64c51d61f15063", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_ownership, "~> 0.3.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.6", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.20", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "375e2e3dee2bc42da2ebee88a9d8fc65ccd26451da2588b94c6b166eb869f147"},
"sleeplocks": {:hex, :sleeplocks, "1.1.2", "d45aa1c5513da48c888715e3381211c859af34bee9b8290490e10c90bb6ff0ca", [:rebar3], [], "hexpm", "9fe5d048c5b781d6305c1a3a0f40bb3dfc06f49bf40571f3d2d0c57eaa7f59a5"},
Expand Down
Loading

0 comments on commit 8c9b9b0

Please sign in to comment.