Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websockets #1

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# This Fork

This is a fork of the Ethereumex project which implements a websocket transport. It is still a work in progress and requires some polish and tests to productionalize. Use at your own risk.

This work was sponsored by [Hawku](https://www.hawku.com/).

# Ethereumex

[![CircleCI](https://circleci.com/gh/mana-ethereum/ethereumex.svg?style=svg)](https://circleci.com/gh/exthereum/ethereumex)
Expand Down
2 changes: 2 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use Mix.Config

config :ethereumex, url: System.get_env("ETHEREUM_URL")

config :ethereumex, client_type: :ws
32 changes: 31 additions & 1 deletion lib/ethereumex/config.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Ethereumex.Config do
@moduledoc false
alias Ethereumex.IpcServer
alias Ethereumex.{IpcServer, WsServer}

def setup_children(), do: setup_children(client_type())

Expand All @@ -13,6 +13,15 @@ defmodule Ethereumex.Config do
]
end

# TODO: do we want/need to use poolboy here? it could make unsubscibing more complicated
def setup_children(:ws) do
[
:poolboy.child_spec(:worker, ws_poolboy_config(),
url: rpc_url()
)
]
end

def setup_children(:http) do
pool_opts = http_pool_options()

Expand Down Expand Up @@ -88,4 +97,25 @@ defmodule Ethereumex.Config do
defp http_pool_options() do
Application.get_env(:ethereumex, :http_pool_options, %{})
end

@spec ws_poolboy_config() :: keyword()
defp ws_poolboy_config() do
[
{:name, {:local, :ws_worker}},
{:worker_module, WsServer},
{:size, ws_worker_size()},
{:max_overflow, ws_max_worker_overflow()}
]
end

@spec ws_worker_size() :: integer()
defp ws_worker_size() do
Application.get_env(:ethereumex, :ws_worker_size, 5)
end

@spec ipc_max_worker_overflow() :: integer()
defp ws_max_worker_overflow() do
Application.get_env(:ethereumex, :ws_max_worker_overflow, 2)
end

end
46 changes: 46 additions & 0 deletions lib/ethereumex/ws_client.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule Ethereumex.WsClient do
@moduledoc false

require Logger

use Ethereumex.Client.BaseClient
alias Ethereumex.WsServer

@timeout 60_000
@spec post_request(binary(), []) :: {:ok | :error, any()}
def post_request(payload, opts) do
sub_pid = Keyword.get(opts, :subscriber)
with {:ok, response} <- call_ws(payload, sub_pid),
{:ok, decoded_body} <- Jason.decode(response) do
case decoded_body do
%{"error" => error} -> {:error, error}
result = [%{} | _] -> {:ok, format_batch(result)}
result -> {:ok, Map.get(result, "result")}
end
else
{:error, %Jason.DecodeError{data: ""}} -> {:error, :empty_response}
{:error, %Jason.DecodeError{} = error} -> {:error, {:invalid_json, error}}
{:error, error} -> {:error, error}
end
end

# defp call_ws(payload) do
# :poolboy.transaction(:ws_worker, fn pid -> WsServer.post(pid, payload) end, @timeout)
# end

defp call_ws(payload, sub_pid) do
# TODO: do we need to pool this? might cause issues with unsubscribe if not sticky
:poolboy.transaction(:ws_worker, fn pid -> WsServer.post(pid, payload, sub_pid) end, @timeout)
end

def eth_subscribe(params, sub_pid, opts \\ []) when is_list(params) do
# TODO: change params?
opts = Keyword.put(opts, :subscriber, sub_pid)
request("eth_subscribe", params, opts)
end

def eth_unsubscribe(sub_id, opts \\ []) do
params = [sub_id]
request("eth_unsubscribe", params, opts)
end
end
193 changes: 193 additions & 0 deletions lib/ethereumex/ws_server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
defmodule Ethereumex.WsServer do
require Logger
@moduledoc false

use GenServer

@timeout 60_000

defmodule Socket do
use WebSockex

# TODO: handle disconnects - reconnect to socket and notify parent so subscriptions can be re-activated

def start_link(url, parent) do
Logger.debug("websocket server starting with url: #{url}")
WebSockex.start_link(url, __MODULE__, parent, handle_initial_conn_failure: true)
end

def send_request(socket, request) do
WebSockex.send_frame(socket, {:text, request})
end

def handle_frame({:text, msg}, parent) do
Logger.debug("received message from socket: #{msg}")
send(parent, {:response, msg})
{:ok, parent}
end

def handle_connect(_conn, parent) do
send(parent, :socket_connected)
{:ok, parent}
end

def handle_disconnect(%{reason: {_, :normal}}, state) do
Logger.debug("socket closing normally")
{:ok, state}
end
def handle_disconnect(%{attempt_number: attempts}, state) do
# attempt to reconnect
if attempts > 3 do
# still failing after 3 attempts. backoff a little
# TODO: propertize these params
:timer.sleep(max(100 * attempts, 1_000))
end
Logger.debug("socket disconnected. attempting to reconnect")
{:reconnect, state}
end
end

defmodule State do
defstruct socket: nil,
replies: %{},
subs: %{},
pending_subs: %{},
sub_params: %{}

def new(socket), do: %__MODULE__{socket: socket}

def pop_reply(state = %__MODULE__{replies: replies}, id) do
{addr, replies} = Map.pop(replies, id)
{addr, %{state | replies: replies}}
end

def push_reply(state = %__MODULE__{replies: replies}, id, recipient) do
replies = Map.put(replies, id, recipient)
%{state | replies: replies}
end

def add_pending_sub(state = %__MODULE__{}, _req_id, nil, _params), do: state
def add_pending_sub(state = %__MODULE__{pending_subs: pending_subs, sub_params: sub_params}, req_id, sub_pid, params) do
%{state |
pending_subs: Map.put(pending_subs, req_id, sub_pid),
sub_params: Map.put(sub_params, req_id, params)
}
end

def convert_pending_to_sub(state = %__MODULE__{pending_subs: p_subs, subs: subs, sub_params: sub_params}, req_id, sub_id) when is_binary(sub_id) do
case Map.get(p_subs, req_id) do
nil -> state
sub_pid ->
params = Map.get(sub_params, req_id)
sub_params = Map.delete(sub_params, req_id)
%{state |
subs: Map.put(subs, sub_id, sub_pid),
pending_subs: Map.delete(p_subs, req_id),
sub_params: Map.put(sub_params, sub_id, params)
}
end
end
def convert_pending_to_sub(state = %__MODULE__{}, _req_id, _sub_id), do: state

def get_sub(%__MODULE__{subs: subs}, sub_id), do: Map.get(subs, sub_id)

# TODO: when the connection goes down, delete the process
# TODO: store subscription ids by pid and remove them on disconnect?
def remove_sub(state = %__MODULE__{subs: subs, sub_params: sub_params}, sub_id) do
%{state | subs: Map.delete(subs, sub_id), sub_params: Map.delete(sub_params, sub_id)}
end
end

def start_link(opts) do
url = Keyword.get(opts, :url)
GenServer.start_link(__MODULE__, [url])
end

def init([url]) do
{:ok, socket} = Socket.start_link(url, self())
{:ok, State.new(socket)}
end

def post(pid, request, sub_pid) do
Logger.debug("sending request #{inspect(request)}")
GenServer.call(pid, {:request, request, sub_pid}, @timeout)
end

def handle_call({:request, request, sub_pid}, from, state) do
decoded = %{"id" => id} = Jason.decode!(request)
:ok = Socket.send_request(state.socket, request)
state = State.push_reply(state, id, from)

state = case decoded do
%{"method" => "eth_subscribe", "params" => params} ->
# TODO: cleanup pending subs if they don't get processed in some amount of time to avoid potential memory leak
# TODO: monitor the sub_pid and remove it's subscriptions if the process stops
State.add_pending_sub(state, id, sub_pid, params)
%{"method" => "eth_unsubscribe", "params" => [sub_id]} ->
State.remove_sub(state, sub_id)
_ -> state
end

Process.send_after(self(), {:timeout_reply, id}, @timeout)

{:noreply, state}
end

def handle_info({:response, json_response}, state) do
# TODO: error handling - if we let this crash we'll potentially lose a lot of responses...
{:ok, response} = Jason.decode(json_response)
case response do
%{"method" => "eth_subscription"} ->
# relay subscription message to subscribing process
sub_id = State.get_sub(state, get_in(response, ["params", "subscription"]))
Logger.debug("relaying subscription message to subscriber: #{inspect(sub_id)} #{json_response}")
send(sub_id, {:eth_subscription, response})
{:noreply, state}
%{"id" => id, "result" => maybe_sub_id} ->
# just a standard response or maybe an eth_subscribe response
{addr, state} = State.pop_reply(state, id)
if addr != nil do
GenServer.reply(addr, {:ok, json_response})
else
# resub
addr = Map.get(state.pending_subs, id)
if addr != nil do
# notify the subscriber about the resubscribe
send(addr, {:resubscribe, response})
end
end
{:noreply, State.convert_pending_to_sub(state, id, maybe_sub_id)}
end
end

# if somehow reply addresses are accumulating, clear them out
def handle_info({:timeout_reply, id}, state) do
{addr, state} = State.pop_reply(state, id)

if addr != nil do
Logger.warn("timeout exceeded. giving up on reply for #{id} to #{inspect(addr)}")
end

{:noreply, state}
end

def handle_info(:socket_connected, state = %State{}) do
# resubscribe all the existing subscriptions
state = Enum.reduce(state.subs, state, fn {sub_id, sub_pid}, state ->
params = Map.get(state.sub_params, sub_id)
# TODO: finish implementing this

req_id = "resub-#{sub_id}"
request = %{"id" => req_id, "method" => "eth_subscribe", "params" => params}
|> Jason.encode!()

Socket.send_request(state.socket, request)

state
|> State.remove_sub(sub_id)
|> State.add_pending_sub(req_id, sub_pid, params)
end)

{:noreply, state}
end
end
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ defmodule Ethereumex.Mixfile do
{:dialyxir, "~> 1.0", only: [:dev, :test], runtime: false},
{:poolboy, "~> 1.5"},
{:telemetry, "~> 0.4 or ~> 1.0"},
{:with_env, "~> 0.1", only: :test}
{:with_env, "~> 0.1", only: :test},
{:websockex, "~> 0.4"}
]
end

Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@
"nimble_pool": {:hex, :nimble_pool, "0.2.4", "1db8e9f8a53d967d595e0b32a17030cdb6c0dc4a451b8ac787bf601d3f7704c3", [:mix], [], "hexpm", "367e8071e137b787764e6a9992ccb57b276dc2282535f767a07d881951ebeac6"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"telemetry": {:hex, :telemetry, "1.0.0", "0f453a102cdf13d506b7c0ab158324c337c41f1cc7548f0bc0e130bbf0ae9452", [:rebar3], [], "hexpm", "73bc09fa59b4a0284efb4624335583c528e07ec9ae76aca96ea0673850aec57a"},
"websockex": {:hex, :websockex, "0.4.3", "92b7905769c79c6480c02daacaca2ddd49de936d912976a4d3c923723b647bf0", [:mix], [], "hexpm", "95f2e7072b85a3a4cc385602d42115b73ce0b74a9121d0d6dbbf557645ac53e4"},
"with_env": {:hex, :with_env, "0.1.0", "4c4d4bdf54733917945fa0c521b7a7bbad4f4c65ce75b346a566695898a4f59a", [:mix], [], "hexpm", "2345cf474a963184edb23f626b4f498472c7d9fe5aa2c71473981dc054bdef95"},
}