Skip to content

Commit

Permalink
Return a result tuple in telemetry event
Browse files Browse the repository at this point in the history
  • Loading branch information
Schultzer committed Dec 9, 2023
1 parent 732a434 commit 5e9ba59
Showing 1 changed file with 23 additions and 16 deletions.
39 changes: 23 additions & 16 deletions lib/ecto/adapters/qlc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ defmodule EctoQLC.Adapters.QLC do
query_handle = to_query_handle(operator, prepareed, qlc)
{query_time, values} = :timer.tc(:qlc, :eval, [query_handle, []])
{decode_time, value} = :timer.tc(__MODULE__, :select, [values, operator, prepareed])
log(value, get_source(query.sources), qlc, query_time, decode_time, 0, 0, operator, adapter_meta.telemetry, params, options ++ adapter_meta.opts)
log(value, get_source(query.sources), qlc, query_time, decode_time, 0, 0, operator, adapter_meta.telemetry, params, query, options ++ adapter_meta.opts)
end)
|> elem(1)
end
Expand All @@ -226,7 +226,7 @@ defmodule EctoQLC.Adapters.QLC do
query_handle = to_query_handle(operator, prepareed, qlc)
{query_time, values} = :timer.tc(:qlc, :eval, [query_handle, []])
{decode_time, value} = :timer.tc(__MODULE__, :select, [values, operator, prepareed])
log(value, get_source(query.sources), qlc, query_time, decode_time, 0, 0, operator, adapter_meta.telemetry, params, options ++ adapter_meta.opts)
log(value, get_source(query.sources), qlc, query_time, decode_time, 0, 0, operator, adapter_meta.telemetry, params, query, options ++ adapter_meta.opts)
end

@doc false
Expand All @@ -249,7 +249,7 @@ defmodule EctoQLC.Adapters.QLC do
fn cursor ->
result = :qlc.delete_cursor(cursor)
query_time = :timer.now_diff(:erlang.timestamp, Process.get(key))
log(result, get_source(query.sources), qlc, query_time, 0, 0, 0, operator, adapter_meta.telemetry, params, options ++ adapter_meta.opts)
log(result, get_source(query.sources), qlc, query_time, 0, 0, 0, operator, adapter_meta.telemetry, params, query, options ++ adapter_meta.opts)
end
)
end
Expand Down Expand Up @@ -1041,7 +1041,7 @@ defmodule EctoQLC.Adapters.QLC do
else
{:error, :stale}
end
|> log(source, "DELETE #{inspect source} #{inspect filters} MATCHSPEC #{inspect ms}", query_time, 0, 0, 0, :delete_all, adapter_meta.telemetry, filters, options ++ adapter_meta.opts)
|> log(source, "DELETE #{inspect source} #{inspect filters} MATCHSPEC #{inspect ms}", query_time, 0, 0, 0, :delete_all, adapter_meta.telemetry, filters, [], options ++ adapter_meta.opts)
end
def delete(:mnesia = mod, adapter_meta, %{source: source, prefix: prefix, schema: schema}, filters, _returning, options) do
ms = to_match_spec(adapter_meta, schema, filters)
Expand All @@ -1055,7 +1055,7 @@ defmodule EctoQLC.Adapters.QLC do
end
end
{query_time, {:atomic, result}} = :timer.tc(mod, :transaction, [fun])
log(result, source, "DELETE #{inspect source} #{inspect filters} MATCHSPEC #{inspect ms}", query_time, 0, 0, 0, :delete_all, adapter_meta.telemetry, filters, options ++ adapter_meta.opts)
log(result, source, "DELETE #{inspect source} #{inspect filters} MATCHSPEC #{inspect ms}", query_time, 0, 0, 0, :delete_all, adapter_meta.telemetry, filters, [], options ++ adapter_meta.opts)
end

@doc false
Expand All @@ -1074,7 +1074,7 @@ defmodule EctoQLC.Adapters.QLC do
else
{query_time, false} when is_integer(query_time) -> {query_time, {:invalid, [unique: "primary_key"]}}
end
log(result, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, fields, options ++ adapter_meta.opts)
log(result, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, fields, [], options ++ adapter_meta.opts)
end
def insert(:ets = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, fields, _on_conflict, returning, options) do
table = to_table(adapter_meta, source, prefix, options)
Expand All @@ -1087,7 +1087,7 @@ defmodule EctoQLC.Adapters.QLC do
{query_time, true} -> {query_time, {:ok, Enum.map(returning, &fields[&1])}}
{query_time, false} -> {query_time, {:invalid, [unique: "primary_key"]}}
end
log(result, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, fields, options ++ adapter_meta.opts)
log(result, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, fields, [], options ++ adapter_meta.opts)
end
def insert(:mnesia = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, fields, _on_conflict, returning, options) do
table = to_table(adapter_meta, source, prefix, options)
Expand All @@ -1114,7 +1114,7 @@ defmodule EctoQLC.Adapters.QLC do
{query_time, {:atomic, result}} -> {query_time, result}
end

log(result, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, fields, options ++ adapter_meta.opts)
log(result, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, fields, [], options ++ adapter_meta.opts)
end

@doc false
Expand All @@ -1141,7 +1141,7 @@ defmodule EctoQLC.Adapters.QLC do
end


log(result, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect rows}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, rows, options ++ adapter_meta.opts)
log(result, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect rows}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, rows, [], options ++ adapter_meta.opts)
end
def insert_all(:ets = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, _header, rows, _on_conflict, returning, _placeholders, options) do
table = to_table(adapter_meta, source, prefix, options)
Expand All @@ -1157,7 +1157,7 @@ defmodule EctoQLC.Adapters.QLC do
else
{0, result}
end
|> log(source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect rows}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, rows, options ++ adapter_meta.opts)
|> log(source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect rows}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, rows, [], options ++ adapter_meta.opts)
end
def insert_all(:mnesia = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, _header, rows, _on_conflict, returning, _placeholders, options) do
table = to_table(adapter_meta, source, prefix, options)
Expand All @@ -1172,7 +1172,7 @@ defmodule EctoQLC.Adapters.QLC do
query_time = query_time + insert_time
if value == :ok do
acc = if returning != [], do: acc ++ [Enum.map(returning, fn k -> row[k] end)]
log({count + 1, acc}, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect row}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, rows, options ++ adapter_meta.opts)
log({count + 1, acc}, source, "INSERT INTO #{inspect source} RETURNING #{inspect returning} #{inspect row}", query_time, 0, 0, 0, :insert, adapter_meta.telemetry, rows, [], options ++ adapter_meta.opts)
else
{count, acc}
end
Expand All @@ -1196,7 +1196,7 @@ defmodule EctoQLC.Adapters.QLC do
else
{query_time, _error} when is_integer(query_time) -> {query_time, {:invalid, [unique: "primary_key"]}}
end
log(result, source, "UPDATE INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :update_all, adapter_meta.telemetry, params, options ++ adapter_meta.opts)
log(result, source, "UPDATE INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :update_all, adapter_meta.telemetry, params, [], options ++ adapter_meta.opts)
end
def update(:ets = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, fields, params, returning, options) do
key = to_key(params)
Expand All @@ -1208,7 +1208,7 @@ defmodule EctoQLC.Adapters.QLC do
else
{query_time, _} -> {query_time, {:invalid, [unique: "primary_key"]}}
end
log(result, source, "UPDATE INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :update_all, adapter_meta.telemetry, params, options ++ adapter_meta.opts)
log(result, source, "UPDATE INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :update_all, adapter_meta.telemetry, params, [], options ++ adapter_meta.opts)
end
def update(:mnesia = mod, adapter_meta, %{schema: schema, source: source, prefix: prefix}, fields, params, returning, options) do
table = to_table(adapter_meta, source, prefix, options)
Expand All @@ -1227,7 +1227,7 @@ defmodule EctoQLC.Adapters.QLC do
{query_time, {:atomic, :ok}} -> {query_time, {:ok, Enum.map(fields, &Enum.map(returning, fn k -> &1[k] end))}}
{query_time, {:atomic, result}} -> {query_time, result}
end
log(result, source, "UPDATE INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :update_all, adapter_meta.telemetry, params, options ++ adapter_meta.opts)
log(result, source, "UPDATE INTO #{inspect source} RETURNING #{inspect returning} #{inspect fields}", query_time, 0, 0, 0, :update_all, adapter_meta.telemetry, params, [], options ++ adapter_meta.opts)
end

defp distinct(rows, %Ecto.Query{distinct: nil}, _adapter_meta), do: rows
Expand Down Expand Up @@ -1365,13 +1365,17 @@ defmodule EctoQLC.Adapters.QLC do
Module.concat([adapter_meta.adapter, options[:prefix] || prefix, source])
end

defp log(result, source, query, query_time, decode_time, queue_time, idle_time, operator, {repo, log, event_name} = _telemetry, params, opts) do
defp log(:ok, source, query, query_time, decode_time, queue_time, idle_time, operator, telemetry, params, columns, opts) do
log({0, []}, source, query, query_time, decode_time, queue_time, idle_time, operator, telemetry, params, columns, opts)
end
defp log({num_rows, rows} = result, source, query, query_time, decode_time, queue_time, idle_time, operator, {repo, log, event_name} = _telemetry, params, columns, opts) do
columns = if is_struct(columns) and columns.select, do: columns.select.fields, else: []
query = String.Chars.to_string(query)
stacktrace = Keyword.get(opts, :stacktrace)
if event_name = Keyword.get(opts, :telemetry_event, event_name) do
:telemetry.execute(event_name,
%{query_time: query_time, decode_time: decode_time, queue_time: queue_time, idle_time: idle_time, total_time: query_time + decode_time + queue_time + idle_time},
%{type: :ecto_qlc_query, repo: repo, result: result, params: params, query: query, source: source, stacktrace: stacktrace, options: Keyword.get(opts, :telemetry_options, [])})
%{type: :ecto_qlc_query, repo: repo, result: {:ok, %{command: to_command(operator), rows: rows, num_rows: num_rows, columns: columns}}, params: params, query: query, source: source, stacktrace: stacktrace, options: Keyword.get(opts, :telemetry_options, [])})
end
fun = fn -> log_iodata(query_time, decode_time, queue_time, idle_time, repo, source, query, opts[:cast_params] || params, result, stacktrace) end
case Keyword.get(opts, :log, log) do
Expand All @@ -1387,6 +1391,9 @@ defmodule EctoQLC.Adapters.QLC do
end
end

defp to_command(:all), do: :select
defp to_command(command), do: command

defp log_iodata(query_time, decode_time, queue_time, idle_time, repo, source, query, params, result, stacktrace) do
result = if is_tuple(result) and is_atom(elem(result, 0)), do: String.upcase("#{elem(result, 0)}"), else: "OK"
stacktrace = case stacktrace do
Expand Down

0 comments on commit 5e9ba59

Please sign in to comment.