2386 write failed message to disk (#2580)
* Write Kafka persistence failures to file system

* Do not start if alternate storage misconfigured

* Utility to reprocess messages written to file system

* BroadwayKafka, Dialyzer updates

* Docs and Changelog

* Run CircleCI tests as non-privileged user

---------

Co-authored-by: Stuart Corbishley <[email protected]>
rorymckinley and stuartc authored Oct 18, 2024
1 parent d3675e5 commit 0131ac5
Showing 17 changed files with 1,245 additions and 50 deletions.
33 changes: 18 additions & 15 deletions .circleci/config.yml
@@ -49,8 +49,10 @@ jobs:
- image: cimg/postgres:15.3
environment:
ERL_FLAGS: +S 4:4
working_directory: /home/lightning/project

steps:
- run: adduser --home /home/lightning --system lightning
- checkout
- install_node:
version: << parameters.nodejs_version >>
@@ -68,14 +70,15 @@
- v1-deps-{{ arch }}-{{ checksum ".elixir_otp_version" }}-{{ checksum "mix.lock" }}
- v1-deps-{{ arch }}-{{ checksum ".elixir_otp_version" }}
- run:
name: "Install libsodium"
name: "Install libsodium and sudo"
command: |
apt-get update && apt-get install -y libsodium-dev
- run: mix local.hex --force && mix local.rebar --force
- run: cd assets && npm install
- run: MIX_ENV=test mix do deps.get --only test, deps.compile, compile
- run: MIX_ENV=test mix lightning.install_runtime
apt-get update && apt-get install -y libsodium-dev sudo
- run: chown -R lightning /home/lightning
- run: sudo -u lightning mix local.hex --force && mix local.rebar --force
- run: cd assets; sudo -u lightning npm install
- run: sudo -u lightning env MIX_ENV=test mix do deps.get --only test, deps.compile, compile
- run: sudo -u lightning env MIX_ENV=test mix lightning.install_runtime

- save_cache:
key: v1-deps-{{ arch }}-{{ checksum ".elixir_otp_version" }}-{{ checksum "mix.lock" }}
@@ -90,7 +93,7 @@ jobs:
- v1-plt-{{ arch }}-{{ checksum ".elixir_otp_version" }}-{{ checksum "mix.lock" }}
- v1-plt-{{ arch }}-{{ checksum ".elixir_otp_version" }}

- run: MIX_ENV=test mix dialyzer --plt
- run: sudo -u lightning env MIX_ENV=test mix dialyzer --plt
- save_cache:
key: v1-plt-{{ arch }}-{{ checksum ".elixir_otp_version" }}-{{ checksum "mix.lock" }}
paths:
@@ -104,30 +107,30 @@ workflows:
- build:
name: "Check code formatting"
execute:
- run: MIX_ENV=test mix format --check-formatted
- run: sudo -u lightning env MIX_ENV=test mix format --check-formatted
- build:
name: "Check code style"
execute:
- run: MIX_ENV=test mix credo --strict --all
- run: sudo -u lightning env MIX_ENV=test mix credo --strict --all
- build:
name: "Type check"
execute:
- run: MIX_ENV=test mix dialyzer
- run: sudo -u lightning env MIX_ENV=test mix dialyzer
- build:
name: "Check for security vulnerabilities"
execute:
- run: MIX_ENV=test mix sobelow
- run: sudo -u lightning env MIX_ENV=test mix sobelow
- build:
name: "Check Elixir tests (codecov)"
execute:
- run: MIX_ENV=test mix do ecto.create, ecto.migrate
- run: sudo -u lightning env MIX_ENV=test mix do ecto.create, ecto.migrate
- run:
command: MIX_ENV=test mix coveralls.json -o ./test/reports
command: sudo -u lightning env MIX_ENV=test mix coveralls.json -o ./test/reports
- codecov/upload:
file: test/reports/excoveralls.json
- store_test_results:
path: test/reports/
- build:
name: "Check Javascript tests"
execute:
- run: cd assets && npm install && npm run test-report
- run: cd assets; sudo -u lightning npm install && npm run test-report
19 changes: 16 additions & 3 deletions .env.example
@@ -232,7 +232,7 @@
# 1 message every 10 seconds.
# KAFKA_NUMBER_OF_MESSAGES_PER_SECOND=1
#
# The number of Broadway processors that are downstream of the Kafka consumer.
# Increasing this number increases the concurrency with which messages are
# processed by the system, once received from the Kafka consumer.
# KAFKA_NUMBER_OF_PROCESSORS=1
@@ -248,8 +248,21 @@
# time.
# KAFKA_DUPLICATE_TRACKING_RETENTION_SECONDS=3600
#
# Under certain failure conditions, the Kafka pipeline will send an email
# notification. To prevent flooding the recipients, it will wait for a period
# before it sends the next email (assuming the failure condition persists).
# Changing this setting will affect the frequency of sending.
# KAFKA_NOTIFICATION_EMBARGO_SECONDS=3600
#
# If a Kafka pipeline fails to persist a message, the message can be
# persisted as JSON to the local file system. To enable this, set
# KAFKA_ALTERNATE_STORAGE_ENABLED to 'yes'. Note that if you choose to enable
# this, you will also need to set KAFKA_ALTERNATE_STORAGE_FILE_PATH. For more
# details, please refer to the Kafka section of DEPLOYMENT.md.
# KAFKA_ALTERNATE_STORAGE_ENABLED=no
#
# The directory to which failed Kafka messages will be persisted if
# KAFKA_ALTERNATE_STORAGE_ENABLED is enabled. A per-workflow subdirectory
# will be created within this directory. The directory must exist when
# Lightning starts and it must be writable by the user that Lightning runs as.
# KAFKA_ALTERNATE_STORAGE_FILE_PATH=/path/to/alternate/storage
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,11 @@ and this project adheres to

### Added

- Optionally write Kafka messages to the file system when they cannot be persisted.
[#2386](https://github.com/OpenFn/lightning/issues/2386)
- Add `MessageRecovery` utility code to restore Kafka messages that were
persisted to the file system.
[#2386](https://github.com/OpenFn/lightning/issues/2386)
- Projects page welcome section: allow users to learn how to use the app through
Arcade videos [#2563](https://github.com/OpenFn/lightning/issues/2563)
- Store user preferences in database
37 changes: 37 additions & 0 deletions DEPLOYMENT.md
@@ -236,6 +236,43 @@
to ensure that Lightning does not flood the recipients with email. The length
of the embargo is controlled by the `KAFKA_NOTIFICATION_EMBARGO_SECONDS` ENV
variable.

#### Persisting Failed Messages

If a Kafka message fails to be persisted as a WorkOrder, Run, and Dataclip, the
option exists to write the failed message to a location on the local file
system. If this option is enabled by setting `KAFKA_ALTERNATE_STORAGE_ENABLED`,
the `KAFKA_ALTERNATE_STORAGE_FILE_PATH` ENV variable must be set to a path that
exists and is writable by Lightning. The location should also be suitably
protected to prevent data exposure, as Lightning **will not encrypt** the
message contents when writing them.

If the option is enabled and a message fails to be persisted, Lightning will
create a subdirectory named with the id of the affected trigger's workflow
in the location specified by `KAFKA_ALTERNATE_STORAGE_FILE_PATH` (assuming such
a subdirectory does not already exist). Lightning will serialise the message
headers and data as received by the Kafka pipeline and write this to a file
within the subdirectory. The file will be named based on the pattern
`<trigger_id>_<message_topic>_<message_partition>_<message_offset>.json`.
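
For example (all identifiers here are hypothetical), a message for trigger
`8f0e9a1c` read from partition `0` of the `orders` topic at offset `4711`
would be written to:

```
<KAFKA_ALTERNATE_STORAGE_FILE_PATH>/<workflow_id>/8f0e9a1c_orders_0_4711.json
```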

To recover the persisted messages, it is suggested that the affected triggers
be disabled before commencing. Once this is done, run the following code from
an IEx console on each node that is running Lightning:

```elixir
Lightning.KafkaTriggers.MessageRecovery.recover_messages(
Lightning.Config.kafka_alternate_storage_file_path()
)
```

Further details regarding the behaviour of `MessageRecovery.recover_messages/1`
can be found in the module documentation of `MessageRecovery`. Recovered
messages will have the `.json` extension modified to `.json.recovered` but they
will be left in place. Future recovery runs will not process files that have
been marked as recovered.

Once all files have either been recovered or discarded, the triggers can be
enabled once more.

### Google Oauth2

Using your Google Cloud account, provision a new OAuth 2.0 Client with the 'Web
20 changes: 20 additions & 0 deletions lib/lightning/config.ex
@@ -148,6 +148,16 @@ defmodule Lightning.Config do
kafka_trigger_config() |> Keyword.get(:enabled, false)
end

@impl true
def kafka_alternate_storage_enabled? do
kafka_trigger_config() |> Keyword.get(:alternate_storage_enabled)
end

@impl true
def kafka_alternate_storage_file_path do
kafka_trigger_config() |> Keyword.get(:alternate_storage_file_path)
end

@impl true
def kafka_duplicate_tracking_retention_seconds do
kafka_trigger_config()
@@ -188,6 +198,8 @@
@callback google(key :: atom()) :: any()
@callback grace_period() :: integer()
@callback instance_admin_email() :: String.t()
@callback kafka_alternate_storage_enabled?() :: boolean()
@callback kafka_alternate_storage_file_path() :: String.t()
@callback kafka_duplicate_tracking_retention_seconds() :: integer()
@callback kafka_notification_embargo_seconds() :: integer()
@callback kafka_number_of_consumers() :: integer()
@@ -308,6 +320,14 @@
impl().kafka_triggers_enabled?()
end

def kafka_alternate_storage_enabled? do
impl().kafka_alternate_storage_enabled?()
end

def kafka_alternate_storage_file_path do
impl().kafka_alternate_storage_file_path()
end

def kafka_duplicate_tracking_retention_seconds do
impl().kafka_duplicate_tracking_retention_seconds()
end
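
As a usage sketch (not part of the diff), callers read the new settings through
the `Lightning.Config` facade; a minimal example, assuming the
`:kafka_triggers` config has been populated at boot:

```elixir
# Returns false/nil when alternate storage is not configured.
if Lightning.Config.kafka_alternate_storage_enabled?() do
  path = Lightning.Config.kafka_alternate_storage_file_path()
  # ... write the failed message beneath `path` ...
end
```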
32 changes: 32 additions & 0 deletions lib/lightning/config/bootstrap.ex
@@ -487,6 +487,38 @@ defmodule Lightning.Config.Bootstrap do
daily_batch_size: env!("USAGE_TRACKING_DAILY_BATCH_SIZE", :integer, 10)

config :lightning, :kafka_triggers,
alternate_storage_enabled:
env!(
"KAFKA_ALTERNATE_STORAGE_ENABLED",
&Utils.ensure_boolean/1,
false
)
|> tap(fn enabled ->
if enabled do
touch_result =
env!("KAFKA_ALTERNATE_STORAGE_FILE_PATH", :string, nil)
|> to_string()
|> then(fn path ->
if File.exists?(path) do
path
|> Path.join(".lightning_storage_check")
|> File.touch()
else
:error
end
end)

unless touch_result == :ok do
raise """
KAFKA_ALTERNATE_STORAGE_ENABLED is set to yes/true.
KAFKA_ALTERNATE_STORAGE_FILE_PATH must be a writable directory.
"""
end
end
end),
alternate_storage_file_path:
env!("KAFKA_ALTERNATE_STORAGE_FILE_PATH", :string, nil),
duplicate_tracking_retention_seconds:
env!("KAFKA_DUPLICATE_TRACKING_RETENTION_SECONDS", :integer, 3600),
enabled: env!("KAFKA_TRIGGERS_ENABLED", &Utils.ensure_boolean/1, false),
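
A sketch of the deployment configuration the boot guard above expects (the
path is illustrative): when the flag is truthy, start-up succeeds only if the
directory exists and a `.lightning_storage_check` probe file can be touched
inside it.

```elixir
# Illustrative values only — set in the environment before boot:
System.put_env("KAFKA_ALTERNATE_STORAGE_ENABLED", "yes")
System.put_env("KAFKA_ALTERNATE_STORAGE_FILE_PATH", "/var/lib/lightning/kafka")

# Bootstrap then touches /var/lib/lightning/kafka/.lightning_storage_check
# and raises if the directory is missing or unwritable.
```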
74 changes: 74 additions & 0 deletions lib/lightning/kafka_triggers.ex
@@ -6,6 +6,7 @@ defmodule Lightning.KafkaTriggers do

alias Ecto.Changeset
alias Lightning.Accounts.UserNotifier
alias Lightning.KafkaTriggers.MessageHandling
alias Lightning.Projects
alias Lightning.Repo
alias Lightning.Workflows.Trigger
@@ -247,4 +248,77 @@

DateTime.diff(sending_at, last_sent_at, :second) > embargo_period
end

def maybe_write_to_alternate_storage(trigger_id, %Broadway.Message{} = msg) do
if Lightning.Config.kafka_alternate_storage_enabled?() do
with {:ok, workflow_path} <- build_workflow_storage_path(trigger_id),
:ok <- create_workflow_storage_directory(workflow_path),
path <- build_file_path(workflow_path, trigger_id, msg),
{:ok, data} <- encode_message(msg) do
write_to_file(path, data)
else
error ->
error
end
else
:ok
end
end

defp build_workflow_storage_path(trigger_id) do
with base_path <- Lightning.Config.kafka_alternate_storage_file_path(),
true <- base_path |> to_string() |> File.exists?(),
%{workflow_id: workflow_id} <- Trigger |> Repo.get(trigger_id) do
{:ok, Path.join(base_path, workflow_id)}
else
_anything ->
{:error, :path_error}
end
end

defp create_workflow_storage_directory(workflow_path) do
case File.mkdir(workflow_path) do
resp when resp == :ok or resp == {:error, :eexist} ->
:ok

_anything_else ->
{:error, :workflow_dir_error}
end
end

defp build_file_path(workflow_path, trigger_id, message) do
workflow_path |> Path.join(alternate_storage_file_name(trigger_id, message))
end

def alternate_storage_file_name(trigger_id, message) do
"#{trigger_id}_#{build_topic_partition_offset(message)}.json"
end

defp encode_message(message) do
message
|> Map.filter(fn {key, _val} -> key in [:data, :metadata] end)
|> then(fn %{metadata: metadata} = message_export ->
message_export
|> Map.put(
:metadata,
MessageHandling.convert_headers_for_serialisation(metadata)
)
end)
|> Jason.encode()
|> case do
{:error, _reason} ->
{:error, :serialisation}

ok_response ->
ok_response
end
end

defp write_to_file(path, data) do
if File.write(path, data) == :ok do
:ok
else
{:error, :writing}
end
end
end
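
As a usage sketch (identifiers are hypothetical), the file name helper
combines the trigger id with the message's topic, partition, and offset:

```elixir
# A hypothetical Broadway message; the metadata shape follows BroadwayKafka.
message = %Broadway.Message{
  data: ~s({"order_id": 1}),
  metadata: %{topic: "orders", partition: 0, offset: 4711, headers: []},
  acknowledger: Broadway.NoopAcknowledger.init()
}

Lightning.KafkaTriggers.alternate_storage_file_name("8f0e9a1c", message)
# => "8f0e9a1c_orders_0_4711.json" (per the pattern documented in DEPLOYMENT.md)
```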
16 changes: 13 additions & 3 deletions lib/lightning/kafka_triggers/message_handling.ex
@@ -25,7 +25,7 @@ defmodule Lightning.KafkaTriggers.MessageHandling do
%{data: data, metadata: metadata} = message
%{workflow: workflow} = trigger

request = metadata |> convert_for_serialisation()
request = metadata |> convert_headers_for_serialisation()

with {:ok, body} <- Jason.decode(data),
true <- is_map(body),
@@ -75,7 +75,17 @@
end
end

defp convert_for_serialisation(%{headers: headers} = metadata) do
Map.put(metadata, :headers, Enum.map(headers, &Tuple.to_list/1))
def convert_headers_for_serialisation(%{headers: headers} = metadata) do
converted_headers =
headers
|> Enum.map(fn
{key, value} ->
[key, value]

[key, value] ->
[key, value]
end)

Map.put(metadata, :headers, converted_headers)
end
end
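
A quick sketch of the conversion this change makes public (values
illustrative): tuple headers from the Kafka client become two-element lists so
the metadata survives `Jason.encode/1`, while headers already in list form
pass through unchanged.

```elixir
metadata = %{offset: 42, headers: [{"content-type", "application/json"}]}

Lightning.KafkaTriggers.MessageHandling.convert_headers_for_serialisation(metadata)
# => %{offset: 42, headers: [["content-type", "application/json"]]}
```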