Skip to content

Commit

Permalink
fix: process hierarchy (#37)
Browse files Browse the repository at this point in the history
* fix: use temporary instead of transient restart for Kadabra.Supervisor

* feat: removed StreamSupervisor

Part of a supervision restructuring. Connection will basically monitor
everything except ConnectionQueue.

* feat: Hpack and Socket brought under Connection

* feat: all new process hierarchy

Removed GenStage completely. ConnectionQueue is replaced with
ConnectionPool, which will (eventually) maintain clones of Connection
for better parallelism.

* fix: pass tests for elixir 1.4.5

* fix: handle :connection_error shutdowns

* refactor: minor changes

Also: added elixir 1.7.0 to travis

* refactor: module dependency restructuring

- Encodable implementations moved into Encodable
- Frame.Flags broken back out into each frame
- Connection.Egress for sending outbound frames

* test: removed some useless tests

* fix: removed unnecessary Process.monitor of streams by connection

Also: GOAWAY logging disabled by default, can be re-enabled with
`:debug_log?` config option.
  • Loading branch information
hpopp authored Jul 28, 2018
1 parent 2122dc2 commit 9b2b89d
Show file tree
Hide file tree
Showing 38 changed files with 575 additions and 760 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,4 @@ erl_crash.dump
# Also ignore archive artifacts (built via "mix archive.build").
*.ez

mix.lock
.iex.exs
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ matrix:
otp_release: 20.3.1
- elixir: 1.6.6
otp_release: 20.1
- elixir: 1.7.0
otp_release: 21.0.3
script:
- "MIX_ENV=test mix do deps.get, compile, coveralls.travis"
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v0.4.3
- Fixed supervisor crash report during normal connection shutdown
- Removed `GenStage` dependency
- GOAWAY error logger messages now disabled by default.
Re-enable with `config :kadabra, debug_log?: true`

## v0.4.2
- Fixed `{:closed, pid}` task race condition during connection cleanup
- Everything is supervised under `Kadabra.Application` again, instead of
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Add kadabra to your `mix.exs`:
```elixir
def deps do
[
{:kadabra, "~> 0.4.0"}
{:kadabra, "~> 0.4.3"}
]
end
```
Expand Down
30 changes: 1 addition & 29 deletions lib/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,12 @@ defmodule Kadabra.Application do
use Application
import Supervisor.Spec

alias Kadabra.Connection

@app :kadabra

def start(_type, _args) do
children = [
supervisor(Registry, [:unique, Registry.Kadabra]),
supervisor(Task.Supervisor, [[name: Kadabra.Tasks]])
]

Supervisor.start_link(children, strategy: :one_for_one, name: @app)
end

def start_connection(uri, pid, opts) do
Supervisor.start_child(
@app,
supervisor(Kadabra.Supervisor, [uri, pid, opts], spec_opts())
)
end

defp spec_opts do
ref = :erlang.make_ref()
[id: ref, restart: :transient]
end

def ping(pid) do
pid
|> Connection.via_tuple()
|> Connection.ping()
end

def close(pid) do
pid
|> Connection.via_tuple()
|> Connection.close()
Supervisor.start_link(children, strategy: :one_for_one, name: :kadabra)
end
end
10 changes: 6 additions & 4 deletions lib/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ defmodule Kadabra.Config do
@moduledoc false

defstruct client: nil,
supervisor: nil,
ref: nil,
queue: nil,
encoder: nil,
decoder: nil,
uri: nil,
socket: nil,
queue: nil,
opts: []

@type t :: %__MODULE__{
client: pid,
supervisor: pid,
ref: term,
queue: pid,
encoder: pid,
decoder: pid,
uri: URI.t(),
socket: pid,
opts: Keyword.t()
Expand Down
116 changes: 47 additions & 69 deletions lib/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,19 @@ defmodule Kadabra.Connection do
local_settings: nil,
queue: nil

use GenStage
use GenServer
require Logger

import Kernel, except: [send: 2]

alias Kadabra.{
Config,
Connection,
Encodable,
Error,
Frame,
Socket,
StreamSupervisor
Hpack,
Socket
}

alias Kadabra.Frame.{Goaway, Ping}

alias Kadabra.Connection.{FlowControl, Processor}
alias Kadabra.Connection.{Egress, FlowControl, Processor}

@type t :: %__MODULE__{
buffer: binary,
Expand All @@ -38,41 +33,46 @@ defmodule Kadabra.Connection do

@type sock :: {:sslsocket, any, pid | {any, any}}

def start_link(%Config{supervisor: sup} = config) do
name = via_tuple(sup)
GenStage.start_link(__MODULE__, config, name: name)
def start_link(%Config{} = config) do
GenServer.start_link(__MODULE__, config)
end

def via_tuple(ref) do
{:via, Registry, {Registry.Kadabra, {ref, __MODULE__}}}
end
def init(%Config{} = config) do
{:ok, encoder} = Hpack.start_link()
{:ok, decoder} = Hpack.start_link()
{:ok, socket} = Socket.start_link(config.uri, config.opts)

config =
config
|> Map.put(:encoder, encoder)
|> Map.put(:decoder, decoder)
|> Map.put(:socket, socket)

def init(%Config{queue: queue} = config) do
state = initial_state(config)

Kernel.send(self(), :start)
Process.flag(:trap_exit, true)

{:consumer, state, subscribe_to: [queue]}
{:ok, state}
end

defp initial_state(%Config{opts: opts, queue: queue} = config) do
settings = Keyword.get(opts, :settings, Connection.Settings.fastest())
socket = config.supervisor |> Socket.via_tuple()

%__MODULE__{
config: %{config | socket: socket},
config: config,
queue: queue,
local_settings: settings,
flow_control: %FlowControl{}
}
end

def close(pid) do
GenStage.call(pid, :close)
GenServer.call(pid, :close)
end

def ping(pid) do
GenStage.cast(pid, {:send, :ping})
GenServer.cast(pid, {:send, :ping})
end

# handle_cast
Expand All @@ -81,17 +81,13 @@ defmodule Kadabra.Connection do
sendf(type, state)
end

def handle_cast(_msg, state) do
{:noreply, [], state}
end

def handle_events(events, _from, state) do
def handle_cast({:request, events}, state) do
state = do_send_headers(events, state)
{:noreply, [], state}
{:noreply, state}
end

def handle_subscribe(:producer, _opts, from, state) do
{:manual, %{state | queue: from}}
def handle_cast(_msg, state) do
{:noreply, state}
end

# handle_call
Expand All @@ -102,25 +98,21 @@ defmodule Kadabra.Connection do
config: config
} = state

StreamSupervisor.stop(state.config.ref)

bin = flow.stream_set.stream_id |> Goaway.new() |> Encodable.to_bin()
Socket.send(config.socket, bin)
Egress.send_goaway(config.socket, flow.stream_set.stream_id)

{:stop, :shutdown, :ok, state}
end

# sendf

@spec sendf(:goaway | :ping, t) :: {:noreply, [], t}
@spec sendf(:goaway | :ping, t) :: {:noreply, t}
def sendf(:ping, %Connection{config: config} = state) do
bin = Ping.new() |> Encodable.to_bin()
Socket.send(config.socket, bin)
{:noreply, [], state}
Egress.send_ping(config.socket)
{:noreply, state}
end

def sendf(_else, state) do
{:noreply, [], state}
{:noreply, state}
end

defp do_send_headers(request, %{flow_control: flow} = state) do
Expand All @@ -132,66 +124,52 @@ defmodule Kadabra.Connection do
%{state | flow_control: flow}
end

def handle_info(:start, %{config: config} = state) do
config.supervisor
|> Socket.via_tuple()
|> Socket.set_active()

bin =
%Frame.Settings{settings: state.local_settings}
|> Encodable.to_bin()
def handle_info(:start, %{config: %{socket: socket}} = state) do
Socket.set_active(socket)
Egress.send_local_settings(socket, state.local_settings)

config.supervisor
|> Socket.via_tuple()
|> Socket.send(bin)

{:noreply, [], state}
{:noreply, state}
end

def handle_info({:closed, _pid}, state) do
{:stop, :shutdown, state}
end

def handle_info({:DOWN, _, _, _pid, {:shutdown, {:finished, sid}}}, state) do
GenStage.ask(state.queue, 1)
def handle_info({:EXIT, _pid, {:shutdown, {:finished, sid}}}, state) do
GenServer.cast(state.queue, {:ask, 1})

flow =
state.flow_control
|> FlowControl.finish_stream(sid)
|> FlowControl.process(state.config)

{:noreply, [], %{state | flow_control: flow}}
{:noreply, %{state | flow_control: flow}}
end

def handle_info({:push_promise, stream}, %{config: config} = state) do
Kernel.send(config.client, {:push_promise, stream})
{:noreply, [], state}
{:noreply, state}
end

def handle_info({:recv, frame}, state) do
case Processor.process(frame, state) do
{:ok, state} ->
{:noreply, [], state}
{:noreply, state}

{:connection_error, error, reason, state} ->
handle_connection_error(state, error, reason)
Egress.send_goaway(
state.config.socket,
state.flow_control.stream_set.stream_id,
error,
reason
)

{:stop, {:shutdown, :connection_error}, state}
end
end

defp handle_connection_error(%{config: config} = state, error, reason) do
code = <<Error.code(error)::32>>

bin =
state.flow_control.stream_set.stream_id
|> Goaway.new(code, reason)
|> Encodable.to_bin()

Socket.send(config.socket, bin)
end

def terminate(_reason, %{config: config}) do
Kernel.send(config.client, {:closed, config.supervisor})
Kernel.send(config.client, {:closed, config.queue})
:ok
end
end
59 changes: 59 additions & 0 deletions lib/connection/egress.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
defmodule Kadabra.Connection.Egress do
@moduledoc false

alias Kadabra.{Encodable, Error, Frame, Socket}

alias Kadabra.Frame.{
Goaway,
Ping,
WindowUpdate
}

def send_goaway(socket, stream_id) do
bin = stream_id |> Goaway.new() |> Encodable.to_bin()
Socket.send(socket, bin)
end

def send_goaway(socket, stream_id, error, reason) do
code = <<Error.code(error)::32>>

bin =
stream_id
|> Goaway.new(code, reason)
|> Encodable.to_bin()

Socket.send(socket, bin)
end

def send_ping(socket) do
bin = Ping.new() |> Encodable.to_bin()
Socket.send(socket, bin)
end

def send_local_settings(socket, settings) do
bin =
%Frame.Settings{settings: settings}
|> Encodable.to_bin()

Socket.send(socket, bin)
end

@spec send_window_update(pid, non_neg_integer, integer) :: no_return
def send_window_update(socket, stream_id, bytes)
when bytes > 0 and bytes < 2_147_483_647 do
bin =
stream_id
|> WindowUpdate.new(bytes)
|> Encodable.to_bin()

# Logger.info("Sending WINDOW_UPDATE on Stream #{stream_id} (#{bytes})")
Socket.send(socket, bin)
end

def send_window_update(_socket, _stream_id, _bytes), do: :ok

def send_settings_ack(socket) do
bin = Frame.Settings.ack() |> Encodable.to_bin()
Socket.send(socket, bin)
end
end
Loading

0 comments on commit 9b2b89d

Please sign in to comment.