Skip to content

Commit

Permalink
add implementation of embedded client
Browse files Browse the repository at this point in the history
which talks to electric over the internal api, not http
  • Loading branch information
magnetised committed Feb 6, 2025
1 parent 8955a58 commit 78ce6d0
Show file tree
Hide file tree
Showing 17 changed files with 494 additions and 101 deletions.
49 changes: 1 addition & 48 deletions .github/workflows/elixir_client_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
POSTGRES_VERSION: 140006
ELECTRIC_PORT: 3100
PG_PORT: 54323
ELECTRIC_URL: "http://127.0.0.1:3100"
ELECTRIC_URL: "http://127.0.0.1:3333"
DATABASE_URL: "postgresql://postgres:[email protected]:54323/postgres?sslmode=disable"
services:
postgres:
Expand Down Expand Up @@ -67,59 +67,12 @@ jobs:
${{ runner.os }}-elixir-client-build-[${{ github.ref_name }}]-
${{ runner.os }}-elixir-client-build-
- name: Cache sync-service dependencies
uses: actions/cache@v4
with:
path: packages/sync-service/deps
key: "${{ runner.os }}-sync-service-deps-${{ env.MIX_ENV }}-${{ hashFiles('packages/sync-service/mix.lock') }}"
restore-keys: |
${{ runner.os }}-sync-service-deps-${{ env.MIX_ENV }}-${{ hashFiles('packages/sync-service/mix.lock') }}
${{ runner.os }}-sync-service-deps-${{ env.MIX_ENV }}-
${{ runner.os }}-sync-service-deps-
- name: Cache sync-service compiled code
uses: actions/cache@v4
with:
path: |
packages/sync-service/_build/*/lib
!packages/sync-service/_build/*/lib/electric
key: "${{ runner.os }}-sync-service-build-${{ env.MIX_ENV }}-[${{ github.ref_name }}]-${{ github.sha }}"
restore-keys: |
${{ runner.os }}-sync-service-build-${{ env.MIX_ENV }}-[${{ github.ref_name }}]-${{ github.sha }}
${{ runner.os }}-sync-service-build-${{ env.MIX_ENV }}-[${{ github.ref_name }}]-
${{ runner.os }}-sync-service-build-${{ env.MIX_ENV }}-
${{ runner.os }}-sync-service-build-
- name: Install dependencies
run: mix deps.get && mix deps.compile

- name: Compiles without warnings
run: mix compile --force --all-warnings --warnings-as-errors

- name: Install sync-service dependencies
run: mix deps.get && mix deps.compile
working-directory: packages/sync-service
env:
MIX_ENV: prod

- uses: JarvusInnovations/background-action@v1
name: Bootstrap System Under Test (SUT)
with:
run: |
mix start_dev
env mix run --no-halt &
wait-on: "http://127.0.0.1:3100"

tail: true
log-output-resume: stderr
wait-for: 1m
log-output: true
log-output-if: failure
working-directory: packages/sync-service
env:
MIX_ENV: prod

- name: Run tests
run: mix test

Expand Down
2 changes: 2 additions & 0 deletions packages/elixir-client/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ elixir_client-*.tar

# Temporary files, for example, from tests.
/tmp/

/persistent
25 changes: 22 additions & 3 deletions packages/elixir-client/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,34 @@ import Config
config :logger, level: :warning

if config_env() == :test do
default_database_url = "postgresql://postgres:password@localhost:54321/electric"
port = 3333
default_database_url = "postgresql://postgres:password@localhost:54321/electric?sslmode=disable"
database_url = System.get_env("DATABASE_URL", default_database_url)

default_electric_url = "http://localhost:3000"
connection_opts = Electric.Config.parse_postgresql_uri!(database_url)

default_electric_url = "http://localhost:#{port}"
electric_url = System.get_env("ELECTRIC_URL", default_electric_url)

config :electric_client,
database_config: PostgresqlUri.parse(database_url),
database_config: connection_opts,
electric_url: electric_url

config :electric_client, Support.Repo, url: database_url

config :sentry,
environment_name: config_env(),
client: Electric.Telemetry.SentryReqHTTPClient

config :electric,
connection_opts: Electric.Utils.obfuscate_password(connection_opts),
# enable the http api so that the client tests against a real endpoint can
# run against our embedded electric instance.
enable_http_api: true,
service_port: port,
allow_shape_deletion?: false,
# use a non-default replication stream id so we can run the client
# tests at the same time as an active electric instance
replication_stream_id: "client_tests",
storage_dir: Path.join(System.tmp_dir!(), "electric/client-tests#{System.monotonic_time()}")
end
45 changes: 35 additions & 10 deletions packages/elixir-client/lib/electric/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,31 @@ defmodule Electric.Client do
end
end

if Code.ensure_loaded?(Electric.Application) do
@doc """
Get a client instance that runs against an embedded instance of electric,
that is an electric app running as a dependency of the current application.
"""
@spec embedded(Electric.Shapes.Api.options()) :: {:ok, t()} | {:error, term()}
def embedded(opts \\ []) do
api =
Electric.Application.api(Keyword.merge(opts, encoder: Electric.Shapes.Api.Encoder.Term))

Client.new(
base_url: "elixir://Electric.Client.Embedded",
fetch: {Electric.Client.Embedded, api: api}
)
end

@spec embedded!(Electric.Shapes.Api.options()) :: t() | no_return()
def embedded!(opts \\ []) do
case embedded(opts) do
{:ok, client} -> client
{:error, reason} -> raise ArgumentError, message: reason
end
end
end

@doc """
A shortcut to [`ShapeDefinition.new/2`](`Electric.Client.ShapeDefinition.new/2`).
Expand All @@ -288,6 +313,16 @@ defmodule Electric.Client do
end

if Code.ensure_loaded?(Ecto) do
defp validate_queryable!(queryable) when is_atom(queryable) do
Code.ensure_loaded!(queryable)

if function_exported?(queryable, :__schema__, 1) do
queryable
else
raise ArgumentError, message: "Expected Ecto struct or query, got #{inspect(queryable)}"
end
end

@doc """
Create a [`ShapeDefinition`](`Electric.Client.ShapeDefinition`) from an `Ecto` query.
Expand Down Expand Up @@ -410,14 +445,4 @@ defmodule Electric.Client do
request = request(client, method: :delete, shape: shape)
Electric.Client.Fetch.request(client, request, [])
end

defp validate_queryable!(queryable) when is_atom(queryable) do
Code.ensure_loaded!(queryable)

if function_exported?(queryable, :__schema__, 1) do
queryable
else
raise ArgumentError, message: "Expected Ecto struct or query, got #{inspect(queryable)}"
end
end
end
66 changes: 66 additions & 0 deletions packages/elixir-client/lib/electric/client/embedded.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
if Code.ensure_loaded?(Electric.Shapes.Api) do
defmodule Electric.Client.Embedded do
alias Electric.Client.Fetch
alias Electric.Client.ShapeDefinition
alias Electric.Shapes.Api

@behaviour Electric.Client.Fetch

@impl Electric.Client.Fetch
def fetch(%Fetch.Request{method: :delete} = request, opts) do
{:ok, api} = Keyword.fetch(opts, :api)

timestamp = DateTime.utc_now()

with {:ok, request} <- Api.validate_for_delete(api, request_to_params(request)),
%Api.Response{} = response <- Api.delete_shape(request) do
{:ok, translate_response(response, timestamp)}
end
end

def fetch(%Fetch.Request{method: :get} = request, opts) do
{:ok, api} = Keyword.fetch(opts, :api)

timestamp = DateTime.utc_now()

with {:ok, request} <- Api.validate(api, request_to_params(request)),
%Api.Response{} = response = Api.serve_shape_log(request) do
{:ok, translate_response(response, timestamp)}
end
end

defp translate_response(%Api.Response{} = response, timestamp) do
%Fetch.Response{
status: response.status,
last_offset: convert_offset(response.offset),
shape_handle: response.handle,
schema: Api.schema(response),
next_cursor: nil,
request_timestamp: timestamp,
body: response.body
}
end

defp request_to_params(%Fetch.Request{shape: %ShapeDefinition{} = shape} = request) do
%{
table: Electric.Utils.relation_to_sql({shape.namespace || "public", shape.table}),
offset: to_string(request.offset),
handle: request.shape_handle,
live: request.live,
where: shape.where,
columns: shape.columns,
replica: request.replica
}
end

defp convert_offset(nil) do
nil
end

# TODO: when we remove offset parsing from the elixir client, then it makes
# sense to use to_string on the server offset
defp convert_offset(%Electric.Replication.LogOffset{} = server_offset) do
Electric.Client.Offset.from_string!(to_string(server_offset))
end
end
end
10 changes: 9 additions & 1 deletion packages/elixir-client/lib/electric/client/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ defmodule Electric.Client.Stream do
|> after_fetch()
end

defp ensure_enum(body) do
case Enumerable.impl_for(body) do
nil -> List.wrap(body)
Enumerable.Map -> List.wrap(body)
_impl -> body
end
end

defp handle_response(%Fetch.Response{status: status} = resp, stream)
when status in 200..299 do
shape_handle = shape_handle!(resp)
Expand All @@ -166,7 +174,7 @@ defmodule Electric.Client.Stream do
|> Map.put(:offset, final_offset)

resp.body
|> List.wrap()
|> ensure_enum()
|> Enum.flat_map(&Message.parse(&1, shape_handle, final_offset, value_mapper_fun))
|> Enum.map(&Map.put(&1, :request_timestamp, resp.request_timestamp))
|> Enum.reduce_while(stream, &handle_msg/2)
Expand Down
3 changes: 2 additions & 1 deletion packages/elixir-client/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ defmodule Electric.Client.MixProject do
{:bypass, "~> 2.1", only: [:test]},
{:postgrex, "~> 0.19", only: [:test]},
{:postgresql_uri, "~> 0.1", only: [:test]},
{:uuid, "~> 1.1", only: [:test]}
{:uuid, "~> 1.1", only: [:test]},
{:electric, path: "../sync-service", only: [:test]}
]
end

Expand Down
Loading

0 comments on commit 78ce6d0

Please sign in to comment.