diff --git a/ex/lib/logflare_ex.ex b/ex/lib/logflare_ex.ex index d5b4669..31f1152 100644 --- a/ex/lib/logflare_ex.ex +++ b/ex/lib/logflare_ex.ex @@ -60,7 +60,22 @@ defmodule LogflareEx do def send_events(%Client{source_token: nil, source_name: nil}, _batch), do: {:error, :no_source} def send_events(client, [%{} | _] = batch) do - body = Bertex.encode(%{"batch" => batch, "source" => client.source_token}) + on_prepare_payload = Map.get(client, :on_prepare_payload) + + prepared_batch = + if on_prepare_payload do + Enum.map(batch, fn event -> + case on_prepare_payload do + {m, f, 1} -> apply(m, f, [event]) + cb when is_function(cb) -> cb.(event) + _ -> event + end + end) + else + batch + end + + body = Bertex.encode(%{"batch" => prepared_batch, "source" => client.source_token}) case Tesla.post(client.tesla_client, "/api/logs", body) do {:ok, %Tesla.Env{status: status, body: body}} when status < 300 -> diff --git a/ex/lib/logflare_ex/client.ex b/ex/lib/logflare_ex/client.ex index 6ac3381..390e951 100644 --- a/ex/lib/logflare_ex/client.ex +++ b/ex/lib/logflare_ex/client.ex @@ -41,6 +41,7 @@ defmodule LogflareEx.Client do - `:source_token`: Source UUID. Mutually exclusive with `:source_name` - `:source_name`: Source name. Mutually exclusive with `:source_token` - `:on_error`: mfa callback for handling API errors. Must be 1 arity. + - `:on_prepare_payload`: mfa callback or anonymous function for preparing the final payload before sending to API. Must be 1 arity. - `:auto_flush`: Used for batching. Enables automatic flushing. If disabled, `LogflareEx.flush/1` must be called. - `:flush_interval`: Used for batching. Flushes cached events at the provided interval. - `:batch_size`: Used for batching. It is the maximum number of events send per API request. @@ -61,6 +62,7 @@ defmodule LogflareEx.Client do field(:source_token, String.t()) field(:source_name, String.t()) field(:on_error, list() | mfa(), default: nil) + field(:on_prepare_payload, list() | mfa(), default: nil) # batching field(:auto_flush, :boolean, default: true) field(:flush_interval, non_neg_integer(), default: @default_flush_interval) @@ -79,6 +81,7 @@ defmodule LogflareEx.Client do source_name: get_config_value(:source_name), tesla_client: nil, on_error: get_config_value(:on_error), + on_prepare_payload: get_config_value(:on_prepare_payload), flush_interval: get_config_value(:flush_interval) || @default_flush_interval, batch_size: get_config_value(:batch_size) || @default_batch_size }) diff --git a/ex/lib/logflare_ex/telemetry_reporter.ex b/ex/lib/logflare_ex/telemetry_reporter.ex index f34ddbc..76c57d2 100644 --- a/ex/lib/logflare_ex/telemetry_reporter.ex +++ b/ex/lib/logflare_ex/telemetry_reporter.ex @@ -60,14 +60,16 @@ defmodule LogflareEx.TelemetryReporter do def handle_attach(event, measurements, metadata, config) when is_list(config) do # merge configuration config_file_opts = (Application.get_env(:logflare_ex, __MODULE__) || []) |> Map.new() - opts = Enum.into(config, config_file_opts) + + opts = + Enum.into(config, config_file_opts) payload = %{metadata: metadata, measurements: measurements} to_include = Map.get(opts, :include, []) filtered_payload = for path <- to_include, - String.starts_with?(path, "measurements.") or String.starts_with?(path, "metadata."), + String.starts_with?(path, "measurements") or String.starts_with?(path, "metadata"), reduce: %{} do acc -> put_path(acc, path, get_path(payload, path)) end diff --git a/ex/test/logflare_ex_test.exs b/ex/test/logflare_ex_test.exs index 3578034..a713cc1 100644 --- a/ex/test/logflare_ex_test.exs +++ b/ex/test/logflare_ex_test.exs @@ -85,6 +85,38 @@ defmodule LogflareExTest do end end + describe "on_prepare_payload" do + test "triggered before payload is sent" do + pid = self() + + Tesla + |> expect(:post, 3, fn _client, _path, body -> + %{"batch" => [event]} = Bertex.decode(body) + send(pid, {event.ref, event}) + {:ok, %Tesla.Env{status: 500, body: "some server error"}} + end) + + LogflareEx.TestUtils + |> expect(:stub_function, 2, fn data -> + %{different: "value", ref: data.ref} + end) + + for cb <- [ + {LogflareEx.TestUtils, :stub_function, 1}, + &LogflareEx.TestUtils.stub_function/1, + fn data -> %{different: "value", ref: data.ref} end + ] do + client = LogflareEx.client(api_key: "123", source_token: "123", on_prepare_payload: cb) + ref = make_ref() + + assert {:error, %Tesla.Env{}} = + LogflareEx.send_events(client, [%{some: "event", ref: ref}]) + + assert_receive {^ref, %{different: "value", ref: _}} + end + end + end + describe "batching" do setup do pid = start_supervised!(BatcherSup) diff --git a/ex/test/telemetry_reporter_test.exs b/ex/test/telemetry_reporter_test.exs index 9788498..ab379cf 100644 --- a/ex/test/telemetry_reporter_test.exs +++ b/ex/test/telemetry_reporter_test.exs @@ -119,6 +119,42 @@ defmodule LogflareEx.TelemetryReporterTest do assert event[:measurements][:latency] == [123, 223] refute event[:measurements][:other] end + + test "handle_attach/4 with :on_prepare_payload with anonymous function" do + pid = self() + ref = make_ref() + + Tesla + |> expect(:post, fn _client, _path, body -> + decoded = Bertex.decode(body) + send(pid, {ref, decoded}) + {:ok, %Tesla.Env{status: 201, body: Jason.encode!(%{"message" => "server msg"})}} + end) + + :telemetry.attach("my-id", [:some, :event], &TelemetryReporter.handle_attach/4, + auto_flush: true, + flush_interval: 50, + include: ["measurements"], + on_prepare_payload: fn payload -> + payload + |> Map.put(:message, "hello!") + |> Map.put(:test, payload.measurements.other) + end + ) + + :telemetry.execute([:some, :event], %{latency: [123, 223], other: "value"}, %{ + some: "metadata", + to_exclude: "this field" + }) + + Process.sleep(300) + + assert_received {^ref, %{"batch" => [event]}} + # other fields will be included + assert event[:message] == "hello!" + assert event[:test] == "value" + assert event[:measurements][:latency] == [123, 223] + end end # reporter