Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bryanj/graph 1102/customizable document storage2 #11

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions lib/absinthe/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,13 @@ defmodule Absinthe.Subscription do
Module.concat([pubsub, :Registry])
end

@doc false
def document_storage_name(pubsub) do
Module.concat([pubsub, :Storage])
end

def document_storage(pubsub) do
{:ok, document_storage} =
def storage_module(pubsub) do
{:ok, storage} =
pubsub
|> registry_name
|> Registry.meta(:document_storage)
|> Registry.meta(:storage)

document_storage
storage
end

@doc false
Expand Down
30 changes: 16 additions & 14 deletions lib/absinthe/subscription/default_document_storage.ex
Original file line number Diff line number Diff line change
@@ -1,47 +1,49 @@
defmodule Absinthe.Subscription.DefaultDocumentStorage do
@behaviour Absinthe.Subscription.DocumentStorage

@moduledoc """
Default document storage for Absinthe. Stores subscription
documents and field keys in a Registry.
"""

@impl Absinthe.Subscription.DocumentStorage
def child_spec(opts) do
Registry.child_spec(opts)
end
alias Absinthe.Subscription

@impl Absinthe.Subscription.DocumentStorage
def put(storage_process_name, doc_id, doc_value, field_keys) do
def put(pubsub, doc_id, doc_value, field_keys) do
registry = Subscription.registry_name(pubsub)

pdict_add_fields(doc_id, field_keys)

for field_key <- field_keys do
{:ok, _} = Registry.register(storage_process_name, field_key, doc_id)
{:ok, _} = Registry.register(registry, field_key, doc_id)
end

{:ok, _} = Registry.register(storage_process_name, doc_id, doc_value)
{:ok, _} = Registry.register(registry, doc_id, doc_value)
end

@impl Absinthe.Subscription.DocumentStorage
def delete(storage_process_name, doc_id) do
def delete(pubsub, doc_id) do
registry = Subscription.registry_name(pubsub)

for field_key <- pdict_fields(doc_id) do
Registry.unregister(storage_process_name, field_key)
Registry.unregister(registry, field_key)
end

pdict_delete_fields(doc_id)

Registry.unregister(storage_process_name, doc_id)
Registry.unregister(registry, doc_id)

:ok
end

@impl Absinthe.Subscription.DocumentStorage
def get_docs_by_field_key(storage_process_name, field_key) do
storage_process_name
def get_docs_by_field_key(pubsub, field_key) do
registry = Subscription.registry_name(pubsub)

registry
|> Registry.lookup(field_key)
|> MapSet.new(fn {_pid, doc_id} -> doc_id end)
|> Enum.reduce(%{}, fn doc_id, acc ->
case Registry.lookup(storage_process_name, doc_id) do
case Registry.lookup(registry, doc_id) do
[] ->
acc

Expand Down
53 changes: 11 additions & 42 deletions lib/absinthe/subscription/document_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,22 @@ defmodule Absinthe.Subscription.DocumentStorage do
the storage for subscription documents. This behaviour can be implemented to
allow for a custom storage solution if needed.

The `child_spec` is used so that Absinthe can start your process when starting `Absinthe.Subscription`.

To tell `Absinthe.Subscription` to use your custom storage, make sure to pass in `document_storage` and `storage_opts`
when adding `Absinthe.Subscription` to your application supervisor.

```elixir
{Absinthe.Subscription, pubsub: MyApp.Pubsub, document_storage: MyApp.DocumentStorage, storage_opts: [key1: value1, key2: value2]}
```

Absinthe.Subscription will update `storage_opts` to include a `name` key. This will be the name `Absinthe.Subscription` uses to
reference the process.
When starting `Absinthe.Subscription`, include `storage`. Defaults to `Absinthe.Subscription.DefaultDocumentStorage`

```elixir
@impl Absinthe.Subscription.DocumentStorage
def child_spec(opts) do
# opts is the `storage_opts` with the `name` key added
{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]}
}
end
{Absinthe.Subscription, pubsub: MyApp.Pubsub, storage: MyApp.DocumentStorage}
```
"""

alias Absinthe.Subscription
alias Absinthe.Subscription.PipelineSerializer

@doc """
Child spec to determine how to start the
Document storage process. This will be supervised. Absinthe will give
the process a name and that name will be passed in the other callbacks
in order to reference it there.
"""
@callback child_spec(opts :: Keyword.t()) :: Supervisor.child_spec()

@doc """
Adds `doc` to storage with `doc_id` as the key. Associates the given
`field_keys` with `doc_id`.
"""
@callback put(
storage_process_name :: atom,
pubsub :: atom,
doc_id :: term,
doc :: %{
initial_phases: Absinthe.Subscription.PipelineSerializer.packed_pipeline(),
Expand All @@ -61,20 +36,20 @@ defmodule Absinthe.Subscription.DocumentStorage do
@doc """
Removes the document. Along with any field_keys associated with it
"""
@callback delete(storage_process_name :: atom, doc_id :: term) :: :ok
@callback delete(pubsub :: atom, doc_id :: term) :: :ok

@doc """
Get all docs associated with `field_key`
"""
@callback get_docs_by_field_key(
storage_process_name :: atom,
pubsub :: atom,
field_key :: {field :: term, key :: term}
) ::
map()

@doc false
def put(pubsub, doc_id, doc, field_keys) do
{storage_module, storage_process_name} = storage_info(pubsub)
storage_module = Subscription.storage_module(pubsub)

:telemetry.span(
[:absinthe, :subscription, :storage, :put],
Expand All @@ -92,7 +67,7 @@ defmodule Absinthe.Subscription.DocumentStorage do
source: doc.source
}

result = storage_module.put(storage_process_name, doc_id, doc_value, field_keys)
result = storage_module.put(pubsub, doc_id, doc_value, field_keys)

{result,
%{
Expand All @@ -107,7 +82,7 @@ defmodule Absinthe.Subscription.DocumentStorage do

@doc false
def delete(pubsub, doc_id) do
{storage_module, storage_process_name} = storage_info(pubsub)
storage_module = Subscription.storage_module(pubsub)

:telemetry.span(
[:absinthe, :subscription, :storage, :delete],
Expand All @@ -116,7 +91,7 @@ defmodule Absinthe.Subscription.DocumentStorage do
storage_module: storage_module
},
fn ->
result = storage_module.delete(storage_process_name, doc_id)
result = storage_module.delete(pubsub, doc_id)

{result,
%{
Expand All @@ -129,7 +104,7 @@ defmodule Absinthe.Subscription.DocumentStorage do

@doc false
def get_docs_by_field_key(pubsub, field_key) do
{storage_module, storage_process_name} = storage_info(pubsub)
storage_module = Subscription.storage_module(pubsub)

:telemetry.span(
[:absinthe, :subscription, :storage, :get_docs_by_field_key],
Expand All @@ -139,7 +114,7 @@ defmodule Absinthe.Subscription.DocumentStorage do
},
fn ->
result =
storage_process_name
pubsub
|> storage_module.get_docs_by_field_key(field_key)
|> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} ->
initial_phases = PipelineSerializer.unpack(initial_phases)
Expand All @@ -155,10 +130,4 @@ defmodule Absinthe.Subscription.DocumentStorage do
end
)
end

defp storage_info(pubsub) do
storage_module = Subscription.document_storage(pubsub)
storage_process_name = Subscription.document_storage_name(pubsub)
{storage_module, storage_process_name}
end
end
36 changes: 8 additions & 28 deletions lib/absinthe/subscription/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,44 +23,24 @@ defmodule Absinthe.Subscription.Supervisor do

pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2)
compress_registry? = Keyword.get(opts, :compress_registry?, true)
storage = Keyword.get(opts, :storage, Absinthe.Subscription.DefaultDocumentStorage)

document_storage =
Keyword.get(opts, :document_storage, Absinthe.Subscription.DefaultDocumentStorage)

storage_opts =
case document_storage do
Absinthe.Subscription.DefaultDocumentStorage ->
[
keys: :duplicate,
partitions: System.schedulers_online(),
compressed: compress_registry?
]

_ ->
Keyword.get(opts, :storage_opts, Keyword.new())
end

Supervisor.start_link(
__MODULE__,
{pubsub, pool_size, document_storage, storage_opts}
)
Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, storage})
end

def init({pubsub, pool_size, document_storage, storage_opts}) do
def init({pubsub, pool_size, compress_registry?, storage}) do
registry_name = Absinthe.Subscription.registry_name(pubsub)
meta = [pool_size: pool_size, document_storage: document_storage]

storage_opts =
Keyword.put(storage_opts, :name, Absinthe.Subscription.document_storage_name(pubsub))
meta = [pool_size: pool_size, storage: storage]

children = [
{Registry,
[
keys: :unique,
keys: :duplicate,
name: registry_name,
meta: meta
partitions: System.schedulers_online(),
meta: meta,
compressed: compress_registry?
]},
document_storage.child_spec(storage_opts),
{Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]}
]

Expand Down
Loading
Loading