Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Propagate trace context from Cowboy to Plug #5

Open
wants to merge 1 commit into
base: trace_gateway
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions plug_gateway/lib/plug_gateway/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,25 @@ defmodule PlugGateway.Application do

use Application

alias PlugGateway.{
ContextPropagator,
FinchTelemetryHandler,
PlugTelemetryHandler,
Router,
Tracer
}

def start(_type, _args) do
# List all child processes to be supervised
children = [
{SpandexDatadog.ApiServer, spandex_datadog_options()},
{Plug.Cowboy, scheme: :http, plug: PlugGateway.Router, options: [port: port()]},
ContextPropagator,
{Plug.Cowboy, scheme: :http, plug: Router, options: [port: port()]},
{Finch, name: MyFinch}
]

PlugGateway.FinchTelemetryHandler.install()
FinchTelemetryHandler.install()
PlugTelemetryHandler.install()

# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
Expand All @@ -31,7 +41,7 @@ defmodule PlugGateway.Application do
port: String.to_integer(env["TRACING_PORT"] || config[:port] || "8126"),
batch_size: String.to_integer(env["TRACING_BATCH_SIZE"] || config[:batch_size] || "10"),
sync_threshold: String.to_integer(env["TRACING_SYNC_THRESHOLD"] || config[:sync_threshold] || "100"),
http: config[:http] || PlugGateway.Tracer
http: config[:http] || Tracer
]
end
end
85 changes: 85 additions & 0 deletions plug_gateway/lib/plug_gateway/context_propagator.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
defmodule PlugGateway.ContextPropagator do
@moduledoc """
Facilitate trace content propagation from Cowboy to Plug. These Telemetry
callbacks fire in the Cowboy _connection_ process, not the Plug request
process, so we need to manage an ETS table to allow the request to look up
the context for its parent connection process.
"""

use GenServer

alias PlugGateway.Tracer

def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

def span_context() do
:"$ancestors"
|> Process.get([])
|> Enum.find_value(fn pid ->
case :ets.lookup(__MODULE__, pid) do
[{_pid, ctx}] -> ctx
[] -> nil
end
end)
end

@impl GenServer
def init(_opts) do
:ets.new(__MODULE__, [:named_table, :set, :public, read_concurrency: true])

:telemetry.attach_many(
"cowboy-propagator",
[
[:cowboy, :request, :start],
[:cowboy, :request, :stop],
[:cowboy, :request, :exception],
[:cowboy, :request, :early_error],
],
&__MODULE__.handle_event/4,
nil
)

{:ok, nil}
end

def handle_event([:cowboy, :request, :early_error], _meas, meta, _ctx) do
IO.inspect(meta.req, label: "Cowboy req in :early_error")
Tracer.start_trace("cowboy.request", http: http_meta(meta.req))
Tracer.finish_trace()
end

def handle_event([:cowboy, :request, :start], _meas, meta, _ctx) do
IO.inspect(meta.req, label: "Cowboy req in :start")
Tracer.start_trace("cowboy.request", http: http_meta(meta.req))
{:ok, ctx} = Tracer.current_context()
:ets.insert(__MODULE__, {self(), ctx})
end

def handle_event([:cowboy, :request, :stop], _meas, meta, _ctx) do
IO.inspect(meta.req, label: "Cowboy req in :stop")
Tracer.finish_trace()
:ets.delete(__MODULE__, self())
end

def handle_event([:cowboy, :request, :exception], _meas, meta, _ctx) do
IO.inspect(meta.req, label: "Cowboy req in :exception")
Tracer.finish_trace()
:ets.delete(__MODULE__, self())
end

# Reformat Cowboy request metadata into Spandex metadata
defp http_meta(req) do
uri = %URI{
scheme: to_string(req[:scheme]),
host: req[:host],
port: req[:port],
path: req[:path],
query: req[:qs]
}

[
url: URI.to_string(uri),
method: req[:method]
]
end
end
42 changes: 42 additions & 0 deletions plug_gateway/lib/plug_gateway/plug_telemetry_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule PlugGateway.PlugTelemetryHandler do
@moduledoc "Handles events from Plug.Telemetry"

alias PlugGateway.Tracer

def install do
:telemetry.attach_many(
"plug-router-telemetry",
[
[:plug_gateway, :router, :start],
[:plug_gateway, :router, :stop],
[:plug_gateway, :router, :exception]
],
&__MODULE__.handle_event/4,
nil
)
end

def handle_event([:plug_gateway, :router, :start], _measures, meta, _ctx) do
IO.inspect(meta, label: "plug.router start meta")
case PlugGateway.ContextPropagator.span_context() do
nil -> Tracer.start_trace("plug.router")
ctx -> Tracer.continue_trace("plug.router", ctx)
end
end

def handle_event([:plug_gateway, :router, :stop], _measures, meta, _ctx) do
IO.inspect(meta, label: "plug.router stop meta")
Tracer.finish_trace()
end

# Note: this exception event is fired from this app's handle_errors callback
# via Plug.ErrorHandler. It is not built into the Plug.Telemetry events by
# default.
def handle_event([:plug_gateway, :router, :exception], _measures, meta, _ctx) do
IO.inspect(meta, label: "plug.router exception meta")
exception = Exception.normalize(meta[:kind], meta[:error], meta[:stack])
Tracer.span_error(exception, meta[:stack] || [])
# Note that we don't finish the trace here because after processing the
# error, we will still send the response and end up firing the `:stop`
# event.
end end
8 changes: 5 additions & 3 deletions plug_gateway/lib/plug_gateway/router.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
defmodule PlugGateway.Router do
use Plug.Router
use Plug.ErrorHandler
use SpandexPhoenix

alias PlugGateway.BackendClient

require Logger

plug :match
plug Plug.Telemetry, event_prefix: [:plug_gateway, :router]
plug Plug.Parsers, parsers: [:urlencoded, :json],
pass: ["*/*"],
json_decoder: Jason
Expand Down Expand Up @@ -55,8 +55,10 @@ defmodule PlugGateway.Router do

defp backend_api_endpoint, do: System.get_env("BACKEND_API_URL")

defp handle_errors(conn, %{kind: _kind, reason: reason, stack: _stack}) do
Logger.error("Error: #{inspect reason}")
defp handle_errors(conn, error) do
Logger.error("Error: #{inspect error[:reason]}")
meta = Map.put(error, :conn, conn)
:telemetry.execute([:plug_gateway, :router, :exception], %{}, meta)
send_resp(conn, conn.status, "Internal Server Error")
end
end
4 changes: 2 additions & 2 deletions plug_gateway/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ defmodule PlugGateway.MixProject do
{:decorator, "~> 1.3"},
{:finch, "~> 0.4"},
{:jason, "~> 1.1"},
{:plug_cowboy, "~> 2.0"},
{:plug_cowboy, "~> 2.4"},
{:spandex, "~> 3.0"},
{:spandex_datadog, "~> 1.0"},
{:spandex_phoenix, "~> 0.4"},
{:spandex_phoenix, "~> 1.0"},
{:telemetry, "~> 0.4"}
]
end
Expand Down
2 changes: 1 addition & 1 deletion plug_gateway/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
"spandex": {:hex, :spandex, "3.0.2", "6fc99ec4cca6810ffff579cc606dcbf14d54d1419f4738f64f63854aee351fdb", [:mix], [{:decorator, "~> 1.2", [hex: :decorator, repo: "hexpm", optional: true]}, {:optimal, "~> 0.3.3", [hex: :optimal, repo: "hexpm", optional: false]}, {:plug, ">= 1.0.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "b98667d553490d63ab4ae784c4689d41052145aa2ca59f2f59caacd4a1a63d1e"},
"spandex_datadog": {:hex, :spandex_datadog, "1.0.0", "20db83c10600210a36a1ac95ed8cef3efb400ae948f48252437c2cc21dfd3969", [:mix], [{:msgpax, "~> 2.2.1", [hex: :msgpax, repo: "hexpm", optional: false]}, {:spandex, "~> 3.0", [hex: :spandex, repo: "hexpm", optional: false]}], "hexpm", "d1a499ed3e8580b88ca8ba10208004849b3bd6cfa045b90e4b69055b78474045"},
"spandex_phoenix": {:hex, :spandex_phoenix, "0.4.2", "8d63d080b411f43a0b269e19ca2a21d2ed832d93bcf2d1b1f9d13101188a45f1", [:mix], [{:plug, "~> 1.3", [hex: :plug, repo: "hexpm", optional: false]}, {:spandex, "~> 2.2 or ~> 3.0", [hex: :spandex, repo: "hexpm", optional: false]}], "hexpm", "c8e729cba2576bd055fae0b205436d459eeab1e32b0473d662e14a056522153d"},
"spandex_phoenix": {:hex, :spandex_phoenix, "1.0.4", "e42471659c67c02d19fd76fdcce2e044f8f85f050671361324643624aa2ba971", [:mix], [{:plug, "~> 1.3", [hex: :plug, repo: "hexpm", optional: false]}, {:spandex, "~> 2.2 or ~> 3.0", [hex: :spandex, repo: "hexpm", optional: false]}], "hexpm", "d3969e21f93be314b772777924a9b067767b8b9a0464c55a04270c64793aa495"},
"telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
}