Skip to content

Commit

Permalink
Only bootstrap new oids during describe
Browse files Browse the repository at this point in the history
Carry out full bootstrap on first connect and then fetch new types when
trying to describe a query. This means reconnects don't run a bootstrap
query and describe runs minimal query.
  • Loading branch information
fishcakez committed Nov 25, 2017
1 parent 443e9da commit 53585e7
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 159 deletions.
159 changes: 92 additions & 67 deletions lib/postgrex/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,6 @@ defmodule Postgrex.Protocol do
{:lock, ref, types} ->
status = %{status | types_lock: {server, ref}}
bootstrap_send(%{s | types: types}, status, types, buffer)
{:go, types} ->
bootstrap_done(%{s | types: types}, status, buffer)
:noproc ->
bootstrap(s, status, buffer)
:error ->
Expand All @@ -662,12 +660,18 @@ defmodule Postgrex.Protocol do

defp bootstrap_send(s, status, types, buffer) do
%{parameters: parameters} = s
bootstrap_send(s, status, types, parameters, buffer, &bootstrap_sync_recv/3)
version = Postgrex.Utils.parse_version(parameters["server_version"])
statement = Types.bootstrap_query(version, types)
if statement do
bootstrap_send(s, status, statement, buffer, &bootstrap_sync_recv/3)
else
%{types_lock: {server, ref}} = status
TypeServer.done(server, ref)
bootstrap_done(s, status, buffer)
end
end

defp bootstrap_send(s, status, types, parameters, buffer, next) do
version = parameters["server_version"] |> Postgrex.Utils.parse_version
statement = Types.bootstrap_query(version, types)
defp bootstrap_send(s, status, statement, buffer, next) do
msg = msg_query(statement: statement)
case msg_send(s, msg, buffer) do
:ok ->
Expand Down Expand Up @@ -698,7 +702,6 @@ defmodule Postgrex.Protocol do
end

defp bootstrap_types(s, status, type_infos, buffer, next) do
query_delete(s, %Query{name: "", statement: ""})
%{types_lock: {server, ref}} = status
TypeServer.update(server, ref, type_infos)
next.(s, status, buffer)
Expand All @@ -723,7 +726,7 @@ defmodule Postgrex.Protocol do
do: activate(%{s | queries: queries_new()}, buffer)

defp bootstrap_fail(s, err, %{types_lock: {server, ref}}) do
TypeServer.fail(server, ref)
TypeServer.done(server, ref)
{:disconnect, err, s}
end

Expand Down Expand Up @@ -798,8 +801,8 @@ defmodule Postgrex.Protocol do
{:ok, s} <- recv_ready(s, status, buffer) do
{:ok, query, s}
else
{:reload, oid, s, buffer} ->
reload_ready(s, status, query, oid, buffer)
{:reload, oids, s, buffer} ->
reload_ready(s, status, query, oids, buffer)
{:disconnect, err, s} ->
{:disconnect, err, s}
{:error, %Postgrex.Error{} = err, s, buffer} ->
Expand All @@ -820,8 +823,8 @@ defmodule Postgrex.Protocol do
{:ok, _, s} <- recv_transaction(s, status, buffer) do
{:ok, query, s}
else
{:reload, oid, s, buffer} ->
reload_transaction(s, status, query, oid, buffer)
{:reload, oids, s, buffer} ->
reload_transaction(s, status, query, oids, buffer)
{:disconnect, err, s} ->
{:disconnect, err, s}
{:error, %Postgrex.Error{} = err, s, buffer} ->
Expand All @@ -848,8 +851,8 @@ defmodule Postgrex.Protocol do
{:ok, s} <- recv_ready(s, status, buffer) do
{:ok, query, s}
else
{:reload, oid, s, buffer} ->
reload_closed(s, status, query, oid, buffer)
{:reload, oids, s, buffer} ->
reload_closed(s, status, query, oids, buffer)
{:disconnect, err, s} ->
{:disconnect, err, s}
{:error, %Postgrex.Error{} = err, s, buffer} ->
Expand All @@ -874,8 +877,8 @@ defmodule Postgrex.Protocol do
else
{:error, err, s, buffer} ->
error_flushed(s, status, err, buffer)
{:reload, oid, s, buffer} ->
reload_flushed(s, status, query, oid, buffer)
{:reload, oids, s, buffer} ->
reload_flushed(s, status, query, oids, buffer)
{:disconnect, _err, _s} = disconnect ->
disconnect
end
Expand All @@ -894,8 +897,8 @@ defmodule Postgrex.Protocol do
else
{:error, err, s, buffer} ->
rollback_flushed(s, status, err, buffer)
{:reload, oid, s, buffer} ->
reload_flushed(s, status, query, oid, buffer)
{:reload, oids, s, buffer} ->
reload_flushed(s, status, query, oids, buffer)
{:disconnect, _err, _s} = disconnect ->
disconnect
end
Expand All @@ -920,8 +923,8 @@ defmodule Postgrex.Protocol do
{:ok, s} <- recv_ready(s, status, buffer) do
{:ok, query, s}
else
{:reload, oid, s, buffer} ->
reload_ready(s, status, query, oid, buffer)
{:reload, oids, s, buffer} ->
reload_ready(s, status, query, oids, buffer)
{:disconnect, err, s} ->
{:disconnect, err, s}
{:error, %Postgrex.Error{} = err, s, buffer} ->
Expand All @@ -946,8 +949,8 @@ defmodule Postgrex.Protocol do
{:ok, _, s} <- recv_transaction(s, status, buffer) do
{:ok, query, s}
else
{:reload, oid, s, buffer} ->
reload_transaction(s, status, query, oid, buffer)
{:reload, oids, s, buffer} ->
reload_transaction(s, status, query, oids, buffer)
{:disconnect, err, s} ->
{:disconnect, err, s}
{:error, %Postgrex.Error{} = err, s, buffer} ->
Expand Down Expand Up @@ -976,8 +979,8 @@ defmodule Postgrex.Protocol do
else
{:error, err, s, buffer} ->
error_flushed(s, status, err, buffer)
{:reload, oid, s, buffer} ->
reload_flushed(s, status, query, oid, buffer)
{:reload, oids, s, buffer} ->
reload_flushed(s, status, query, oids, buffer)
{:disconnect, _err, _s} = disconnect ->
disconnect
end
Expand All @@ -1000,8 +1003,8 @@ defmodule Postgrex.Protocol do
else
{:error, err, s, buffer} ->
rollback_flushed(s, status, err, buffer)
{:reload, oid, s, buffer} ->
reload_flushed(s, status, query, oid, buffer)
{:reload, oids, s, buffer} ->
reload_flushed(s, status, query, oids, buffer)
{:disconnect, _err, _s} = disconnect ->
disconnect
end
Expand Down Expand Up @@ -1080,13 +1083,11 @@ defmodule Postgrex.Protocol do
end

defp describe(s, query, param_oids, result_oids, columns, buffer) do
with {:ok, query} <- describe_params(s, query, param_oids),
{:ok, query} <- describe_result(s, query, result_oids, columns) do
query_put(s, query)
{:ok, query, s, buffer}
else
{:reload, oid} ->
{:reload, oid, s, buffer}
case describe_params(s, query, param_oids) do
{:ok, query} ->
redescribe(s, query, result_oids, columns, buffer)
{:reload, oids} ->
reload_describe_result(s, oids, result_oids, buffer)
{:error, err} ->
{:disconnect, err, %{s | buffer: buffer}}
end
Expand All @@ -1097,8 +1098,8 @@ defmodule Postgrex.Protocol do
query_put(s, query)
{:ok, query, s, buffer}
else
{:reload, oid} ->
{:reload, oid, s, buffer}
{:reload, oids} ->
{:reload, oids, s, buffer}
{:error, err} ->
{:disconnect, err, %{s | buffer: buffer}}
end
Expand All @@ -1113,6 +1114,20 @@ defmodule Postgrex.Protocol do
end
end

defp reload_describe_result(s, param_oids, nil, buffer) do
{:reload, param_oids, s, buffer}
end
defp reload_describe_result(%{types: types} = s, param_oids, result_oids, buffer) do
case fetch_type_info(result_oids, types) do
{:ok, _} ->
{:reload, param_oids, s, buffer}
{:reload, reload_oids} ->
{:reload, Enum.uniq(param_oids ++ reload_oids), s, buffer}
{:error, err} ->
{:disconnect, err, %{s | buffer: buffer}}
end
end

defp describe_result(%{types: types}, query, nil, nil) do
query = %Query{query | ref: make_ref(), types: types, columns: nil,
result_oids: nil, result_formats: [], result_types: nil}
Expand Down Expand Up @@ -1154,31 +1169,31 @@ defmodule Postgrex.Protocol do
end
end

defp reload_transaction(s, status, query, oid, buffer) do
defp reload_transaction(s, status, query, oids, buffer) do
%Query{name: name} = query
msgs = [msg_close(type: :statement, name: name),
msg_sync()]
with {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer),
:ok <- msg_send(s, msgs, buffer) do
reload_closed(s, status, query, oid, buffer)
reload_closed(s, status, query, oids, buffer)
else
{:disconnect, _err, _s} = disconnect ->
disconnect
end
end

defp reload_flushed(s, %{mode: :transaction} = status, query, oid, buffer) do
defp reload_flushed(s, %{mode: :transaction} = status, query, oids, buffer) do
%Query{name: name} = query
msgs = [msg_close(type: :statement, name: name),
msg_sync()]
with :ok <- msg_send(s, msgs, buffer) do
reload_closed(s, status, query, oid, buffer)
reload_closed(s, status, query, oids, buffer)
else
{:disconnect, _err, _s} = disconnect ->
disconnect
end
end
defp reload_flushed(s, %{mode: :savepoint} = status, query, oid, buffer) do
defp reload_flushed(s, %{mode: :savepoint} = status, query, oids, buffer) do
%Query{name: name} = query
rollback_release =
"ROLLBACK TO SAVEPOINT postgrex_query;RELEASE SAVEPOINT postgrex_query"
Expand All @@ -1189,76 +1204,78 @@ defmodule Postgrex.Protocol do
{:ok, s, buffer} <- recv_close(s, status, buffer),
{:ok, _, %{buffer: buffer} = s}
<- recv_transaction(s, status, buffer) do
reload_spawn(%{s | buffer: nil}, status, query, oid, buffer)
reload_spawn(%{s | buffer: nil}, status, query, oids, buffer)
else
{:disconnect, _err, _s} = disconnect ->
disconnect
end
end

defp reload_ready(s, status, query, oid, buffer) do
defp reload_ready(s, status, query, oids, buffer) do
%Query{name: name} = query
msgs = [msg_close(type: :statement, name: name),
msg_sync()]
with {:ok, %{buffer: buffer} = s} <- recv_ready(s, status, buffer),
:ok <- msg_send(s, msgs, buffer) do
reload_closed(s, status, query, oid, buffer)
reload_closed(s, status, query, oids, buffer)
else
{:disconnect, _err, _s} = disconnect ->
disconnect
end
end

defp reload_closed(s, status, query, oid, buffer) do
defp reload_closed(s, status, query, oids, buffer) do
with {:ok, s, buffer} <- recv_close(s, status, buffer),
{:ok, %{buffer: buffer} = s} <- recv_ready(s, status, buffer) do
reload_spawn(%{s | buffer: nil}, status, query, oid, buffer)
reload_spawn(%{s | buffer: nil}, status, query, oids, buffer)
else
{:disconnect, _err, _s} = disconnect ->
disconnect
end
end

defp fetch_type_info(oids, types, infos \\ [])
defp fetch_type_info([], _, infos) do
defp fetch_type_info(oids, types, infos \\ [], reload \\ [])
defp fetch_type_info([], _, infos, []) do
{:ok, Enum.reverse(infos)}
end
defp fetch_type_info([oid | oids], types, infos) do
defp fetch_type_info([], _, _, reloads) do
{:reload, Enum.uniq(reloads)}
end
defp fetch_type_info([oid | oids], types, infos, reloads) do
case Postgrex.Types.fetch(oid, types) do
{:ok, info} ->
fetch_type_info(oids, types, [info | infos])
fetch_type_info(oids, types, [info | infos], reloads)
{:error, %Postgrex.TypeInfo{} = info, mod} ->
msg = Postgrex.Utils.type_msg(info, mod)
{:error, RuntimeError.exception(message: msg)}
{:error, nil, _} ->
{:reload, oid}
fetch_type_info(oids, types, infos, [oid | reloads])
end
end

defp reload_spawn(s, status, query, oid, buffer) do
defp reload_spawn(s, status, query, oids, buffer) do
Logger.warn(fn() ->
[inspect(query) | " uses unknown oid `#{oid}` causing bootstrap"]
[inspect(query) |
" uses unknown oid(s) #{Enum.join(oids, ", ")} causing bootstrap"]
end)
ref = make_ref()
{_, mon} = spawn_monitor(fn() -> reload_lock(s, status, ref, buffer) end)
{_, mon} = spawn_monitor(fn() -> reload_init(s, status, oids, ref, buffer) end)
receive do
{:DOWN, ^mon, _, _, {^ref, s, buffer}} ->
reload_fetch(s, status, query, oid, buffer)
reload_fetch(s, status, query, oids, buffer)
{:DOWN, ^mon, _, _, _} ->
{:disconnect, type_fetch_error(), %{s | buffer: buffer}}
end
end

defp reload_lock(%{types: types} = s, status, exit_ref, buffer) do
defp reload_init(%{types: types} = s, status, oids, exit_ref, buffer) do
with {:ok, server} <- Postgrex.Types.owner(types),
{:lock, lock_ref, ^types} <- TypeServer.fetch(server),
status = Map.put(status, :types_lock, {server, lock_ref}),
{:ok, s} <- reload_send(s, status, types, buffer) do
{:ok, s} <- reload(s, status, types, oids, buffer) do
%{buffer: buffer} = s
exit({exit_ref, %{s | buffer: nil}, buffer})
else
{:go, ^types} ->
exit({exit_ref, s, buffer})
:noproc ->
exit(:normal)
:error ->
Expand All @@ -1268,27 +1285,35 @@ defmodule Postgrex.Protocol do
end
end

defp reload_send(s, status, types, buffer) do
defp reload(s, status, types, oids, buffer) do
%{parameters: parameters} = s
case Postgrex.Parameters.fetch(parameters) do
{:ok, parameters} ->
status = %{status | mode: :transaction}
bootstrap_send(s, status, types, parameters, buffer, &sync_recv/3)
with {:ok, parameters} <- Postgrex.Parameters.fetch(parameters) do
version = Postgrex.Utils.parse_version(parameters["server_version"])
statement = Types.reload_query(version, oids, types)
if statement do
status = %{status | mode: :transaction}
bootstrap_send(s, status, statement, buffer, &sync_recv/3)
else
%{types_lock: {server, ref}} = status
TypeServer.done(server, ref)
{:ok, %{s | buffer: buffer}}
end
else
:error ->
s = %{s | buffer: buffer}
{:error, %Postgrex.Error{message: "parameters not available"}, s}
end
end

defp reload_fetch(%{types: types} = s, status, query, oid, buffer) do
case Postgrex.Types.fetch(oid, types) do
defp reload_fetch(%{types: types} = s, status, query, oids, buffer) do
case fetch_type_info(oids, types) do
{:ok, _} ->
reload_prepare(%{s | buffer: buffer}, status, query)
{:error, %Postgrex.TypeInfo{} = info, mod} ->
msg = Postgrex.Utils.type_msg(info, mod)
reload_error(s, msg, buffer)
{:error, nil, _} ->
msg = "oid `#{oid}` lacks type information after bootstrap"
{:reload, oids} ->
msg = "oid(s) #{Enum.join(oids, ", ")} lack type information after bootstrap"
reload_error(s, msg, buffer)
end
end
Expand Down
Loading

0 comments on commit 53585e7

Please sign in to comment.