Skip to content

Commit

Permalink
feat(telemetry): Update opentelemetry events
Browse files Browse the repository at this point in the history
  • Loading branch information
juancgalvis committed Mar 3, 2025
1 parent 01231aa commit ee6f757
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 12 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ jobs:
if: ${{ !contains(github.event.head_commit.message, '[skip ci]') }}
name: Build and test
runs-on: ubuntu-latest
permissions:
contents: read
pull-requests: write
actions: read

steps:
- name: Generate a token
Expand Down
4 changes: 3 additions & 1 deletion lib/reactive_commons/listeners/reply_listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ defmodule ReplyListener do
def consume(props, payload, %{chan: chan}) do
correlation_id = get_correlation_id(props)
:ok = AMQP.Basic.ack(chan, props.delivery_tag)
ReplyRouter.route_reply(correlation_id, payload)
result = ReplyRouter.route_reply(correlation_id, payload)
:telemetry.execute([:async, :message, :replied], %{}, %{meta: props, result: result})
result
end
end
14 changes: 10 additions & 4 deletions lib/reactive_commons/messaging/generic_executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ defmodule GenericExecutor do
) do
t0 = :erlang.monotonic_time()

report_to_telemetry(msg)

try do
event = decode(msg)
handler_path = get_handler_path(msg, event)
[{_path, handler_fn}] = :ets.lookup(table, handler_path)
handler_result = handler_fn.(event)
on_post_process(handler_result, msg)
report_to_telemetry(@message_type, handler_path, calc_duration(t0), :success)
report_to_telemetry(msg, @message_type, handler_path, calc_duration(t0), :success)
:ok = ack(chan, tag)
catch
info, error ->
Expand All @@ -61,7 +63,7 @@ defmodule GenericExecutor do
_type, _err -> :erlang.atom_to_binary(@message_type) <> ".unknown"
end

report_to_telemetry(@message_type, handler_path, duration, :failure)
report_to_telemetry(msg, @message_type, handler_path, duration, :failure)
end)
end

Expand Down Expand Up @@ -117,18 +119,22 @@ defmodule GenericExecutor do
)
end

def report_to_telemetry(type, handler_path, duration, result)
def report_to_telemetry(msg, type, handler_path, duration, result)
when result in [:success, :failure] do
type_str = :erlang.atom_to_binary(type)
transaction = "#{type_str}.#{handler_path}"

:telemetry.execute(
[:async, :message, :completed],
%{duration: duration},
%{transaction: transaction, result: :erlang.atom_to_binary(result)}
%{msg: msg, transaction: transaction, result: :erlang.atom_to_binary(result)}
)
end

def report_to_telemetry(msg) do
:telemetry.execute([:async, :message, :start], %{}, %{msg: msg})
end

def calc_duration(t0) do
t1 = :erlang.monotonic_time()
:erlang.convert_time_unit(t1 - t0, :native, :microsecond)
Expand Down
44 changes: 38 additions & 6 deletions lib/reactive_commons/messaging/message_sender.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule MessageSender do
@moduledoc false
use GenServer
alias ReactiveCommons.Utils.SpanUtils
require Logger

defstruct [:chan, :conn]
Expand Down Expand Up @@ -29,10 +30,10 @@ defmodule MessageSender do
@impl true
def handle_call(
message = %OutMessage{headers: _, content_encoding: _},
_from,
from,
state = %{chan: chan}
) do
publish(message, chan)
publish(message, chan, from)
{:reply, :ok, state}
end

Expand All @@ -54,14 +55,14 @@ defmodule MessageSender do

@impl true
def handle_info({:retry, message = %OutMessage{}, from, _count}, state = %{chan: chan}) do
publish(message, chan)
publish(message, chan, from)
GenServer.reply(from, :ok)
{:noreply, state}
end

defp publish(message = %OutMessage{headers: headers, content_encoding: encoding}, chan) do
defp publish(message = %OutMessage{headers: headers, content_encoding: encoding}, chan, from) do
options = [
headers: headers,
headers: SpanUtils.inject(headers, from),
content_encoding: encoding,
content_type: message.content_type,
persistent: message.persistent,
Expand All @@ -70,7 +71,38 @@ defmodule MessageSender do
app_id: MessageContext.config().application_name
]

AMQP.Basic.publish(chan, message.exchange_name, message.routing_key, message.payload, options)
result =
AMQP.Basic.publish(
chan,
message.exchange_name,
message.routing_key,
message.payload,
options
)

send_telemetry(
System.monotonic_time(),
message,
options,
result,
from
)

result
end

defp send_telemetry(start, message = %OutMessage{}, options, result, {caller, _}) do
:telemetry.execute(
[:async, :message, :sent],
%{duration: System.monotonic_time() - start},
%{
exchange: message.exchange_name,
routing_key: message.routing_key,
options: options,
result: result,
caller: caller
}
)
end

defp create_topology(chan) do
Expand Down
10 changes: 10 additions & 0 deletions lib/reactive_commons/utils/span_utils.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule ReactiveCommons.Utils.SpanUtils do
@moduledoc false
def inject(headers, from) do
if Code.ensure_loaded?(OpentelemetryReactiveCommons.Utils) do
OpentelemetryReactiveCommons.Utils.inject(headers, from)
else
headers
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ defmodule ReactiveCommons.MixProject do
defp deps do
[
{:poison, "~> 6.0 or ~> 5.0"},
{:amqp, "~> 3.3 or ~> 4.0"},
{:amqp, "~> 4.0 or ~> 3.3"},
{:uuid, "~> 1.1"},
{:telemetry, "~> 1.3"},
{:ex_doc, ">= 0.0.0", only: [:dev, :test], runtime: false},
Expand Down

0 comments on commit ee6f757

Please sign in to comment.