Skip to content
This repository has been archived by the owner on Nov 29, 2022. It is now read-only.

Adds TLS support to the elixir client and server #26

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 54 additions & 41 deletions ex/loqui/lib/loqui/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ defmodule Loqui.Client do
@type t :: %__MODULE__{
port: pos_integer,
host: charlist,
tcp_opts: Loqui.Client.tcp_opts,
transport: Loqui.Client.transport,
transport_opts: Loqui.Client.transport_opts,
recv_timeout: pos_integer,
loqui_path: String.t,
buffer: binary,
Expand All @@ -116,7 +117,8 @@ defmodule Loqui.Client do
defstruct host: nil,
port: nil,
sock: nil,
tcp_opts: nil,
transport: nil,
transport_opts: nil,
connect_timeout: nil,
recv_timeout: nil,
loqui_path: nil,
Expand Down Expand Up @@ -166,18 +168,19 @@ defmodule Loqui.Client do

@type sequence :: 1..unquote(@max_sequence)

@type tcp_opt :: {:recv_timeout, pos_integer} | {:send_timeout, pos_integer}
@type tcp_opts :: [tcp_opt]
@type transport :: :gen_tcp | :ssl
@type transport_opt :: {:recv_timeout, pos_integer} | {:send_timeout, pos_integer} | :ssl.ssl_option()
@type transport_opts :: [transport_opt]

@type loqui_opt :: {:codecs, [Codec.t]} | {:compressors, [Compressor.t]}
@type loqui_opts :: [loqui_opt]

@type opts :: [
loqui_opts: loqui_opts,
tcp_opts: tcp_opts
transport: transport,
transport_opts: transport_opts
]


@doc """
Creates a connection to a Loqui server.
"""
Expand All @@ -188,8 +191,8 @@ defmodule Loqui.Client do

@doc false
def init({host, port, loqui_path, opts}) do
tcp_opts = opts
|> Keyword.get(:tcp_opts, [])
transport_opts = opts
|> Keyword.get(:transport_opts, [])
|> Keyword.put(:active, :false)
|> Keyword.put(:mode, :binary)
|> Keyword.put(:send_timeout_close, true)
Expand All @@ -205,12 +208,14 @@ defmodule Loqui.Client do
registered_codecs = Map.merge(@default_codecs, extract_option.(:codecs))
registered_compressors = Map.merge(@default_compressors, extract_option.(:compressors))

{connect_timeout, tcp_opts} = Keyword.pop(tcp_opts, :connect_timeout, @default_timeout)
{transport, _opts} = Keyword.pop(opts, :transport, :gen_tcp)
{connect_timeout, transport_opts} = Keyword.pop(transport_opts, :connect_timeout, @default_timeout)

state = %State{
host: to_host(host),
port: port,
tcp_opts: tcp_opts,
transport: transport,
transport_opts: transport_opts,
loqui_path: loqui_path,
connect_timeout: connect_timeout,
sequence: 1,
Expand Down Expand Up @@ -261,15 +266,15 @@ defmodule Loqui.Client do
end

@doc false
def connect(_info, %{sock: nil, host: host, port: port, tcp_opts: tcp_opts, connect_timeout: connect_timeout}=state) do
case :gen_tcp.connect(host, port, tcp_opts, connect_timeout) do
def connect(_info, %{sock: nil, host: host, port: port, transport: transport, transport_opts: transport_opts, connect_timeout: connect_timeout}=state) do
case transport.connect(host, port, transport_opts, connect_timeout) do
{:ok, sock} ->
# this is out of the with statement because we need to bind
# the socket to the state, and if the with statement doesn't
# complete, the state won't have the socket in it.
state = %{state | sock: sock}

with {:ok, _sock} <- update_socket_opts(sock),
with {:ok, _sock} <- update_socket_opts(sock, transport),
{:ok, state} <- do_upgrade(state),
state <- make_active_once(state),
{:ok, state} <- do_loqui_connect(state) do
Expand All @@ -287,20 +292,20 @@ defmodule Loqui.Client do
end
end

defp update_socket_opts(socket) do
{:ok, opts} = :inet.getopts(socket, [:sndbuf, :recbuf])
defp update_socket_opts(socket, transport) do
{:ok, opts} = getopts(transport, socket, [:sndbuf, :recbuf])
send_buffer_size = opts[:sndbuf]
recieve_buffer_size = opts[:recbuf]
buffer_size = max(send_buffer_size, recieve_buffer_size)
:inet.setopts(socket, [sndbuf: send_buffer_size,
setopts(transport, socket, [sndbuf: send_buffer_size,
recbuf: recieve_buffer_size,
buffer: buffer_size])
{:ok, socket}
end

@doc false
def disconnect(info, %State{sock: sock}=state) do
:ok = :gen_tcp.close(sock)
def disconnect(info, %State{sock: sock, transport: transport}=state) do
:ok = transport.close(sock)
case info do
{:close, from} ->
Connection.reply(from, :ok)
Expand All @@ -313,9 +318,9 @@ defmodule Loqui.Client do

# Connection callbacks

def handle_call(:ping, caller, %{sock: sock}=state) do
def handle_call(:ping, caller, %{sock: sock, transport: transport}=state) do
{next_seq, state} = State.next_sequence(state)
:gen_tcp.send(sock, Frames.ping(0, next_seq))
transport.send(sock, Frames.ping(0, next_seq))

{:noreply, State.add_waiter(state, next_seq, caller)}
end
Expand All @@ -324,9 +329,9 @@ defmodule Loqui.Client do
{:reply, {:error, :remote_went_away}, state}
end

def handle_call(:close, from, %State{sock: sock}=s) do
def handle_call(:close, from, %State{sock: sock, transport: transport}=s) do
go_away_packet = Frames.goaway(0, 0, "Closing")
:gen_tcp.send(sock, go_away_packet)
transport.send(sock, go_away_packet)

{:disconnect, {:close, from}, s}
end
Expand Down Expand Up @@ -357,7 +362,7 @@ defmodule Loqui.Client do
{:noreply, buffer_packet(state, push_frame)}
end

def handle_info({:tcp, socket, data}, %State{sock: socket}=state) do
def handle_info({type, socket, data}, %State{sock: socket}=state) when type in [:tcp, :ssl] do
make_active_once(state)
with {:ok, parsed_packets, leftover_data} <- Parser.parse(state.buffer, data) do
state = handle_packets(parsed_packets, state)
Expand All @@ -374,18 +379,18 @@ defmodule Loqui.Client do
{:noreply, state}
end

def handle_info(:flush_packets, %State{packet_buffer: packets, sock: socket}=state) do
:gen_tcp.send(socket, packets)
def handle_info(:flush_packets, %State{packet_buffer: packets, sock: socket, transport: transport}=state) do
transport.send(socket, packets)

{:noreply, %State{state | packet_buffer: []}}
end

def handle_info({:tcp_closed, _socket}, _state) do
{:stop, :tcp_closed, nil}
def handle_info({type, _socket}, _state) when type in [:tcp_closed, :ssl_closed] do
{:stop, type, nil}
end

def handle_info({:tcp_error, _, _}, _state) do
{:stop, :tcp_closed, nil}
def handle_info({type, _, _}, _state) when type in [:tcp_error, :ssl_error] do
{:stop, type, nil}
end

def handle_info({:close_go_away, go_away_code, go_away_data}, %State{sequence: :go_away, waiters: waiters}=state) do
Expand All @@ -398,10 +403,10 @@ defmodule Loqui.Client do
{:disconnect, err, %State{state | waiters: %{}}}
end

def handle_info(:send_ping, %State{sock: sock, last_ping: nil}=state) do
def handle_info(:send_ping, %State{sock: sock, last_ping: nil, transport: transport}=state) do
{next_seq, state} = State.next_sequence(state)

:gen_tcp.send(sock, Frames.ping(0, next_seq))
transport.send(sock, Frames.ping(0, next_seq))
new_state = state
|> schedule_ping
|> Map.put(:last_ping, {next_seq, :erlang.system_time(:milli_seconds)})
Expand Down Expand Up @@ -430,8 +435,8 @@ defmodule Loqui.Client do
handle_packets(rest, state)
end

defp handle_packet({:ping, _flags, seq}, %State{sock: sock}=state) do
:gen_tcp.send(sock, Frames.pong(0, seq))
defp handle_packet({:ping, _flags, seq}, %State{sock: sock, transport: transport}=state) do
transport.send(sock, Frames.pong(0, seq))
state
end

Expand Down Expand Up @@ -494,15 +499,23 @@ defmodule Loqui.Client do
state
end

defp make_active_once(%State{sock: sock}=state) do
:ok = :inet.setopts(sock, [active: :once])
defp getopts(:ssl, socket, opts), do: :ssl.getopts(socket, opts)

defp getopts(:gen_tcp, socket, opts), do: :inet.getopts(socket, opts)

defp setopts(:ssl, socket, opts), do: :ssl.setopts(socket, opts)

defp setopts(:gen_tcp, socket, opts), do: :inet.setopts(socket, opts)

defp make_active_once(%State{sock: sock, transport: transport}=state) do
:ok = setopts(transport, sock, [active: :once])
state
end

defp do_upgrade(%State{sock: sock, tcp_opts: tcp_opts}=state) do
recv_timeout = Keyword.get(tcp_opts, :recv_timeout, @default_timeout)
with :ok <- :gen_tcp.send(sock, upgrade_request(state)),
{:ok, data} <- :gen_tcp.recv(sock, 0, recv_timeout),
defp do_upgrade(%State{sock: sock, transport_opts: transport_opts, transport: transport}=state) do
recv_timeout = Keyword.get(transport_opts, :recv_timeout, @default_timeout)
with :ok <- transport.send(sock, upgrade_request(state)),
{:ok, data} <- transport.recv(sock, 0, recv_timeout),
:upgraded <- parse_upgrade_response(data) do

{:ok, state}
Expand Down Expand Up @@ -531,7 +544,7 @@ defmodule Loqui.Client do
{:error, {:upgrade_failed, invalid_response}}
end

defp do_loqui_connect(%State{sock: sock, registered_codecs: codecs, registered_compressors: compressors}=state) do
defp do_loqui_connect(%State{sock: sock, registered_codecs: codecs, registered_compressors: compressors, transport: transport}=state) do
extract_names = fn(m) ->
m
|> Map.keys
Expand All @@ -540,7 +553,7 @@ defmodule Loqui.Client do

codecs = extract_names.(codecs)
compressors = extract_names.(compressors)
:gen_tcp.send(sock, Frames.hello(0, "#{codecs}|#{compressors}"))
transport.send(sock, Frames.hello(0, "#{codecs}|#{compressors}"))

{:ok, state}
end
Expand Down
8 changes: 4 additions & 4 deletions ex/loqui/lib/loqui/ranch_protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ defmodule Loqui.RanchProtocol do
|> handle_response(seq, response, [])
|> handler_loop(so_far)

{:tcp, ^socket_pid, data} ->
{type, ^socket_pid, data} when type in [:tcp, :ssl] ->
handle_socket_data(state, <<so_far::binary, data::binary>>)

{:tcp_closed, ^socket_pid} ->
{type, ^socket_pid} when type in [:tcp_closed, :ssl_closed] ->
Logger.info "[loqui] tcp_closed. socket_pid=#{inspect socket_pid}"
close(state, :tcp_closed)
close(state, type)

{:tcp_error, ^socket_pid, reason} ->
{type, ^socket_pid, reason} when type in [:tcp_error, :ssl_error] ->
goaway(state, reason)

{:DOWN, ref, :process, _pid, reason} ->
Expand Down
6 changes: 3 additions & 3 deletions ex/loqui/lib/loqui/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule Loqui.Server do

def loop(%{socket: sock} = state) do
receive do
{:tcp, ^sock, data} ->
{type, ^sock, data} when type in [:tcp, :ssl] ->
case handle_tcp_data(data, state) do
:ok ->
exit(:normal)
Expand All @@ -56,11 +56,11 @@ defmodule Loqui.Server do
exit(reason)
end

{:tcp_error, ^sock, reason} ->
{type, ^sock, reason} when type in [:tcp_error, :ssl_error] ->
Logger.warn("TCP error #{inspect reason} from client #{inspect ip_address(sock)}. Closing")
exit(reason)

{:tcp_closed, ^sock} ->
{type, ^sock} when type in [:tcp_closed, :ssl_closed] ->
Logger.info("Client #{inspect ip_address(sock)} closed.")
exit(:normal)
end
Expand Down