Skip to content

Commit

Permalink
ex/feat: include :on_prepare_payload option (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziinc authored Jun 11, 2024
1 parent d3d8df8 commit 675d2fa
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 3 deletions.
17 changes: 16 additions & 1 deletion ex/lib/logflare_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,22 @@ defmodule LogflareEx do
def send_events(%Client{source_token: nil, source_name: nil}, _batch), do: {:error, :no_source}

def send_events(client, [%{} | _] = batch) do
body = Bertex.encode(%{"batch" => batch, "source" => client.source_token})
on_prepare_payload = Map.get(client, :on_prepare_payload)

prepared_batch =
if on_prepare_payload do
Enum.map(batch, fn event ->
case on_prepare_payload do
{m, f, 1} -> apply(m, f, [event])
cb when is_function(cb) -> cb.(event)
_ -> event
end
end)
else
batch
end

body = Bertex.encode(%{"batch" => prepared_batch, "source" => client.source_token})

case Tesla.post(client.tesla_client, "/api/logs", body) do
{:ok, %Tesla.Env{status: status, body: body}} when status < 300 ->
Expand Down
3 changes: 3 additions & 0 deletions ex/lib/logflare_ex/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ defmodule LogflareEx.Client do
- `:source_token`: Source UUID. Mutually exclusive with `:source_name`
- `:source_name`: Source name. Mutually exclusive with `:source_token`
- `:on_error`: mfa callback for handling API errors. Must be 1 arity.
- `:on_prepare_payload`: mfa callback or anonymous function for preparing the final payload before sending to API. Must be 1 arity.
- `:auto_flush`: Used for batching. Enables automatic flushing. If disabled, `LogflareEx.flush/1` must be called.
- `:flush_interval`: Used for batching. Flushes cached events at the provided interval.
- `:batch_size`: Used for batching. It is the maximum number of events send per API request.
Expand All @@ -61,6 +62,7 @@ defmodule LogflareEx.Client do
field(:source_token, String.t())
field(:source_name, String.t())
field(:on_error, list() | mfa(), default: nil)
field(:on_prepare_payload, list() | mfa(), default: nil)
# batching
field(:auto_flush, :boolean, default: true)
field(:flush_interval, non_neg_integer(), default: @default_flush_interval)
Expand All @@ -79,6 +81,7 @@ defmodule LogflareEx.Client do
source_name: get_config_value(:source_name),
tesla_client: nil,
on_error: get_config_value(:on_error),
on_prepare_payload: get_config_value(:on_prepare_payload),
flush_interval: get_config_value(:flush_interval) || @default_flush_interval,
batch_size: get_config_value(:batch_size) || @default_batch_size
})
Expand Down
6 changes: 4 additions & 2 deletions ex/lib/logflare_ex/telemetry_reporter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ defmodule LogflareEx.TelemetryReporter do
def handle_attach(event, measurements, metadata, config) when is_list(config) do
# merge configuration
config_file_opts = (Application.get_env(:logflare_ex, __MODULE__) || []) |> Map.new()
opts = Enum.into(config, config_file_opts)

opts =
Enum.into(config, config_file_opts)

payload = %{metadata: metadata, measurements: measurements}
to_include = Map.get(opts, :include, [])

filtered_payload =
for path <- to_include,
String.starts_with?(path, "measurements.") or String.starts_with?(path, "metadata."),
String.starts_with?(path, "measurements") or String.starts_with?(path, "metadata"),
reduce: %{} do
acc -> put_path(acc, path, get_path(payload, path))
end
Expand Down
32 changes: 32 additions & 0 deletions ex/test/logflare_ex_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,38 @@ defmodule LogflareExTest do
end
end

describe "on_prepare_payload" do
test "triggered before payload is sent" do
pid = self()

Tesla
|> expect(:post, 3, fn _client, _path, body ->
%{"batch" => [event]} = Bertex.decode(body)
send(pid, {event.ref, event})
{:ok, %Tesla.Env{status: 500, body: "some server error"}}
end)

LogflareEx.TestUtils
|> expect(:stub_function, 2, fn data ->
%{different: "value", ref: data.ref}
end)

for cb <- [
{LogflareEx.TestUtils, :stub_function, 1},
&LogflareEx.TestUtils.stub_function/1,
fn data -> %{different: "value", ref: data.ref} end
] do
client = LogflareEx.client(api_key: "123", source_token: "123", on_prepare_payload: cb)
ref = make_ref()

assert {:error, %Tesla.Env{}} =
LogflareEx.send_events(client, [%{some: "event", ref: ref}])

assert_receive {^ref, %{different: "value", ref: _}}
end
end
end

describe "batching" do
setup do
pid = start_supervised!(BatcherSup)
Expand Down
36 changes: 36 additions & 0 deletions ex/test/telemetry_reporter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,42 @@ defmodule LogflareEx.TelemetryReporterTest do
assert event[:measurements][:latency] == [123, 223]
refute event[:measurements][:other]
end

test "handle_attach/4 with :on_prepare_payload with anonymous function" do
pid = self()
ref = make_ref()

Tesla
|> expect(:post, fn _client, _path, body ->
decoded = Bertex.decode(body)
send(pid, {ref, decoded})
{:ok, %Tesla.Env{status: 201, body: Jason.encode!(%{"message" => "server msg"})}}
end)

:telemetry.attach("my-id", [:some, :event], &TelemetryReporter.handle_attach/4,
auto_flush: true,
flush_interval: 50,
include: ["measurements"],
on_prepare_payload: fn payload ->
payload
|> Map.put(:message, "hello!")
|> Map.put(:test, payload.measurements.other)
end
)

:telemetry.execute([:some, :event], %{latency: [123, 223], other: "value"}, %{
some: "metadata",
to_exclude: "this field"
})

Process.sleep(300)

assert_received {^ref, %{"batch" => [event]}}
# other fields will be included
assert event[:message] == "hello!"
assert event[:test] == "value"
assert event[:measurements][:latency] == [123, 223]
end
end

# reporter
Expand Down

0 comments on commit 675d2fa

Please sign in to comment.