diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ef22d8e15..e92aeb897a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,8 @@ and this project adheres to ### Fixed +- Fix issue with the persisting of a Kafka message with headers. + [#2402](https://github.com/OpenFn/lightning/issues/2402) - Protect against race conditions when updating partition timestamps for a Kafka trigger. [#2378](https://github.com/OpenFn/lightning/issues/2378) diff --git a/lib/lightning/kafka_triggers/message_handling.ex b/lib/lightning/kafka_triggers/message_handling.ex index c53db31c1c..2d3da83ccc 100644 --- a/lib/lightning/kafka_triggers/message_handling.ex +++ b/lib/lightning/kafka_triggers/message_handling.ex @@ -22,9 +22,11 @@ defmodule Lightning.KafkaTriggers.MessageHandling do end defp create_work_order(%Broadway.Message{} = message, trigger, multi) do - %{data: data, metadata: request} = message + %{data: data, metadata: metadata} = message %{workflow: workflow} = trigger + request = metadata |> convert_for_serialisation() + with {:ok, body} <- Jason.decode(data), true <- is_map(body), {:ok, without_run?} <- assess_workorder_creation(workflow.project_id), @@ -72,4 +74,8 @@ defmodule Lightning.KafkaTriggers.MessageHandling do {:error, message} end end + + defp convert_for_serialisation(%{headers: headers} = metadata) do + Map.put(metadata, :headers, Enum.map(headers, &Tuple.to_list/1)) + end end diff --git a/test/lightning/kafka_triggers/message_handling_test.exs b/test/lightning/kafka_triggers/message_handling_test.exs index 36c49f3e3d..673b130745 100644 --- a/test/lightning/kafka_triggers/message_handling_test.exs +++ b/test/lightning/kafka_triggers/message_handling_test.exs @@ -1,5 +1,5 @@ defmodule Lightning.KafkaTriggers.MessageHandlingTest do - use LightningWeb.ConnCase, async: false + use LightningWeb.ConnCase, async: true import Lightning.Factories @@ -19,6 +19,15 @@ defmodule Lightning.KafkaTriggers.MessageHandlingTest do setup do message = build_broadway_message() + message_with_headers = + build_broadway_message( + headers: [ + {"foo_header", "foo_value"}, + {"bar_header", "bar_value"}, + {"foo_header", "other_foo_value"} + ] + ) + trigger = insert(:trigger, type: :kafka) record_changeset = @@ -31,10 +40,15 @@ defmodule Lightning.KafkaTriggers.MessageHandlingTest do Multi.new() |> Multi.insert(:record, record_changeset) - %{message: message, multi: multi, trigger: trigger} + %{ + message: message, + message_with_headers: message_with_headers, + multi: multi, + trigger: trigger + } end - test "creates the WorkOrder", %{ + test "creates the WorkOrder for a message without headers", %{ message: message, multi: multi, trigger: trigger @@ -57,7 +71,35 @@ defmodule Lightning.KafkaTriggers.MessageHandlingTest do %{dataclip: dataclip} = created_workorder assert dataclip.body["data"] == %{"interesting" => "stuff"} - assert dataclip.body["request"] == message.metadata |> stringify_keys() + assert dataclip.body["request"] == message.metadata |> persisted_metadata() + assert dataclip.type == :kafka + assert dataclip.project_id == workflow.project_id + end + + test "creates the WorkOrder for a message with headers", %{ + message_with_headers: message, + multi: multi, + trigger: trigger + } do + %{workflow: workflow} = trigger + MessageHandling.persist_message(multi, trigger.id, message) + + created_workorder = + WorkOrder + |> Repo.one() + |> Repo.preload([ + :trigger, + :workflow, + dataclip: Invocation.Query.dataclip_with_body() + ]) + + assert created_workorder.trigger_id == trigger.id + assert created_workorder.workflow_id == workflow.id + assert created_workorder.state == :pending + + %{dataclip: dataclip} = created_workorder + assert dataclip.body["data"] == %{"interesting" => "stuff"} + assert dataclip.body["request"] == message.metadata |> persisted_metadata() assert dataclip.type == :kafka assert dataclip.project_id == workflow.project_id end @@ -235,6 +277,7 @@ defmodule Lightning.KafkaTriggers.MessageHandlingTest do %{interesting: "stuff"} |> Jason.encode!() ) + headers = Keyword.get(opts, :headers, []) key = Keyword.get(opts, :key, "abc_123_def") offset = Keyword.get(opts, :offset, 11) @@ -244,7 +287,7 @@ defmodule Lightning.KafkaTriggers.MessageHandlingTest do offset: offset, partition: 2, key: key, - headers: [], + headers: headers, ts: 1_715_164_718_283, topic: "bar_topic" }, @@ -255,6 +298,27 @@ defmodule Lightning.KafkaTriggers.MessageHandlingTest do status: :ok } end + + defp persisted_metadata(metadata) do + metadata + |> Enum.reduce(%{}, fn {key, val}, acc -> + persisted_value = + key + |> case do + :headers -> + val |> convert_list_of_tuples_to_list_of_lists() + + _ -> + val + end + + acc |> Map.put(Atom.to_string(key), persisted_value) + end) + end + + defp convert_list_of_tuples_to_list_of_lists(list) do + list |> Enum.map(fn {key, val} -> [key, val] end) + end end # Put this in a helper