From 793ed69bf2dd032fab08004236447c80611299b0 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Fri, 28 Jun 2024 11:14:22 -0500 Subject: [PATCH] Let user start their own storage if needed --- lib/absinthe/subscription.ex | 13 +- .../subscription/default_document_storage.ex | 30 +- lib/absinthe/subscription/document_storage.ex | 53 +-- lib/absinthe/subscription/supervisor.ex | 36 +- .../subscription/document_storage_test.exs | 373 ++++++++++++++++++ 5 files changed, 412 insertions(+), 93 deletions(-) create mode 100644 test/absinthe/subscription/document_storage_test.exs diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index c1ec45e6..c360a5d6 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -159,18 +159,13 @@ defmodule Absinthe.Subscription do Module.concat([pubsub, :Registry]) end - @doc false - def document_storage_name(pubsub) do - Module.concat([pubsub, :Storage]) - end - - def document_storage(pubsub) do - {:ok, document_storage} = + def storage_module(pubsub) do + {:ok, storage} = pubsub |> registry_name - |> Registry.meta(:document_storage) + |> Registry.meta(:storage) - document_storage + storage end @doc false diff --git a/lib/absinthe/subscription/default_document_storage.ex b/lib/absinthe/subscription/default_document_storage.ex index d6ffea10..ee48af54 100644 --- a/lib/absinthe/subscription/default_document_storage.ex +++ b/lib/absinthe/subscription/default_document_storage.ex @@ -1,47 +1,49 @@ defmodule Absinthe.Subscription.DefaultDocumentStorage do @behaviour Absinthe.Subscription.DocumentStorage - @moduledoc """ Default document storage for Absinthe. Stores subscription documents and field keys in a Registry. """ - @impl Absinthe.Subscription.DocumentStorage - def child_spec(opts) do - Registry.child_spec(opts) - end + alias Absinthe.Subscription @impl Absinthe.Subscription.DocumentStorage - def put(storage_process_name, doc_id, doc_value, field_keys) do + def put(pubsub, doc_id, doc_value, field_keys) do + registry = Subscription.registry_name(pubsub) + pdict_add_fields(doc_id, field_keys) for field_key <- field_keys do - {:ok, _} = Registry.register(storage_process_name, field_key, doc_id) + {:ok, _} = Registry.register(registry, field_key, doc_id) end - {:ok, _} = Registry.register(storage_process_name, doc_id, doc_value) + {:ok, _} = Registry.register(registry, doc_id, doc_value) end @impl Absinthe.Subscription.DocumentStorage - def delete(storage_process_name, doc_id) do + def delete(pubsub, doc_id) do + registry = Subscription.registry_name(pubsub) + for field_key <- pdict_fields(doc_id) do - Registry.unregister(storage_process_name, field_key) + Registry.unregister(registry, field_key) end pdict_delete_fields(doc_id) - Registry.unregister(storage_process_name, doc_id) + Registry.unregister(registry, doc_id) :ok end @impl Absinthe.Subscription.DocumentStorage - def get_docs_by_field_key(storage_process_name, field_key) do - storage_process_name + def get_docs_by_field_key(pubsub, field_key) do + registry = Subscription.registry_name(pubsub) + + registry |> Registry.lookup(field_key) |> MapSet.new(fn {_pid, doc_id} -> doc_id end) |> Enum.reduce(%{}, fn doc_id, acc -> - case Registry.lookup(storage_process_name, doc_id) do + case Registry.lookup(registry, doc_id) do [] -> acc diff --git a/lib/absinthe/subscription/document_storage.ex b/lib/absinthe/subscription/document_storage.ex index 17821a3b..7f20f7b1 100644 --- a/lib/absinthe/subscription/document_storage.ex +++ b/lib/absinthe/subscription/document_storage.ex @@ -8,47 +8,22 @@ defmodule Absinthe.Subscription.DocumentStorage do the storage for subscription documents. This behaviour can be implemented to allow for a custom storage solution if needed. - The `child_spec` is used so that Absinthe can start your process when starting `Absinthe.Subscription`. - - To tell `Absinthe.Subscription` to use your custom storage, make sure to pass in `document_storage` and `storage_opts` - when adding `Absinthe.Subscription` to your application supervisor. - - ```elixir - {Absinthe.Subscription, pubsub: MyApp.Pubsub, document_storage: MyApp.DocumentStorage, storage_opts: [key1: value1, key2: value2]} - ``` - - Absinthe.Subscription will update `storage_opts` to include a `name` key. This will be the name `Absinthe.Subscription` uses to - reference the process. + When starting `Absinthe.Subscription`, include `storage`. Defaults to `Absinthe.Subscription.DefaultDocumentStorage` ```elixir - @impl Absinthe.Subscription.DocumentStorage - def child_spec(opts) do - # opts is the `storage_opts` with the `name` key added - { - id: __MODULE__, - start: {__MODULE__, :start_link, [opts]} - } - end + {Absinthe.Subscription, pubsub: MyApp.Pubsub, storage: MyApp.DocumentStorage} ``` """ alias Absinthe.Subscription alias Absinthe.Subscription.PipelineSerializer - @doc """ - Child spec to determine how to start the - Document storage process. This will be supervised. Absinthe will give - the process a name and that name will be passed in the other callbacks - in order to reference it there. - """ - @callback child_spec(opts :: Keyword.t()) :: Supervisor.child_spec() - @doc """ Adds `doc` to storage with `doc_id` as the key. Associates the given `field_keys` with `doc_id`. """ @callback put( - storage_process_name :: atom, + pubsub :: atom, doc_id :: term, doc :: %{ initial_phases: Absinthe.Subscription.PipelineSerializer.packed_pipeline(), @@ -61,20 +36,20 @@ defmodule Absinthe.Subscription.DocumentStorage do @doc """ Removes the document. Along with any field_keys associated with it """ - @callback delete(storage_process_name :: atom, doc_id :: term) :: :ok + @callback delete(pubsub :: atom, doc_id :: term) :: :ok @doc """ Get all docs associated with `field_key` """ @callback get_docs_by_field_key( - storage_process_name :: atom, + pubsub :: atom, field_key :: {field :: term, key :: term} ) :: map() @doc false def put(pubsub, doc_id, doc, field_keys) do - {storage_module, storage_process_name} = storage_info(pubsub) + storage_module = Subscription.storage_module(pubsub) :telemetry.span( [:absinthe, :subscription, :storage, :put], @@ -92,7 +67,7 @@ defmodule Absinthe.Subscription.DocumentStorage do source: doc.source } - result = storage_module.put(storage_process_name, doc_id, doc_value, field_keys) + result = storage_module.put(pubsub, doc_id, doc_value, field_keys) {result, %{ @@ -107,7 +82,7 @@ defmodule Absinthe.Subscription.DocumentStorage do @doc false def delete(pubsub, doc_id) do - {storage_module, storage_process_name} = storage_info(pubsub) + storage_module = Subscription.storage_module(pubsub) :telemetry.span( [:absinthe, :subscription, :storage, :delete], @@ -116,7 +91,7 @@ defmodule Absinthe.Subscription.DocumentStorage do storage_module: storage_module }, fn -> - result = storage_module.delete(storage_process_name, doc_id) + result = storage_module.delete(pubsub, doc_id) {result, %{ @@ -129,7 +104,7 @@ defmodule Absinthe.Subscription.DocumentStorage do @doc false def get_docs_by_field_key(pubsub, field_key) do - {storage_module, storage_process_name} = storage_info(pubsub) + storage_module = Subscription.storage_module(pubsub) :telemetry.span( [:absinthe, :subscription, :storage, :get_docs_by_field_key], @@ -139,7 +114,7 @@ defmodule Absinthe.Subscription.DocumentStorage do }, fn -> result = - storage_process_name + pubsub |> storage_module.get_docs_by_field_key(field_key) |> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} -> initial_phases = PipelineSerializer.unpack(initial_phases) @@ -155,10 +130,4 @@ defmodule Absinthe.Subscription.DocumentStorage do end ) end - - defp storage_info(pubsub) do - storage_module = Subscription.document_storage(pubsub) - storage_process_name = Subscription.document_storage_name(pubsub) - {storage_module, storage_process_name} - end end diff --git a/lib/absinthe/subscription/supervisor.ex b/lib/absinthe/subscription/supervisor.ex index 1d85cd7a..d56ae97a 100644 --- a/lib/absinthe/subscription/supervisor.ex +++ b/lib/absinthe/subscription/supervisor.ex @@ -23,44 +23,24 @@ defmodule Absinthe.Subscription.Supervisor do pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2) compress_registry? = Keyword.get(opts, :compress_registry?, true) + storage = Keyword.get(opts, :storage, Absinthe.Subscription.DefaultDocumentStorage) - document_storage = - Keyword.get(opts, :document_storage, Absinthe.Subscription.DefaultDocumentStorage) - - storage_opts = - case document_storage do - Absinthe.Subscription.DefaultDocumentStorage -> - [ - keys: :duplicate, - partitions: System.schedulers_online(), - compressed: compress_registry? - ] - - _ -> - Keyword.get(opts, :storage_opts, Keyword.new()) - end - - Supervisor.start_link( - __MODULE__, - {pubsub, pool_size, document_storage, storage_opts} - ) + Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, storage}) end - def init({pubsub, pool_size, document_storage, storage_opts}) do + def init({pubsub, pool_size, compress_registry?, storage}) do registry_name = Absinthe.Subscription.registry_name(pubsub) - meta = [pool_size: pool_size, document_storage: document_storage] - - storage_opts = - Keyword.put(storage_opts, :name, Absinthe.Subscription.document_storage_name(pubsub)) + meta = [pool_size: pool_size, storage: storage] children = [ {Registry, [ - keys: :unique, + keys: :duplicate, name: registry_name, - meta: meta + partitions: System.schedulers_online(), + meta: meta, + compressed: compress_registry? ]}, - document_storage.child_spec(storage_opts), {Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]} ] diff --git a/test/absinthe/subscription/document_storage_test.exs b/test/absinthe/subscription/document_storage_test.exs new file mode 100644 index 00000000..970a909b --- /dev/null +++ b/test/absinthe/subscription/document_storage_test.exs @@ -0,0 +1,373 @@ +defmodule Absinthe.Subscription.DocumentStorageTest do + use Absinthe.Case + + defmodule TestDocumentStorageSchema do + use Absinthe.Schema + + query do + field :foo, :string + end + + object :user do + field :id, :id + field :name, :string + + field :group, :group do + resolve fn user, _, %{context: %{test_pid: pid}} -> + batch({__MODULE__, :batch_get_group, pid}, nil, fn _results -> + {:ok, user.group} + end) + end + end + end + + object :group do + field :name, :string + end + + def batch_get_group(test_pid, _) do + # send a message to the test process every time we access this function. + # if batching is working properly, it should only happen once. + send(test_pid, :batch_get_group) + %{} + end + + subscription do + field :raises, :string do + config fn _, _ -> + {:ok, topic: "*"} + end + + resolve fn _, _, _ -> + raise "boom" + end + end + + field :user, :user do + arg :id, :id + + config fn args, _ -> + {:ok, topic: args[:id] || "*"} + end + + trigger :update_user, + topic: fn user -> + [user.id, "*"] + end + end + + field :thing, :string do + arg :client_id, non_null(:id) + + config fn + _args, %{context: %{authorized: false}} -> + {:error, "unauthorized"} + + args, _ -> + { + :ok, + topic: args.client_id + } + end + end + + field :multiple_topics, :string do + config fn _, _ -> + {:ok, topic: ["topic_1", "topic_2", "topic_3"]} + end + end + + field :other_user, :user do + arg :id, :id + + config fn + args, %{context: %{context_id: context_id, document_id: document_id}} -> + {:ok, topic: args[:id] || "*", context_id: context_id, document_id: document_id} + + args, %{context: %{context_id: context_id}} -> + {:ok, topic: args[:id] || "*", context_id: context_id} + end + end + + field :relies_on_document, :string do + config fn _, %{document: %Absinthe.Blueprint{} = document} -> + %{type: :subscription, name: op_name} = Absinthe.Blueprint.current_operation(document) + {:ok, topic: "*", context_id: "*", document_id: op_name} + end + end + end + + mutation do + field :update_user, :user do + arg :id, non_null(:id) + + resolve fn _, %{id: id}, _ -> + {:ok, %{id: id, name: "foo"}} + end + end + end + end + + defmodule TestDocumentStoragePubSub do + @behaviour Absinthe.Subscription.Pubsub + + def start_link() do + Registry.start_link(keys: :duplicate, name: __MODULE__) + end + + def node_name() do + node() + end + + def subscribe(topic) do + Registry.register(__MODULE__, topic, []) + :ok + end + + def publish_subscription(topic, data) do + message = %{ + topic: topic, + event: "subscription:data", + result: data + } + + Registry.dispatch(__MODULE__, topic, fn entries -> + for {pid, _} <- entries, do: send(pid, {:broadcast, message}) + end) + end + + def publish_mutation(_proxy_topic, _mutation_result, _subscribed_fields) do + # this pubsub is local and doesn't support clusters + :ok + end + end + + defmodule TestDocumentStorage do + @behaviour Absinthe.Subscription.DocumentStorage + use GenServer + + def start_link(_) do + GenServer.start_link(__MODULE__, nil, name: __MODULE__) + end + + @impl GenServer + def init(_) do + {:ok, %{docs: %{}, field_keys: %{}}} + end + + @impl GenServer + def handle_cast({:put, doc_id, doc_value, field_keys}, %{ + docs: docs, + field_keys: field_keys_map + }) do + docs = Map.put_new(docs, doc_id, doc_value) + + field_keys_map = + Enum.reduce(field_keys, field_keys_map, fn field_key, field_keys_map -> + Map.update(field_keys_map, field_key, [doc_id], fn doc_ids -> [doc_id | doc_ids] end) + end) + + {:noreply, %{docs: docs, field_keys: field_keys_map}} + end + + @impl GenServer + def handle_cast({:delete, doc_id}, %{ + docs: docs, + field_keys: field_keys_map + }) do + docs = Map.delete(docs, doc_id) + + field_keys_map = + Enum.map(field_keys_map, fn {field_key, doc_ids} -> + doc_ids = List.delete(doc_ids, doc_id) + {field_key, doc_ids} + end) + |> Map.new() + + {:noreply, %{docs: docs, field_keys: field_keys_map}} + end + + @impl GenServer + def handle_call( + {:get_docs_by_field_key, field_key}, + _from, + %{ + docs: docs, + field_keys: field_keys_map + } = state + ) do + doc_ids = Map.get(field_keys_map, field_key, []) + + docs_to_return = + docs + |> Enum.filter(fn {doc_id, _} -> doc_id in doc_ids end) + |> Map.new() + + {:reply, docs_to_return, state} + end + + @impl Absinthe.Subscription.DocumentStorage + def put(_pubsub, doc_id, doc_value, field_keys) do + GenServer.cast(__MODULE__, {:put, doc_id, doc_value, field_keys}) + :ok + end + + @impl Absinthe.Subscription.DocumentStorage + def delete(_pubsub, doc_id) do + GenServer.cast(__MODULE__, {:delete, doc_id}) + + :ok + end + + @impl Absinthe.Subscription.DocumentStorage + def get_docs_by_field_key(_pubsub, field_key) do + GenServer.call(__MODULE__, {:get_docs_by_field_key, field_key}) + end + end + + def run_subscription(query, schema, opts \\ []) do + opts = + Keyword.update( + opts, + :context, + %{pubsub: TestDocumentStoragePubSub}, + &Map.put(&1, :pubsub, opts[:context][:pubsub] || TestDocumentStoragePubSub) + ) + + case run(query, schema, opts) do + {:ok, %{"subscribed" => topic}} = val -> + opts[:context][:pubsub].subscribe(topic) + val + + val -> + val + end + end + + setup do + start_supervised(TestDocumentStorage) + + start_supervised(%{ + id: TestDocumentStoragePubSub, + start: {TestDocumentStoragePubSub, :start_link, []} + }) + + start_supervised( + {Absinthe.Subscription, pubsub: TestDocumentStoragePubSub, storage: TestDocumentStorage} + ) + + :ok + end + + @query """ + subscription ($clientId: ID!) { + thing(clientId: $clientId) + } + """ + test "can subscribe the current process" do + client_id = "abc" + + assert {:ok, %{"subscribed" => topic}} = + run_subscription( + @query, + TestDocumentStorageSchema, + variables: %{"clientId" => client_id}, + context: %{pubsub: TestDocumentStoragePubSub} + ) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", thing: client_id) + + assert_receive({:broadcast, msg}) + + assert %{ + event: "subscription:data", + result: %{data: %{"thing" => "foo"}}, + topic: topic + } == msg + end + + @query """ + subscription ($clientId: ID!) { + thing(clientId: $clientId) + } + """ + test "can unsubscribe the current process" do + client_id = "abc" + + assert {:ok, %{"subscribed" => topic}} = + run_subscription( + @query, + TestDocumentStorageSchema, + variables: %{"clientId" => client_id}, + context: %{pubsub: TestDocumentStoragePubSub} + ) + + Absinthe.Subscription.unsubscribe(TestDocumentStoragePubSub, topic) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", thing: client_id) + + refute_receive({:broadcast, _}) + end + + @query """ + subscription { + multipleTopics + } + """ + test "schema can provide multiple topics to subscribe to" do + assert {:ok, %{"subscribed" => topic}} = + run_subscription( + @query, + TestDocumentStorageSchema, + variables: %{}, + context: %{pubsub: TestDocumentStoragePubSub} + ) + + msg = %{ + event: "subscription:data", + result: %{data: %{"multipleTopics" => "foo"}}, + topic: topic + } + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_1") + + assert_receive({:broadcast, ^msg}) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_2") + + assert_receive({:broadcast, ^msg}) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_3") + + assert_receive({:broadcast, ^msg}) + end + + @query """ + subscription { + multipleTopics + } + """ + test "unsubscription works when multiple topics are provided" do + assert {:ok, %{"subscribed" => topic}} = + run_subscription( + @query, + TestDocumentStorageSchema, + variables: %{}, + context: %{pubsub: TestDocumentStoragePubSub} + ) + + Absinthe.Subscription.unsubscribe(TestDocumentStoragePubSub, topic) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_1") + + refute_receive({:broadcast, _}) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_2") + + refute_receive({:broadcast, _}) + + Absinthe.Subscription.publish(TestDocumentStoragePubSub, "foo", multiple_topics: "topic_3") + + refute_receive({:broadcast, _}) + end +end