Skip to content

Commit

Permalink
telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanjos committed Jun 20, 2024
1 parent 44ea718 commit f4db5d9
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 10 deletions.
70 changes: 61 additions & 9 deletions lib/absinthe/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -149,26 +149,78 @@ defmodule Absinthe.Subscription do
}

storage_implementation = storage_implementation(pubsub)
storage_implementation.subscribe(pubsub, doc_id, doc_value, field_keys)

:telemetry.span(
[:absinthe, :subscription, :storage, :subscribe],
%{
doc_id: doc_id,
doc: doc,
field_keys: field_keys,
storage_implementation: storage_implementation
},
fn ->
result = storage_implementation.subscribe(pubsub, doc_id, doc_value, field_keys)

{result,
%{
doc_id: doc_id,
doc: doc,
field_keys: field_keys,
storage_implementation: storage_implementation
}}
end
)
end

@doc false
def unsubscribe(pubsub, doc_id) do
storage_implementation = storage_implementation(pubsub)
storage_implementation.unsubscribe(pubsub, doc_id)

:telemetry.span(
[:absinthe, :subscription, :storage, :unsubscribe],
%{
doc_id: doc_id,
storage_implementation: storage_implementation
},
fn ->
result = storage_implementation.unsubscribe(pubsub, doc_id)

{result,
%{
doc_id: doc_id,
storage_implementation: storage_implementation
}}
end
)
end

@doc false
def get(pubsub, key) do
storage_implementation = storage_implementation(pubsub)

pubsub
|> storage_implementation.get_docs_by_field_key(key)
|> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} ->
initial_phases = PipelineSerializer.unpack(initial_phases)
{doc_id, Map.put(doc, :initial_phases, initial_phases)}
end)
|> Map.new()
:telemetry.span(
[:absinthe, :subscription, :storage, :get],
%{
key: key,
storage_implementation: storage_implementation
},
fn ->
result =
pubsub
|> storage_implementation.get_docs_by_field_key(key)
|> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} ->
initial_phases = PipelineSerializer.unpack(initial_phases)
{doc_id, Map.put(doc, :initial_phases, initial_phases)}
end)
|> Map.new()

{result,
%{
key: key,
storage_implementation: storage_implementation
}}
end
)
end

@doc false
Expand Down
1 change: 0 additions & 1 deletion lib/absinthe/subscription/document_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Absinthe.Subscription.DocumentStorage do
Absinthe how to store documents and the field keys subcribed to those
documents.
"""
alias Module.Behaviour

@doc """
Child spec to determine how to start the
Expand Down

0 comments on commit f4db5d9

Please sign in to comment.