Skip to content

Commit

Permalink
Correctly persist Kafka messages with headers (#2404)
Browse files Browse the repository at this point in the history
* Correctly persist Kafka messages with headers

---------

Co-authored-by: Stuart Corbishley <[email protected]>
  • Loading branch information
rorymckinley and stuartc authored Aug 23, 2024
1 parent a44af14 commit 4e08300
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ and this project adheres to

### Fixed

- Fix issue with the persisting of a Kafka message with headers.
[#2402](https://github.com/OpenFn/lightning/issues/2402)
- Protect against race conditions when updating partition timestamps for a
Kafka trigger. [#2378](https://github.com/OpenFn/lightning/issues/2378)

Expand Down
8 changes: 7 additions & 1 deletion lib/lightning/kafka_triggers/message_handling.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ defmodule Lightning.KafkaTriggers.MessageHandling do
end

defp create_work_order(%Broadway.Message{} = message, trigger, multi) do
%{data: data, metadata: request} = message
%{data: data, metadata: metadata} = message
%{workflow: workflow} = trigger

request = metadata |> convert_for_serialisation()

with {:ok, body} <- Jason.decode(data),
true <- is_map(body),
{:ok, without_run?} <- assess_workorder_creation(workflow.project_id),
Expand Down Expand Up @@ -72,4 +74,8 @@ defmodule Lightning.KafkaTriggers.MessageHandling do
{:error, message}
end
end

defp convert_for_serialisation(%{headers: headers} = metadata) do
Map.put(metadata, :headers, Enum.map(headers, &Tuple.to_list/1))
end
end
74 changes: 69 additions & 5 deletions test/lightning/kafka_triggers/message_handling_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Lightning.KafkaTriggers.MessageHandlingTest do
use LightningWeb.ConnCase, async: false
use LightningWeb.ConnCase, async: true

import Lightning.Factories

Expand All @@ -19,6 +19,15 @@ defmodule Lightning.KafkaTriggers.MessageHandlingTest do
setup do
message = build_broadway_message()

message_with_headers =
build_broadway_message(
headers: [
{"foo_header", "foo_value"},
{"bar_header", "bar_value"},
{"foo_header", "other_foo_value"}
]
)

trigger = insert(:trigger, type: :kafka)

record_changeset =
Expand All @@ -31,10 +40,15 @@ defmodule Lightning.KafkaTriggers.MessageHandlingTest do
Multi.new()
|> Multi.insert(:record, record_changeset)

%{message: message, multi: multi, trigger: trigger}
%{
message: message,
message_with_headers: message_with_headers,
multi: multi,
trigger: trigger
}
end

test "creates the WorkOrder", %{
test "creates the WorkOrder for a message without headers", %{
message: message,
multi: multi,
trigger: trigger
Expand All @@ -57,7 +71,35 @@ defmodule Lightning.KafkaTriggers.MessageHandlingTest do

%{dataclip: dataclip} = created_workorder
assert dataclip.body["data"] == %{"interesting" => "stuff"}
assert dataclip.body["request"] == message.metadata |> stringify_keys()
assert dataclip.body["request"] == message.metadata |> persisted_metadata()
assert dataclip.type == :kafka
assert dataclip.project_id == workflow.project_id
end

test "creates the WorkOrder for a message with headers", %{
message_with_headers: message,
multi: multi,
trigger: trigger
} do
%{workflow: workflow} = trigger
MessageHandling.persist_message(multi, trigger.id, message)

created_workorder =
WorkOrder
|> Repo.one()
|> Repo.preload([
:trigger,
:workflow,
dataclip: Invocation.Query.dataclip_with_body()
])

assert created_workorder.trigger_id == trigger.id
assert created_workorder.workflow_id == workflow.id
assert created_workorder.state == :pending

%{dataclip: dataclip} = created_workorder
assert dataclip.body["data"] == %{"interesting" => "stuff"}
assert dataclip.body["request"] == message.metadata |> persisted_metadata()
assert dataclip.type == :kafka
assert dataclip.project_id == workflow.project_id
end
Expand Down Expand Up @@ -235,6 +277,7 @@ defmodule Lightning.KafkaTriggers.MessageHandlingTest do
%{interesting: "stuff"} |> Jason.encode!()
)

headers = Keyword.get(opts, :headers, [])
key = Keyword.get(opts, :key, "abc_123_def")
offset = Keyword.get(opts, :offset, 11)

Expand All @@ -244,7 +287,7 @@ defmodule Lightning.KafkaTriggers.MessageHandlingTest do
offset: offset,
partition: 2,
key: key,
headers: [],
headers: headers,
ts: 1_715_164_718_283,
topic: "bar_topic"
},
Expand All @@ -255,6 +298,27 @@ defmodule Lightning.KafkaTriggers.MessageHandlingTest do
status: :ok
}
end

defp persisted_metadata(metadata) do
metadata
|> Enum.reduce(%{}, fn {key, val}, acc ->
persisted_value =
key
|> case do
:headers ->
val |> convert_list_of_tuples_to_list_of_lists()

_ ->
val
end

acc |> Map.put(Atom.to_string(key), persisted_value)
end)
end

defp convert_list_of_tuples_to_list_of_lists(list) do
list |> Enum.map(fn {key, val} -> [key, val] end)
end
end

# Put this in a helper
Expand Down

0 comments on commit 4e08300

Please sign in to comment.