diff --git a/lib/postgrex/protocol.ex b/lib/postgrex/protocol.ex index 24c24d0eb..73fd388e3 100644 --- a/lib/postgrex/protocol.ex +++ b/lib/postgrex/protocol.ex @@ -651,8 +651,6 @@ defmodule Postgrex.Protocol do {:lock, ref, types} -> status = %{status | types_lock: {server, ref}} bootstrap_send(%{s | types: types}, status, types, buffer) - {:go, types} -> - bootstrap_done(%{s | types: types}, status, buffer) :noproc -> bootstrap(s, status, buffer) :error -> @@ -662,12 +660,18 @@ defmodule Postgrex.Protocol do defp bootstrap_send(s, status, types, buffer) do %{parameters: parameters} = s - bootstrap_send(s, status, types, parameters, buffer, &bootstrap_sync_recv/3) + version = Postgrex.Utils.parse_version(parameters["server_version"]) + statement = Types.bootstrap_query(version, types) + if statement do + bootstrap_send(s, status, statement, buffer, &bootstrap_sync_recv/3) + else + %{types_lock: {server, ref}} = status + TypeServer.done(server, ref) + bootstrap_done(s, status, buffer) + end end - defp bootstrap_send(s, status, types, parameters, buffer, next) do - version = parameters["server_version"] |> Postgrex.Utils.parse_version - statement = Types.bootstrap_query(version, types) + defp bootstrap_send(s, status, statement, buffer, next) do msg = msg_query(statement: statement) case msg_send(s, msg, buffer) do :ok -> @@ -698,7 +702,6 @@ defmodule Postgrex.Protocol do end defp bootstrap_types(s, status, type_infos, buffer, next) do - query_delete(s, %Query{name: "", statement: ""}) %{types_lock: {server, ref}} = status TypeServer.update(server, ref, type_infos) next.(s, status, buffer) @@ -723,7 +726,7 @@ defmodule Postgrex.Protocol do do: activate(%{s | queries: queries_new()}, buffer) defp bootstrap_fail(s, err, %{types_lock: {server, ref}}) do - TypeServer.fail(server, ref) + TypeServer.done(server, ref) {:disconnect, err, s} end @@ -798,8 +801,8 @@ defmodule Postgrex.Protocol do {:ok, s} <- recv_ready(s, status, buffer) do {:ok, query, s} else - {:reload, oid, s, buffer} -> - reload_ready(s, status, query, oid, buffer) + {:reload, oids, s, buffer} -> + reload_ready(s, status, query, oids, buffer) {:disconnect, err, s} -> {:disconnect, err, s} {:error, %Postgrex.Error{} = err, s, buffer} -> @@ -820,8 +823,8 @@ defmodule Postgrex.Protocol do {:ok, _, s} <- recv_transaction(s, status, buffer) do {:ok, query, s} else - {:reload, oid, s, buffer} -> - reload_transaction(s, status, query, oid, buffer) + {:reload, oids, s, buffer} -> + reload_transaction(s, status, query, oids, buffer) {:disconnect, err, s} -> {:disconnect, err, s} {:error, %Postgrex.Error{} = err, s, buffer} -> @@ -848,8 +851,8 @@ defmodule Postgrex.Protocol do {:ok, s} <- recv_ready(s, status, buffer) do {:ok, query, s} else - {:reload, oid, s, buffer} -> - reload_closed(s, status, query, oid, buffer) + {:reload, oids, s, buffer} -> + reload_closed(s, status, query, oids, buffer) {:disconnect, err, s} -> {:disconnect, err, s} {:error, %Postgrex.Error{} = err, s, buffer} -> @@ -874,8 +877,8 @@ defmodule Postgrex.Protocol do else {:error, err, s, buffer} -> error_flushed(s, status, err, buffer) - {:reload, oid, s, buffer} -> - reload_flushed(s, status, query, oid, buffer) + {:reload, oids, s, buffer} -> + reload_flushed(s, status, query, oids, buffer) {:disconnect, _err, _s} = disconnect -> disconnect end @@ -894,8 +897,8 @@ defmodule Postgrex.Protocol do else {:error, err, s, buffer} -> rollback_flushed(s, status, err, buffer) - {:reload, oid, s, buffer} -> - reload_flushed(s, status, query, oid, buffer) + {:reload, oids, s, buffer} -> + reload_flushed(s, status, query, oids, buffer) {:disconnect, _err, _s} = disconnect -> disconnect end @@ -920,8 +923,8 @@ defmodule Postgrex.Protocol do {:ok, s} <- recv_ready(s, status, buffer) do {:ok, query, s} else - {:reload, oid, s, buffer} -> - reload_ready(s, status, query, oid, buffer) + {:reload, oids, s, buffer} -> + reload_ready(s, status, query, oids, buffer) {:disconnect, err, s} -> {:disconnect, err, s} {:error, %Postgrex.Error{} = err, s, buffer} -> @@ -946,8 +949,8 @@ defmodule Postgrex.Protocol do {:ok, _, s} <- recv_transaction(s, status, buffer) do {:ok, query, s} else - {:reload, oid, s, buffer} -> - reload_transaction(s, status, query, oid, buffer) + {:reload, oids, s, buffer} -> + reload_transaction(s, status, query, oids, buffer) {:disconnect, err, s} -> {:disconnect, err, s} {:error, %Postgrex.Error{} = err, s, buffer} -> @@ -976,8 +979,8 @@ defmodule Postgrex.Protocol do else {:error, err, s, buffer} -> error_flushed(s, status, err, buffer) - {:reload, oid, s, buffer} -> - reload_flushed(s, status, query, oid, buffer) + {:reload, oids, s, buffer} -> + reload_flushed(s, status, query, oids, buffer) {:disconnect, _err, _s} = disconnect -> disconnect end @@ -1000,8 +1003,8 @@ defmodule Postgrex.Protocol do else {:error, err, s, buffer} -> rollback_flushed(s, status, err, buffer) - {:reload, oid, s, buffer} -> - reload_flushed(s, status, query, oid, buffer) + {:reload, oids, s, buffer} -> + reload_flushed(s, status, query, oids, buffer) {:disconnect, _err, _s} = disconnect -> disconnect end @@ -1080,13 +1083,11 @@ defmodule Postgrex.Protocol do end defp describe(s, query, param_oids, result_oids, columns, buffer) do - with {:ok, query} <- describe_params(s, query, param_oids), - {:ok, query} <- describe_result(s, query, result_oids, columns) do - query_put(s, query) - {:ok, query, s, buffer} - else - {:reload, oid} -> - {:reload, oid, s, buffer} + case describe_params(s, query, param_oids) do + {:ok, query} -> + redescribe(s, query, result_oids, columns, buffer) + {:reload, oids} -> + reload_describe_result(s, oids, result_oids, buffer) {:error, err} -> {:disconnect, err, %{s | buffer: buffer}} end @@ -1097,8 +1098,8 @@ defmodule Postgrex.Protocol do query_put(s, query) {:ok, query, s, buffer} else - {:reload, oid} -> - {:reload, oid, s, buffer} + {:reload, oids} -> + {:reload, oids, s, buffer} {:error, err} -> {:disconnect, err, %{s | buffer: buffer}} end @@ -1113,6 +1114,20 @@ defmodule Postgrex.Protocol do end end + defp reload_describe_result(s, param_oids, nil, buffer) do + {:reload, param_oids, s, buffer} + end + defp reload_describe_result(%{types: types} = s, param_oids, result_oids, buffer) do + case fetch_type_info(result_oids, types) do + {:ok, _} -> + {:reload, param_oids, s, buffer} + {:reload, reload_oids} -> + {:reload, Enum.uniq(param_oids ++ reload_oids), s, buffer} + {:error, err} -> + {:disconnect, err, %{s | buffer: buffer}} + end + end + defp describe_result(%{types: types}, query, nil, nil) do query = %Query{query | ref: make_ref(), types: types, columns: nil, result_oids: nil, result_formats: [], result_types: nil} @@ -1154,31 +1169,31 @@ defmodule Postgrex.Protocol do end end - defp reload_transaction(s, status, query, oid, buffer) do + defp reload_transaction(s, status, query, oids, buffer) do %Query{name: name} = query msgs = [msg_close(type: :statement, name: name), msg_sync()] with {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer), :ok <- msg_send(s, msgs, buffer) do - reload_closed(s, status, query, oid, buffer) + reload_closed(s, status, query, oids, buffer) else {:disconnect, _err, _s} = disconnect -> disconnect end end - defp reload_flushed(s, %{mode: :transaction} = status, query, oid, buffer) do + defp reload_flushed(s, %{mode: :transaction} = status, query, oids, buffer) do %Query{name: name} = query msgs = [msg_close(type: :statement, name: name), msg_sync()] with :ok <- msg_send(s, msgs, buffer) do - reload_closed(s, status, query, oid, buffer) + reload_closed(s, status, query, oids, buffer) else {:disconnect, _err, _s} = disconnect -> disconnect end end - defp reload_flushed(s, %{mode: :savepoint} = status, query, oid, buffer) do + defp reload_flushed(s, %{mode: :savepoint} = status, query, oids, buffer) do %Query{name: name} = query rollback_release = "ROLLBACK TO SAVEPOINT postgrex_query;RELEASE SAVEPOINT postgrex_query" @@ -1189,76 +1204,78 @@ defmodule Postgrex.Protocol do {:ok, s, buffer} <- recv_close(s, status, buffer), {:ok, _, %{buffer: buffer} = s} <- recv_transaction(s, status, buffer) do - reload_spawn(%{s | buffer: nil}, status, query, oid, buffer) + reload_spawn(%{s | buffer: nil}, status, query, oids, buffer) else {:disconnect, _err, _s} = disconnect -> disconnect end end - defp reload_ready(s, status, query, oid, buffer) do + defp reload_ready(s, status, query, oids, buffer) do %Query{name: name} = query msgs = [msg_close(type: :statement, name: name), msg_sync()] with {:ok, %{buffer: buffer} = s} <- recv_ready(s, status, buffer), :ok <- msg_send(s, msgs, buffer) do - reload_closed(s, status, query, oid, buffer) + reload_closed(s, status, query, oids, buffer) else {:disconnect, _err, _s} = disconnect -> disconnect end end - defp reload_closed(s, status, query, oid, buffer) do + defp reload_closed(s, status, query, oids, buffer) do with {:ok, s, buffer} <- recv_close(s, status, buffer), {:ok, %{buffer: buffer} = s} <- recv_ready(s, status, buffer) do - reload_spawn(%{s | buffer: nil}, status, query, oid, buffer) + reload_spawn(%{s | buffer: nil}, status, query, oids, buffer) else {:disconnect, _err, _s} = disconnect -> disconnect end end - defp fetch_type_info(oids, types, infos \\ []) - defp fetch_type_info([], _, infos) do + defp fetch_type_info(oids, types, infos \\ [], reload \\ []) + defp fetch_type_info([], _, infos, []) do {:ok, Enum.reverse(infos)} end - defp fetch_type_info([oid | oids], types, infos) do + defp fetch_type_info([], _, _, reloads) do + {:reload, Enum.uniq(reloads)} + end + defp fetch_type_info([oid | oids], types, infos, reloads) do case Postgrex.Types.fetch(oid, types) do {:ok, info} -> - fetch_type_info(oids, types, [info | infos]) + fetch_type_info(oids, types, [info | infos], reloads) {:error, %Postgrex.TypeInfo{} = info, mod} -> msg = Postgrex.Utils.type_msg(info, mod) {:error, RuntimeError.exception(message: msg)} {:error, nil, _} -> - {:reload, oid} + fetch_type_info(oids, types, infos, [oid | reloads]) end end - defp reload_spawn(s, status, query, oid, buffer) do + defp reload_spawn(s, status, query, oids, buffer) do Logger.warn(fn() -> - [inspect(query) | " uses unknown oid `#{oid}` causing bootstrap"] + [inspect(query) | + " uses unknown oid(s) #{Enum.join(oids, ", ")} causing bootstrap"] end) ref = make_ref() - {_, mon} = spawn_monitor(fn() -> reload_lock(s, status, ref, buffer) end) + {_, mon} = spawn_monitor(fn() -> reload_init(s, status, oids, ref, buffer) end) receive do {:DOWN, ^mon, _, _, {^ref, s, buffer}} -> - reload_fetch(s, status, query, oid, buffer) + reload_fetch(s, status, query, oids, buffer) {:DOWN, ^mon, _, _, _} -> {:disconnect, type_fetch_error(), %{s | buffer: buffer}} end end - defp reload_lock(%{types: types} = s, status, exit_ref, buffer) do + defp reload_init(%{types: types} = s, status, oids, exit_ref, buffer) do with {:ok, server} <- Postgrex.Types.owner(types), {:lock, lock_ref, ^types} <- TypeServer.fetch(server), status = Map.put(status, :types_lock, {server, lock_ref}), - {:ok, s} <- reload_send(s, status, types, buffer) do + {:ok, s} <- reload(s, status, types, oids, buffer) do %{buffer: buffer} = s exit({exit_ref, %{s | buffer: nil}, buffer}) else - {:go, ^types} -> - exit({exit_ref, s, buffer}) :noproc -> exit(:normal) :error -> @@ -1268,27 +1285,35 @@ defmodule Postgrex.Protocol do end end - defp reload_send(s, status, types, buffer) do + defp reload(s, status, types, oids, buffer) do %{parameters: parameters} = s - case Postgrex.Parameters.fetch(parameters) do - {:ok, parameters} -> - status = %{status | mode: :transaction} - bootstrap_send(s, status, types, parameters, buffer, &sync_recv/3) + with {:ok, parameters} <- Postgrex.Parameters.fetch(parameters) do + version = Postgrex.Utils.parse_version(parameters["server_version"]) + statement = Types.reload_query(version, oids, types) + if statement do + status = %{status | mode: :transaction} + bootstrap_send(s, status, statement, buffer, &sync_recv/3) + else + %{types_lock: {server, ref}} = status + TypeServer.done(server, ref) + {:ok, %{s | buffer: buffer}} + end + else :error -> s = %{s | buffer: buffer} {:error, %Postgrex.Error{message: "parameters not available"}, s} end end - defp reload_fetch(%{types: types} = s, status, query, oid, buffer) do - case Postgrex.Types.fetch(oid, types) do + defp reload_fetch(%{types: types} = s, status, query, oids, buffer) do + case fetch_type_info(oids, types) do {:ok, _} -> reload_prepare(%{s | buffer: buffer}, status, query) {:error, %Postgrex.TypeInfo{} = info, mod} -> msg = Postgrex.Utils.type_msg(info, mod) reload_error(s, msg, buffer) - {:error, nil, _} -> - msg = "oid `#{oid}` lacks type information after bootstrap" + {:reload, oids} -> + msg = "oid(s) #{Enum.join(oids, ", ")} lack type information after bootstrap" reload_error(s, msg, buffer) end end diff --git a/lib/postgrex/type_server.ex b/lib/postgrex/type_server.ex index baee00873..c6b6b42e3 100644 --- a/lib/postgrex/type_server.ex +++ b/lib/postgrex/type_server.ex @@ -19,12 +19,10 @@ defmodule Postgrex.TypeServer do Fetches a lock for the given type server. We attempt to achieve a lock on the type server for updating the entries. - If another process got the lock, we wait until the entries are available - or the other process crashes. If the module is unknown returns `:not_found`. + If another process got the lock we wait for it to finish. """ @spec fetch(pid) :: - {:lock, reference, Postgrex.Types.state} | {:go, Postgrex.Types.state} | - :noproc | :error + {:lock, reference, Postgrex.Types.state} | :noproc | :error def fetch(server) do try do GenServer.call(server, :fetch, @timeout) @@ -44,11 +42,11 @@ defmodule Postgrex.TypeServer do end @doc """ - Unlocks the given reference for a given module after types query failed. + Unlocks the given reference for a given module if no update. """ - @spec fail(pid, reference) :: :ok - def fail(server, ref) do - GenServer.cast(server, {:fail, ref}) + @spec done(pid, reference) :: :ok + def done(server, ref) do + GenServer.cast(server, {:done, ref}) end ## Callbacks @@ -57,7 +55,8 @@ defmodule Postgrex.TypeServer do _ = Process.flag(:trap_exit, true) Process.link(starter) state = %__MODULE__{types: Postgrex.Types.new(module), - connections: MapSet.new([starter]), waiting: %{}} + connections: MapSet.new([starter]), + waiting: :queue.new()} {:ok, state} end @@ -68,25 +67,19 @@ defmodule Postgrex.TypeServer do wait(state, from) end - def handle_call({:update, ref, type_infos}, _, %{lock: ref} = state) + def handle_call({:update, ref, type_infos}, from, %{lock: ref} = state) when is_reference(ref) do - associate(state, type_infos) + associate(state, type_infos, from) end - def handle_cast({:fail, ref}, %{lock: ref} = state) when is_reference(ref) do + def handle_cast({:done, ref}, %{lock: ref} = state) when is_reference(ref) do Process.demonitor(ref, [:flush]) - failure(state) - end - - - def handle_info({:go, ref}, %{lock: ref} = state) - when is_reference(ref) do - go(state) + next(state) end def handle_info({:DOWN, ref, _, _, _}, %{lock: ref} = state) when is_reference(ref) do - failure(state) + next(state) end def handle_info({:DOWN, ref, _, _, _}, state) do down(state, ref) @@ -114,39 +107,29 @@ defmodule Postgrex.TypeServer do Process.link(pid) mref = Process.monitor(pid) state = %{state | connections: MapSet.put(connections, pid), - waiting: Map.put(waiting, mref, from)} + waiting: :queue.in({mref, from}, waiting)} {:noreply, state} end - defp associate(%{types: types, lock: ref} = state, type_infos) do + defp associate(%{types: types, lock: ref} = state, type_infos, from) do Postgrex.Types.associate_type_infos(type_infos, types) Process.demonitor(ref, [:flush]) - # flush message queue of waiters - send(self(), {:go, ref}) - {:reply, :go, state} - end - - defp go(%{types: types} = state) do - %{state | lock: nil} - |> reply({:go, types}) - |> check_processes() - end - - defp failure(state) do - %{state | lock: nil} - |> reply(:error) - |> check_processes() + GenServer.reply(from, :go) + next(state) end - defp reply(%{waiting: waiting} = state, resp) do - _ = for {mref, from} <- waiting do - Process.demonitor(mref, [:flush, :info]) && GenServer.reply(from, resp) + defp next(%{types: types, waiting: waiting} = state) do + case :queue.out(waiting) do + {{:value, {mref, from}}, waiting} -> + GenServer.reply(from, {:lock, mref, types}) + {:noreply, %{state | lock: mref, waiting: waiting}} + {:empty, waiting} -> + check_processes(%{state | lock: nil, waiting: waiting}) end - %{state | waiting: %{}} end defp down(%{waiting: waiting} = state, ref) do - check_processes(%{state | waiting: Map.delete(waiting, ref)}) + check_processes(%{state | waiting: :queue.filter(fn {mref, _} -> mref != ref end, waiting)}) end defp exit(%{connections: connections} = state, pid) do diff --git a/lib/postgrex/types.ex b/lib/postgrex/types.ex index 7924f7557..167d8fb18 100644 --- a/lib/postgrex/types.ex +++ b/lib/postgrex/types.ex @@ -42,10 +42,26 @@ defmodule Postgrex.Types do end @doc false - @spec bootstrap_query({pos_integer, non_neg_integer, non_neg_integer}, state) :: binary + @spec bootstrap_query({pos_integer, non_neg_integer, non_neg_integer}, state) :: binary | nil def bootstrap_query(version, {_, table}) do - oids = :ets.select(table, [{{:"$1", :_, :_}, [], [:"$1"]}]) + case :ets.info(table, :size) do + 0 -> + # avoid loading information about table-types + # since there might be a lot them and most likely + # they won't be used; subsequent bootstrap will + # fetch them along with any other "new" types + filter_oids = + """ + WHERE (t.typrelid = 0) + AND (t.typelem = 0 OR t.typelem NOT IN (SELECT oid FROM pg_catalog.pg_type WHERE typrelid!=0)) + """ + build_bootstrap_query(version, filter_oids) + _ -> + nil + end + end + defp build_bootstrap_query(version, filter_oids) do {typelem, join_domain} = if version >= {9, 0, 0} do {"coalesce(d.typelem, t.typelem)", @@ -62,28 +78,6 @@ defmodule Postgrex.Types do {"0", ""} end - filter_oids = - case oids do - [] -> - # avoid loading information about table-types - # since there might be a lot them and most likely - # they won't be used; subsequent bootstrap will - # fetch them along with any other "new" types - """ - WHERE (t.typrelid=0) - AND (t.typelem = 0 OR t.typelem NOT IN (SELECT oid FROM pg_catalog.pg_type WHERE typrelid!=0)) - """ - _ -> - # equiv to `WHERE t.oid NOT IN (SELECT unnest(ARRAY[#{Enum.join(oids, ",")}]))` - # `unnest` is not supported in redshift or postgres version prior to 8.4 - """ - WHERE t.oid NOT IN ( - SELECT (ARRAY[#{Enum.join(oids, ",")}])[i] - FROM generate_series(1, #{length(oids)}) AS i - ) - """ - end - """ SELECT t.oid, t.typname, t.typsend, t.typreceive, t.typoutput, t.typinput, #{typelem}, #{rngsubtype}, ARRAY ( @@ -99,6 +93,17 @@ defmodule Postgrex.Types do """ end + @doc false + @spec reload_query({pos_integer, non_neg_integer, non_neg_integer}, [oid, ...], state) :: binary | nil + def reload_query(version, oids, {_, table}) do + case Enum.reject(oids, &:ets.member(table, &1)) do + [] -> + nil + oids -> + build_bootstrap_query(version, "WHERE t.oid IN (#{Enum.join(oids, ", ")})") + end + end + @doc false @spec build_type_info(binary) :: TypeInfo.t def build_type_info(row) do diff --git a/test/alter_test.exs b/test/alter_test.exs index 44196f2ab..21e95c175 100644 --- a/test/alter_test.exs +++ b/test/alter_test.exs @@ -200,6 +200,22 @@ defmodule AlterTest do assert [[{1, 2}]] = query("SELECT a FROM missing_comp_table", []) end + test "new oids in param and result are bootstrapped", context do + assert :ok = query("CREATE TYPE missing_comp AS (a int, b int)", []) + assert :ok = query("CREATE TYPE missing_enum AS ENUM ('missing')", []) + assert :ok = query("CREATE TABLE missing_comp_table (a missing_comp, b missing_enum DEFAULT 'missing')", []) + + assert [["missing"]] = query("INSERT INTO missing_comp_table VALUES ($1, DEFAULT) RETURNING b", [{1, 2}]) + end + + test "duplicate oid is bootstrapped", context do + assert :ok = query("CREATE TYPE missing_comp AS (a int, b int)", []) + assert :ok = query("CREATE TABLE missing_comp_table (a missing_comp, b missing_comp)", []) + + assert :ok = query("INSERT INTO missing_comp_table VALUES ($1, $2)", [{1, 2}, {3, 4}]) + assert [[{1, 2}, {3, 4}]] = query("SELECT a, b FROM missing_comp_table", []) + end + @tag prepare: :unnamed test "new oid is bootstrapped with unnamed", context do assert :ok = query("CREATE TYPE missing_enum AS ENUM ('missing')", []) diff --git a/test/query_test.exs b/test/query_test.exs index 75a282b89..97c87162d 100644 --- a/test/query_test.exs +++ b/test/query_test.exs @@ -242,6 +242,7 @@ defmodule QueryTest do assert [["x"]] == query("SELECT $1::\"char\"", ["x"]) end + @tag :capture_log test "decode record", context do assert [[{1, "2"}]] = query("SELECT (1, '2')::composite1", []) assert [[[{1, "2"}]]] = query("SELECT ARRAY[(1, '2')::composite1]", []) @@ -537,6 +538,7 @@ defmodule QueryTest do assert [[[1, nil, 3]]] = query("SELECT $1::integer[]", [[1, nil, 3]]) end + @tag :capture_log test "encode record", context do assert [[{1, "2"}]] = query("SELECT $1::composite1", [{1, "2"}]) assert [[[{1, "2"}]]] = query("SELECT $1::composite1[]", [[{1, "2"}]]) diff --git a/test/type_server_test.exs b/test/type_server_test.exs index 94ceafc61..a9c11a32a 100644 --- a/test/type_server_test.exs +++ b/test/type_server_test.exs @@ -22,7 +22,7 @@ defmodule TypeServerTest do assert {:lock, _, ^types} = TS.fetch(server) end - test "blocks on initial fetch until lock is returned" do + test "blocks on initial fetch until update returns lock" do key = make_ref() server = TM.get(@types, key) {:lock, ref, types} = TS.fetch(server) @@ -30,10 +30,10 @@ defmodule TypeServerTest do task = Task.async fn -> TS.fetch(server) end :timer.sleep(100) TS.update(server, ref, []) - assert {:go, ^types} = Task.await(task) + assert {:lock, _, ^types} = Task.await(task) end - test "blocks on later fetch until lock is returned" do + test "blocks on later fetch until update returns lock" do key = make_ref() server = TM.get(@types, key) {:lock, ref, types} = TS.fetch(server) @@ -43,22 +43,24 @@ defmodule TypeServerTest do task = Task.async fn -> TS.fetch(server) end :timer.sleep(100) + assert Task.yield(task, 0) == nil TS.update(server, ref, []) - assert {:go, ^types} = Task.await(task) + assert {:lock, _, ^types} = Task.await(task) end - test "blocks on initial fetch until fail is returned" do + test "blocks on initial fetch until done returns lock" do key = make_ref() server = TM.get(@types, key) - {:lock, ref, _} = TS.fetch(server) + {:lock, ref, types} = TS.fetch(server) task = Task.async fn -> TS.fetch(server) end :timer.sleep(100) - TS.fail(server, ref) - assert :error = Task.await(task) + assert Task.yield(task, 0) == nil + TS.done(server, ref) + assert {:lock, _, ^types} = Task.await(task) end - test "blocks on later fetch until fail is returned" do + test "blocks on later fetch until done returns lock" do key = make_ref() server = TM.get(@types, key) {:lock, ref, types} = TS.fetch(server) @@ -68,9 +70,9 @@ defmodule TypeServerTest do task = Task.async fn -> TS.fetch(server) end :timer.sleep(100) - nil = Task.yield(task, 0) - TS.fail(server, ref) - assert :error = Task.await(task) + assert Task.yield(task, 0) == nil + TS.done(server, ref) + assert {:lock, _, ^types} = Task.await(task) end test "fetches existing table even if parent crashes" do @@ -89,31 +91,38 @@ defmodule TypeServerTest do assert {:lock, _, ^types} = Task.await(task) end - test "locks existing table even if other waiting processes crash" do + test "the lock is granted to single process one by one" do key = make_ref() server = TM.get(@types, key) {:lock, ref, types} = TS.fetch(server) TS.update(server, ref, []) + parent = self() task = fn() -> - case TS.fetch(server) do - {:lock, ref2, _} = result -> - :timer.sleep(100) + result = TS.fetch(server) + send(parent, {self(), result}) + case result do + {:lock, ref2, _} -> + assert_receive {^parent, :go} TS.update(server, ref2, []) - result - result -> - result + _ -> + :ok end end - task1 = Task.async(task) - task2 = Task.async(task) - task3 = Task.async(task) + {:ok, _} = Task.start_link(task) + {:ok, _} = Task.start_link(task) + {:ok, _} = Task.start_link(task) - assert [{:go, ^types}, {:go, ^types}, {:lock, _, ^types}] = - Enum.sort([Task.await(task1), Task.await(task2), Task.await(task3)]) + for _ <- 1..3 do + assert_receive {pid, {:lock, _, ^types}} + :timer.sleep(100) + :sys.get_state(server) + refute_received _ + send(pid, {parent, :go}) + end assert {:lock, _, ^types} = TS.fetch(server) end @@ -167,12 +176,12 @@ defmodule TypeServerTest do :timer.sleep(:infinity) end - assert_receive {:lock, _, _} + assert_receive {:lock, _, types} task = Task.async(fn -> TS.fetch(server) end) Process.exit(pid, :kill) wait_until_dead(pid) - assert :error = Task.await(task) + assert {:lock, _, ^types} = Task.await(task) end defp wait_until_dead(pid) do