diff --git a/.github/workflows/elixir_client_tests.yml b/.github/workflows/elixir_client_tests.yml index a5dbeb9d78..626065474b 100644 --- a/.github/workflows/elixir_client_tests.yml +++ b/.github/workflows/elixir_client_tests.yml @@ -20,7 +20,7 @@ jobs: POSTGRES_VERSION: 140006 ELECTRIC_PORT: 3100 PG_PORT: 54323 - ELECTRIC_URL: "http://127.0.0.1:3100" + ELECTRIC_URL: "http://127.0.0.1:3333" DATABASE_URL: "postgresql://postgres:password@127.0.0.1:54323/postgres?sslmode=disable" services: postgres: @@ -67,59 +67,12 @@ jobs: ${{ runner.os }}-elixir-client-build-[${{ github.ref_name }}]- ${{ runner.os }}-elixir-client-build- - - name: Cache sync-service dependencies - uses: actions/cache@v4 - with: - path: packages/sync-service/deps - key: "${{ runner.os }}-sync-service-deps-${{ env.MIX_ENV }}-${{ hashFiles('packages/sync-service/mix.lock') }}" - restore-keys: | - ${{ runner.os }}-sync-service-deps-${{ env.MIX_ENV }}-${{ hashFiles('packages/sync-service/mix.lock') }} - ${{ runner.os }}-sync-service-deps-${{ env.MIX_ENV }}- - ${{ runner.os }}-sync-service-deps- - - - name: Cache sync-service compiled code - uses: actions/cache@v4 - with: - path: | - packages/sync-service/_build/*/lib - !packages/sync-service/_build/*/lib/electric - key: "${{ runner.os }}-sync-service-build-${{ env.MIX_ENV }}-[${{ github.ref_name }}]-${{ github.sha }}" - restore-keys: | - ${{ runner.os }}-sync-service-build-${{ env.MIX_ENV }}-[${{ github.ref_name }}]-${{ github.sha }} - ${{ runner.os }}-sync-service-build-${{ env.MIX_ENV }}-[${{ github.ref_name }}]- - ${{ runner.os }}-sync-service-build-${{ env.MIX_ENV }}- - ${{ runner.os }}-sync-service-build- - - name: Install dependencies run: mix deps.get && mix deps.compile - name: Compiles without warnings run: mix compile --force --all-warnings --warnings-as-errors - - name: Install sync-service dependencies - run: mix deps.get && mix deps.compile - working-directory: packages/sync-service - env: - MIX_ENV: prod - - - uses: JarvusInnovations/background-action@v1 - name: Bootstrap System Under Test (SUT) - with: - run: | - mix start_dev - env mix run --no-halt & - - wait-on: "http://127.0.0.1:3100" - - tail: true - log-output-resume: stderr - wait-for: 1m - log-output: true - log-output-if: failure - working-directory: packages/sync-service - env: - MIX_ENV: prod - - name: Run tests run: mix test diff --git a/packages/elixir-client/.gitignore b/packages/elixir-client/.gitignore index e7cf3d449a..ee5e4f40fb 100644 --- a/packages/elixir-client/.gitignore +++ b/packages/elixir-client/.gitignore @@ -24,3 +24,5 @@ elixir_client-*.tar # Temporary files, for example, from tests. /tmp/ + +/persistent diff --git a/packages/elixir-client/config/runtime.exs b/packages/elixir-client/config/runtime.exs index d46d70ef47..e8ba373546 100644 --- a/packages/elixir-client/config/runtime.exs +++ b/packages/elixir-client/config/runtime.exs @@ -3,15 +3,34 @@ import Config config :logger, level: :warning if config_env() == :test do - default_database_url = "postgresql://postgres:password@localhost:54321/electric" + port = 3333 + default_database_url = "postgresql://postgres:password@localhost:54321/electric?sslmode=disable" database_url = System.get_env("DATABASE_URL", default_database_url) - default_electric_url = "http://localhost:3000" + connection_opts = Electric.Config.parse_postgresql_uri!(database_url) + + default_electric_url = "http://localhost:#{port}" electric_url = System.get_env("ELECTRIC_URL", default_electric_url) config :electric_client, - database_config: PostgresqlUri.parse(database_url), + database_config: connection_opts, electric_url: electric_url config :electric_client, Support.Repo, url: database_url + + config :sentry, + environment_name: config_env(), + client: Electric.Telemetry.SentryReqHTTPClient + + config :electric, + connection_opts: Electric.Utils.obfuscate_password(connection_opts), + # enable the http api so that the client tests against a real endpoint can + # run against our embedded electric instance. + enable_http_api: true, + service_port: port, + allow_shape_deletion?: false, + # use a non-default replication stream id so we can run the client + # tests at the same time as an active electric instance + replication_stream_id: "client_tests", + storage_dir: Path.join(System.tmp_dir!(), "electric/client-tests#{System.monotonic_time()}") end diff --git a/packages/elixir-client/lib/electric/client.ex b/packages/elixir-client/lib/electric/client.ex index 116045a32d..ae9efbe366 100644 --- a/packages/elixir-client/lib/electric/client.ex +++ b/packages/elixir-client/lib/electric/client.ex @@ -273,6 +273,31 @@ defmodule Electric.Client do end end + if Code.ensure_loaded?(Electric.Application) do + @doc """ + Get a client instance that runs against an embedded instance of electric, + that is an electric app running as a dependency of the current application. + """ + @spec embedded(Electric.Shapes.Api.options()) :: {:ok, t()} | {:error, term()} + def embedded(opts \\ []) do + api = + Electric.Application.api(Keyword.merge(opts, encoder: Electric.Shapes.Api.Encoder.Term)) + + Client.new( + base_url: "elixir://Electric.Client.Embedded", + fetch: {Electric.Client.Embedded, api: api} + ) + end + + @spec embedded!(Electric.Shapes.Api.options()) :: t() | no_return() + def embedded!(opts \\ []) do + case embedded(opts) do + {:ok, client} -> client + {:error, reason} -> raise ArgumentError, message: reason + end + end + end + @doc """ A shortcut to [`ShapeDefinition.new/2`](`Electric.Client.ShapeDefinition.new/2`). @@ -288,6 +313,16 @@ defmodule Electric.Client do end if Code.ensure_loaded?(Ecto) do + defp validate_queryable!(queryable) when is_atom(queryable) do + Code.ensure_loaded!(queryable) + + if function_exported?(queryable, :__schema__, 1) do + queryable + else + raise ArgumentError, message: "Expected Ecto struct or query, got #{inspect(queryable)}" + end + end + @doc """ Create a [`ShapeDefinition`](`Electric.Client.ShapeDefinition`) from an `Ecto` query. @@ -410,14 +445,4 @@ defmodule Electric.Client do request = request(client, method: :delete, shape: shape) Electric.Client.Fetch.request(client, request, []) end - - defp validate_queryable!(queryable) when is_atom(queryable) do - Code.ensure_loaded!(queryable) - - if function_exported?(queryable, :__schema__, 1) do - queryable - else - raise ArgumentError, message: "Expected Ecto struct or query, got #{inspect(queryable)}" - end - end end diff --git a/packages/elixir-client/lib/electric/client/embedded.ex b/packages/elixir-client/lib/electric/client/embedded.ex new file mode 100644 index 0000000000..64b0d9c5e2 --- /dev/null +++ b/packages/elixir-client/lib/electric/client/embedded.ex @@ -0,0 +1,66 @@ +if Code.ensure_loaded?(Electric.Shapes.Api) do + defmodule Electric.Client.Embedded do + alias Electric.Client.Fetch + alias Electric.Client.ShapeDefinition + alias Electric.Shapes.Api + + @behaviour Electric.Client.Fetch + + @impl Electric.Client.Fetch + def fetch(%Fetch.Request{method: :delete} = request, opts) do + {:ok, api} = Keyword.fetch(opts, :api) + + timestamp = DateTime.utc_now() + + with {:ok, request} <- Api.validate_for_delete(api, request_to_params(request)), + %Api.Response{} = response <- Api.delete_shape(request) do + {:ok, translate_response(response, timestamp)} + end + end + + def fetch(%Fetch.Request{method: :get} = request, opts) do + {:ok, api} = Keyword.fetch(opts, :api) + + timestamp = DateTime.utc_now() + + with {:ok, request} <- Api.validate(api, request_to_params(request)), + %Api.Response{} = response = Api.serve_shape_log(request) do + {:ok, translate_response(response, timestamp)} + end + end + + defp translate_response(%Api.Response{} = response, timestamp) do + %Fetch.Response{ + status: response.status, + last_offset: convert_offset(response.offset), + shape_handle: response.handle, + schema: Api.schema(response), + next_cursor: nil, + request_timestamp: timestamp, + body: response.body + } + end + + defp request_to_params(%Fetch.Request{shape: %ShapeDefinition{} = shape} = request) do + %{ + table: Electric.Utils.relation_to_sql({shape.namespace || "public", shape.table}), + offset: to_string(request.offset), + handle: request.shape_handle, + live: request.live, + where: shape.where, + columns: shape.columns, + replica: request.replica + } + end + + defp convert_offset(nil) do + nil + end + + # TODO: when we remove offset parsing from the elixir client, then it makes + # sense to use to_string on the server offset + defp convert_offset(%Electric.Replication.LogOffset{} = server_offset) do + Electric.Client.Offset.from_string!(to_string(server_offset)) + end + end +end diff --git a/packages/elixir-client/lib/electric/client/stream.ex b/packages/elixir-client/lib/electric/client/stream.ex index 63819f029a..5420176b81 100644 --- a/packages/elixir-client/lib/electric/client/stream.ex +++ b/packages/elixir-client/lib/electric/client/stream.ex @@ -154,6 +154,14 @@ defmodule Electric.Client.Stream do |> after_fetch() end + defp ensure_enum(body) do + case Enumerable.impl_for(body) do + nil -> List.wrap(body) + Enumerable.Map -> List.wrap(body) + _impl -> body + end + end + defp handle_response(%Fetch.Response{status: status} = resp, stream) when status in 200..299 do shape_handle = shape_handle!(resp) @@ -166,7 +174,7 @@ defmodule Electric.Client.Stream do |> Map.put(:offset, final_offset) resp.body - |> List.wrap() + |> ensure_enum() |> Enum.flat_map(&Message.parse(&1, shape_handle, final_offset, value_mapper_fun)) |> Enum.map(&Map.put(&1, :request_timestamp, resp.request_timestamp)) |> Enum.reduce_while(stream, &handle_msg/2) diff --git a/packages/elixir-client/mix.exs b/packages/elixir-client/mix.exs index 53c3dd0d0a..021d80531c 100644 --- a/packages/elixir-client/mix.exs +++ b/packages/elixir-client/mix.exs @@ -45,7 +45,8 @@ defmodule Electric.Client.MixProject do {:bypass, "~> 2.1", only: [:test]}, {:postgrex, "~> 0.19", only: [:test]}, {:postgresql_uri, "~> 0.1", only: [:test]}, - {:uuid, "~> 1.1", only: [:test]} + {:uuid, "~> 1.1", only: [:test]}, + {:electric, path: "../sync-service", only: [:test]} ] end diff --git a/packages/elixir-client/mix.lock b/packages/elixir-client/mix.lock index 7445840402..7e1fbad269 100644 --- a/packages/elixir-client/mix.lock +++ b/packages/elixir-client/mix.lock @@ -1,20 +1,31 @@ %{ + "acceptor_pool": {:hex, :acceptor_pool, "1.0.0", "43c20d2acae35f0c2bcd64f9d2bde267e459f0f3fd23dab26485bf518c281b21", [:rebar3], [], "hexpm", "0cbcd83fdc8b9ad2eee2067ef8b91a14858a5883cb7cd800e6fcd5803e158788"}, + "backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm", "cf0cfff8995fb20562f822e5cc47d8ccf664c5ecdc26a684cbe85c225f9d7c39"}, + "bandit": {:hex, :bandit, "1.6.7", "42f30e37a1c89a2a12943c5dca76f731a2313e8a2e21c1a95dc8241893e922d1", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "551ba8ff5e4fc908cbeb8c9f0697775fb6813a96d9de5f7fe02e34e76fd7d184"}, "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"}, "castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"}, + "chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"}, + "combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [:mix], [], "hexpm", "1b1dbc1790073076580d0d1d64e42eae2366583e7aecd455d1215b0d16f2451b"}, "cowboy": {:hex, :cowboy, "2.12.0", "f276d521a1ff88b2b9b4c54d0e753da6c66dd7be6c9fca3d9418b561828a3731", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "8a7abe6d183372ceb21caa2709bec928ab2b72e18a3911aa1771639bef82651e"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, "cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"}, + "ctx": {:hex, :ctx, "0.6.0", "8ff88b70e6400c4df90142e7f130625b82086077a45364a78d208ed3ed53c7fe", [:rebar3], [], "hexpm", "a14ed2d1b67723dbebbe423b28d7615eb0bdcba6ff28f2d1f1b0a7e1d4aa5fc2"}, "db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "dialyxir": {:hex, :dialyxir, "1.4.4", "fb3ce8741edeaea59c9ae84d5cec75da00fa89fe401c72d6e047d11a61f65f70", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "cd6111e8017ccd563e65621a4d9a4a1c5cd333df30cebc7face8029cacb4eff6"}, + "dotenvy": {:hex, :dotenvy, "0.9.0", "aad823209cd7c13babe2dc310d9e54ce0203674cbd7631b0ced2a771e3a49532", [:mix], [], "hexpm", "ab959208a9ad02ff26ce1c5d4911668925c12a6cf58287ef77ae63161909c73b"}, "earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"}, "ecto": {:hex, :ecto, "3.12.3", "1a9111560731f6c3606924c81c870a68a34c819f6d4f03822f370ea31a582208", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9efd91506ae722f95e48dc49e70d0cb632ede3b7a23896252a60a14ac6d59165"}, "ecto_sql": {:hex, :ecto_sql, "3.12.0", "73cea17edfa54bde76ee8561b30d29ea08f630959685006d9c6e7d1e59113b7d", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dc9e4d206f274f3947e96142a8fdc5f69a2a6a9abb4649ef5c882323b6d512f0"}, + "electric_cubdb": {:hex, :electric_cubdb, "2.0.2", "36f86e3c52dc26f4e077a49fbef813b1a38d3897421cece851f149190b34c16c", [:mix], [], "hexpm", "0c0e24b31fb76ad1b33c5de2ab35c41a4ff9da153f5c1f9b15e2de78575acaf2"}, "elixir_make": {:hex, :elixir_make, "0.8.4", "4960a03ce79081dee8fe119d80ad372c4e7badb84c493cc75983f9d3bc8bde0f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "6e7f1d619b5f61dfabd0a20aa268e575572b542ac31723293a4c1a567d5ef040"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"}, "finch": {:hex, :finch, "0.19.0", "c644641491ea854fc5c1bbaef36bfc764e3f08e7185e1f084e35e0672241b76d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc5324ce209125d1e2fa0fcd2634601c52a787aff1cd33ee833664a5af4ea2b6"}, "gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"}, + "gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"}, + "grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"}, + "hpack": {:hex, :hpack_erl, "0.3.0", "2461899cc4ab6a0ef8e970c1661c5fc6a52d3c25580bc6dd204f84ce94669926", [:rebar3], [], "hexpm", "d6137d7079169d8c485c6962dfe261af5b9ef60fbc557344511c1e65e3d95fb0"}, "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"}, @@ -23,15 +34,37 @@ "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, "mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, + "nimble_ownership": {:hex, :nimble_ownership, "1.0.1", "f69fae0cdd451b1614364013544e66e4f5d25f36a2056a9698b793305c5aa3a6", [:mix], [], "hexpm", "3825e461025464f519f3f3e4a1f9b68c47dc151369611629ad08b636b73bb22d"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, + "opentelemetry": {:hex, :opentelemetry, "1.5.0", "7dda6551edfc3050ea4b0b40c0d2570423d6372b97e9c60793263ef62c53c3c2", [:rebar3], [{:opentelemetry_api, "~> 1.4", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "cdf4f51d17b592fc592b9a75f86a6f808c23044ba7cf7b9534debbcc5c23b0ee"}, + "opentelemetry_api": {:hex, :opentelemetry_api, "1.4.0", "63ca1742f92f00059298f478048dfb826f4b20d49534493d6919a0db39b6db04", [:mix, :rebar3], [], "hexpm", "3dfbbfaa2c2ed3121c5c483162836c4f9027def469c41578af5ef32589fcfc58"}, + "opentelemetry_exporter": {:hex, :opentelemetry_exporter, "1.8.0", "5d546123230771ef4174e37bedfd77e3374913304cd6ea3ca82a2add49cd5d56", [:rebar3], [{:grpcbox, ">= 0.0.0", [hex: :grpcbox, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.5.0", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.4.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:tls_certificate_check, "~> 1.18", [hex: :tls_certificate_check, repo: "hexpm", optional: false]}], "hexpm", "a1f9f271f8d3b02b81462a6bfef7075fd8457fdb06adff5d2537df5e2264d9af"}, + "opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "1.27.0", "acd0194a94a1e57d63da982ee9f4a9f88834ae0b31b0bd850815fe9be4bbb45f", [:mix, :rebar3], [], "hexpm", "9681ccaa24fd3d810b4461581717661fd85ff7019b082c2dff89c7d5b1fc2864"}, + "opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.1.2", "410ab4d76b0921f42dbccbe5a7c831b8125282850be649ee1f70050d3961118a", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "641ab469deb181957ac6d59bce6e1321d5fe2a56df444fc9c19afcad623ab253"}, + "otel_metric_exporter": {:hex, :otel_metric_exporter, "0.2.2", "4594d95d4cb5ee49549125feec1a10e388cfe055d8060779e98808989ee039eb", [:mix], [{:finch, "~> 0.19", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.13.0", [hex: :protobuf, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "757a28627e5f036543e9f5711051c0e18511d65cdd063e08181b44580d90eac1"}, + "pg_query_ex": {:hex, :pg_query_ex, "0.5.3", "84bf09f4ea10ada6e1cbfd739ccb9ffb6e5b21d87ab81cf52a42fefcc1808566", [:make, :mix], [{:elixir_make, "~> 0.4", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:protox, "~> 1.7", [hex: :protox, repo: "hexpm", optional: false]}], "hexpm", "ec0554d6d287da4cc15cc773577ef61cf41d5d6fcc784bb85f6209439cb246a7"}, "plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"}, "plug_cowboy": {:hex, :plug_cowboy, "2.7.2", "fdadb973799ae691bf9ecad99125b16625b1c6039999da5fe544d99218e662e4", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "245d8a11ee2306094840c000e8816f0cbed69a23fc0ac2bcf8d7835ae019bb2f"}, "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, "postgresql_uri": {:hex, :postgresql_uri, "0.1.0", "fab3aa8b3e5fe6c4df6d2e80b89a3e99580404b15dde727606e370b74026060b", [:mix], [], "hexpm", "7db308c2eaab0bf7c2864e6bfdd1ed4496f4370ef2f0b778cbe39019b4e0460c"}, "postgrex": {:hex, :postgrex, "0.19.1", "73b498508b69aded53907fe48a1fee811be34cc720e69ef4ccd568c8715495ea", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "8bac7885a18f381e091ec6caf41bda7bb8c77912bb0e9285212829afe5d8a8f8"}, + "protobuf": {:hex, :protobuf, "0.13.0", "7a9d9aeb039f68a81717eb2efd6928fdf44f03d2c0dfdcedc7b560f5f5aae93d", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "21092a223e3c6c144c1a291ab082a7ead32821ba77073b72c68515aa51fef570"}, + "protox": {:hex, :protox, "1.7.8", "ccae41afec6e63cf061bee874d7d042ed585d501df1cd004661ffac0e5628686", [:mix], [{:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}, {:poison, "~> 4.0 or ~> 5.0 or ~> 6.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm", "f6702c9deb9fb7cd2eadd73d3dbc0303c506dc87635e509228c61309f7062933"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, + "remote_ip": {:hex, :remote_ip, "1.2.0", "fb078e12a44414f4cef5a75963c33008fe169b806572ccd17257c208a7bc760f", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "2ff91de19c48149ce19ed230a81d377186e4412552a597d6a5137373e5877cb7"}, "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, + "retry": {:hex, :retry, "0.18.0", "dc58ebe22c95aa00bc2459f9e0c5400e6005541cf8539925af0aa027dc860543", [:mix], [], "hexpm", "9483959cc7bf69c9e576d9dfb2b678b71c045d3e6f39ab7c9aa1489df4492d73"}, + "sentry": {:hex, :sentry, "10.8.1", "aa45309785e1521416225adb16e0b4d8b957578804527f3c7babb6fefbc5e456", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_ownership, "~> 0.3.0 or ~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.6", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.20 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "495b3cdadad90ba72eef973aa3dec39b3b8b2a362fe87e2f4ef32133ac3b4097"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, + "telemetry_metrics": {:hex, :telemetry_metrics, "1.1.0", "5bd5f3b5637e0abea0426b947e3ce5dd304f8b3bc6617039e2b5a008adc02f8f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7b79e8ddfde70adb6db8a6623d1778ec66401f366e9a8f5dd0955c56bc8ce67"}, + "telemetry_metrics_prometheus_core": {:hex, :telemetry_metrics_prometheus_core, "1.2.1", "c9755987d7b959b557084e6990990cb96a50d6482c683fb9622a63837f3cd3d8", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "5e2c599da4983c4f88a33e9571f1458bf98b0cf6ba930f1dc3a6e8cf45d5afb6"}, + "telemetry_metrics_statsd": {:hex, :telemetry_metrics_statsd, "0.7.1", "3502235bb5b35ce50d608bf0f34369ef76eb92a4dbc8708c7e8780ca0da2d53e", [:mix], [{:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "06338d9dc3b4a202f11a6e706fd3feba4c46100d0aca23688dea0b8f801c361f"}, + "telemetry_poller": {:hex, :telemetry_poller, "1.1.0", "58fa7c216257291caaf8d05678c8d01bd45f4bdbc1286838a28c4bb62ef32999", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9eb9d9cbfd81cbd7cdd24682f8711b6e2b691289a0de6826e58452f28c103c8f"}, + "thousand_island": {:hex, :thousand_island, "1.3.9", "095db3e2650819443e33237891271943fad3b7f9ba341073947581362582ab5a", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "25ab4c07badadf7f87adb4ab414e0ed374e5f19e72503aa85132caa25776e54f"}, + "tls_certificate_check": {:hex, :tls_certificate_check, "1.26.0", "c0e8ffab875748f2b122d4d4e465aeaa7249ea539f1004b7922cb3c61ffe261d", [:rebar3], [{:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "1bad73d88637f788b554a8e939c25db2bdaac88b10fffd5bba9d1b65f43a6b54"}, + "tz": {:hex, :tz, "0.28.1", "717f5ffddfd1e475e2a233e221dc0b4b76c35c4b3650b060c8e3ba29dd6632e9", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:mint, "~> 1.6", [hex: :mint, repo: "hexpm", optional: true]}], "hexpm", "bfdca1aa1902643c6c43b77c1fb0cb3d744fd2f09a8a98405468afdee0848c8a"}, "uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"}, + "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, } diff --git a/packages/elixir-client/test/electric/client/ecto_adapter_test.exs b/packages/elixir-client/test/electric/client/ecto_adapter_test.exs index 0a69943a42..1e6c1a3fcd 100644 --- a/packages/elixir-client/test/electric/client/ecto_adapter_test.exs +++ b/packages/elixir-client/test/electric/client/ecto_adapter_test.exs @@ -195,7 +195,7 @@ defmodule Electric.Client.EctoAdapterTest do Support.Repo.insert(value1) Support.Repo.insert(value2) - assert_receive {:stream, %Message.ControlMessage{control: :up_to_date}}, 500 + assert_receive {:stream, %Message.ControlMessage{control: :up_to_date}}, 5000 assert_receive {:stream, %Message.ChangeMessage{} = message}, 500 assert %TestTable{ diff --git a/packages/elixir-client/test/electric/client/embedded_test.exs b/packages/elixir-client/test/electric/client/embedded_test.exs new file mode 100644 index 0000000000..84f84f11c8 --- /dev/null +++ b/packages/elixir-client/test/electric/client/embedded_test.exs @@ -0,0 +1,194 @@ +defmodule Electric.Client.EmbeddedTest do + use ExUnit.Case, async: false + + @moduletag :capture_log + + alias Electric.Client + alias Electric.Client.Message.{ChangeMessage, ControlMessage} + alias Electric.Client.ShapeDefinition + + import Support.DbSetup + import Support.ClientHelpers + + defp stream(ctx) do + client_stream(ctx, []) + end + + defp stream(ctx, opts) when is_list(opts) do + client_stream(ctx, opts) + end + + defp stream(ctx, limit) when is_integer(limit) do + client_stream(ctx, []) |> Enum.take(limit) + end + + defp client_stream(ctx, opts) do + Client.stream(ctx.client, ctx.shape, opts) + end + + defp with_embedded_client(_ctx) do + {:ok, client} = Electric.Client.embedded() + [client: client] + end + + setup [:with_unique_table, :with_embedded_client] + + setup(ctx) do + shape = ShapeDefinition.new!(ctx.tablename) + + on_exit(fn -> + ExUnit.CaptureLog.capture_log(fn -> + Client.delete_shape(ctx.client, shape) + end) + end) + + [shape: shape] + end + + test "streams an empty shape", ctx do + assert [%ControlMessage{control: :up_to_date, offset: offset0()}] = stream(ctx, 1) + end + + test "streams a non empty shape", ctx do + %{tablename: table} = ctx + + {:ok, id1} = insert_item(ctx) + {:ok, id2} = insert_item(ctx) + {:ok, id3} = insert_item(ctx) + + # snapshot values + msgs = stream(ctx, 4) + + assert [ + %ChangeMessage{ + headers: %{operation: :insert, relation: ["public", ^table]}, + value: %{"id" => ^id1}, + offset: %Electric.Client.Offset{tx: 0, op: 0} + }, + %ChangeMessage{ + headers: %{operation: :insert, relation: ["public", ^table]}, + value: %{"id" => ^id2}, + offset: %Electric.Client.Offset{tx: 0, op: 0} + }, + %ChangeMessage{ + headers: %{operation: :insert, relation: ["public", ^table]}, + value: %{"id" => ^id3}, + offset: %Electric.Client.Offset{tx: 0, op: 0} + }, + up_to_date0() + ] = msgs + + # 1 timestamp for the snapshot, 1 for the up-to-date response + assert length(Enum.uniq_by(msgs, & &1.request_timestamp)) == 2 + end + + test "streams live data changes", ctx do + {:ok, id1} = insert_item(ctx) + + parent = self() + stream = stream(ctx) + + {:ok, _task} = + start_supervised( + {Task, + fn -> + stream + |> Stream.each(&send(parent, {:stream, 1, &1})) + |> Stream.run() + end}, + id: {:stream, 1} + ) + + {:ok, _task} = + start_supervised( + {Task, + fn -> + stream + |> Stream.each(&send(parent, {:stream, 2, &1})) + |> Stream.run() + end}, + id: {:stream, 2} + ) + + assert_receive {:stream, 1, %ChangeMessage{value: %{"id" => ^id1}}}, 5000 + assert_receive {:stream, 1, up_to_date0()} + assert_receive {:stream, 2, %ChangeMessage{value: %{"id" => ^id1}}}, 5000 + assert_receive {:stream, 2, up_to_date0()} + + {:ok, id2} = insert_item(ctx) + {:ok, id3} = insert_item(ctx) + + assert_receive {:stream, 1, %ChangeMessage{value: %{"id" => ^id2}}}, 5000 + assert_receive {:stream, 1, %ChangeMessage{value: %{"id" => ^id3}}}, 5000 + assert_receive {:stream, 1, up_to_date()} + + assert_receive {:stream, 2, %ChangeMessage{value: %{"id" => ^id2}}}, 5000 + assert_receive {:stream, 2, %ChangeMessage{value: %{"id" => ^id3}}}, 5000 + assert_receive {:stream, 2, up_to_date()} + end + + test "sends full rows with replica: :full", ctx do + {:ok, id1} = insert_item(ctx, title: "Changing item") + parent = self() + stream = stream(ctx, replica: :full) + + {:ok, _task} = + start_supervised( + {Task, + fn -> + stream + |> Stream.each(&send(parent, {:stream, 1, &1})) + |> Stream.run() + end}, + id: {:stream, 1} + ) + + assert_receive {:stream, 1, %ChangeMessage{value: %{"id" => ^id1}}}, 5000 + assert_receive {:stream, 1, up_to_date0()} + + :ok = update_item(ctx, id1, value: 999) + + assert_receive {:stream, 1, + %ChangeMessage{ + value: %{"id" => ^id1, "value" => 999, "title" => "Changing item"} + }}, + 500 + + assert_receive {:stream, 1, up_to_date()} + end + + test "supports shapes with where clauses and column lists", ctx do + %{tablename: table} = ctx + + shape = ShapeDefinition.new!(table, where: "value IS NOT NULL", columns: ["id", "value"]) + + {:ok, id1} = insert_item(ctx) + {:ok, id2} = insert_item(ctx) + {:ok, id3} = insert_item(ctx) + + # snapshot values + msgs = stream(%{ctx | shape: shape}, 4) + + assert [ + %ChangeMessage{ + headers: %{operation: :insert, relation: ["public", ^table]}, + value: %{"id" => ^id1}, + offset: %Electric.Client.Offset{tx: 0, op: 0} + }, + %ChangeMessage{ + headers: %{operation: :insert, relation: ["public", ^table]}, + value: %{"id" => ^id2}, + offset: %Electric.Client.Offset{tx: 0, op: 0} + }, + %ChangeMessage{ + headers: %{operation: :insert, relation: ["public", ^table]}, + value: %{"id" => ^id3}, + offset: %Electric.Client.Offset{tx: 0, op: 0} + }, + up_to_date0() + ] = msgs + + # 1 timestamp for the snapshot, 1 for the up-to-date response + assert length(Enum.uniq_by(msgs, & &1.request_timestamp)) == 2 + end +end diff --git a/packages/elixir-client/test/electric/client_test.exs b/packages/elixir-client/test/electric/client_test.exs index 7223a19a0f..68f7d3b715 100644 --- a/packages/elixir-client/test/electric/client_test.exs +++ b/packages/elixir-client/test/electric/client_test.exs @@ -1,5 +1,5 @@ defmodule Electric.ClientTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false @moduletag :capture_log @@ -180,7 +180,9 @@ defmodule Electric.ClientTest do shape = ShapeDefinition.new!(ctx.tablename) on_exit(fn -> - Client.delete_shape(client, shape) + ExUnit.CaptureLog.capture_log(fn -> + Client.delete_shape(client, shape) + end) end) [client: client, shape: shape] diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 76d3e1e15d..a66d2bd25d 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -46,6 +46,18 @@ defmodule Electric.Application do publication_name = "electric_publication_#{replication_stream_id}" slot_name = "electric_slot_#{replication_stream_id}" + api_server = + if Electric.Config.get_env(:enable_http_api) do + [ + {Bandit, + plug: {Electric.Plug.Router, router_opts}, + port: Electric.Config.get_env(:service_port), + thousand_island_options: http_listener_options()} + ] + else + [] + end + # The root application supervisor starts the core global processes, including the HTTP # server and the database connection manager. The latter is responsible for establishing # all needed connections to the database (acquiring the exclusive access lock, opening a @@ -75,18 +87,42 @@ defmodule Electric.Application do storage: storage, chunk_bytes_threshold: Electric.Config.get_env(:chunk_bytes_threshold) }, - {Electric.Telemetry, stack_id: stack_id, storage: storage}, - {Bandit, - plug: {Electric.Plug.Router, router_opts}, - port: Electric.Config.get_env(:service_port), - thousand_island_options: http_listener_options()} + {Electric.Telemetry, stack_id: stack_id, storage: storage} ], + api_server, prometheus_endpoint(Electric.Config.get_env(:prometheus_port)) ]) Supervisor.start_link(children, strategy: :one_for_one, name: Electric.Supervisor) end + @doc """ + Returns a configured Electric.Shapes.Api instance + """ + def api(overrides \\ []) do + config = + Enum.reduce( + [ + Electric.StackSupervisor.build_shared_opts( + stack_id: Electric.Config.get_env(:provided_database_id), + stack_events_registry: Registry.StackEvents, + storage: Electric.Config.get_env(:storage) + ), + [ + long_poll_timeout: 20_000, + max_age: Electric.Config.get_env(:cache_max_age), + stale_age: Electric.Config.get_env(:cache_stale_age), + allow_shape_deletion: Electric.Config.get_env(:allow_shape_deletion?) + ], + overrides + ], + [], + &Keyword.merge(&2, &1) + ) + + Electric.Shapes.Api.configure!(config) + end + defp prometheus_endpoint(nil), do: [] defp prometheus_endpoint(port) do diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index 05533fdd34..d30f49152d 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -31,30 +31,32 @@ defmodule Electric.Config do @build_env Mix.env() @defaults [ - # Database + ## Database provided_database_id: "single_stack", db_pool_size: 20, replication_stream_id: "default", replication_slot_temporary?: false, - # HTTP API + ## HTTP API + # set enable_http_api: false to turn of the HTTP server totally + enable_http_api: true, cache_max_age: 60, cache_stale_age: 60 * 5, chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(), allow_shape_deletion?: false, service_port: 3000, listen_on_ipv6?: false, - # Storage + ## Storage storage_dir: "./persistent", storage: &Electric.Config.Defaults.storage/0, persistent_kv: &Electric.Config.Defaults.persistent_kv/0, - # Telemetry + ## Telemetry instance_id: nil, prometheus_port: nil, call_home_telemetry?: @build_env == :prod, telemetry_statsd_host: nil, telemetry_url: URI.new!("https://checkpoint.electric-sql.com"), system_metrics_poll_interval: :timer.seconds(5), - # Memory + ## Memory shape_hibernate_after: :timer.seconds(30) ] diff --git a/packages/sync-service/lib/electric/plug/utils.ex b/packages/sync-service/lib/electric/plug/utils.ex index c36d032baa..5f024cc20b 100644 --- a/packages/sync-service/lib/electric/plug/utils.ex +++ b/packages/sync-service/lib/electric/plug/utils.ex @@ -47,6 +47,10 @@ defmodule Electric.Plug.Utils do end) end + def parse_columns_param([col | _] = columns) when is_binary(col) do + {:ok, columns} + end + @doc """ Calculate the next interval that should be used for long polling based on the current time and previous interval used. diff --git a/packages/sync-service/lib/electric/shapes/api.ex b/packages/sync-service/lib/electric/shapes/api.ex index b78666d53b..c64cc59ca4 100644 --- a/packages/sync-service/lib/electric/shapes/api.ex +++ b/packages/sync-service/lib/electric/shapes/api.ex @@ -11,6 +11,24 @@ defmodule Electric.Shapes.Api do require Logger + @options [ + inspector: [type: :mod_arg, required: true], + pg_id: [type: :string], + registry: [type: :atom, required: true], + shape_cache: [type: :mod_arg, required: true], + stack_events_registry: [type: :atom, required: true], + stack_id: [type: :string, required: true], + storage: [type: :mod_arg, required: true], + allow_shape_deletion: [type: :boolean], + long_poll_timeout: [type: :integer], + max_age: [type: :integer], + stack_ready_timeout: [type: :integer], + stale_age: [type: :integer], + encoder: [type: :atom] + ] + @schema NimbleOptions.new!(@options) + @option_keys Keyword.keys(@options) |> MapSet.new() + defguardp is_configured(api) when api.configured defstruct [ @@ -31,6 +49,7 @@ defmodule Electric.Shapes.Api do ] @type t() :: %__MODULE__{} + @type options() :: [unquote(NimbleOptions.option_typespec(@schema))] # Aliasing for pattern matching @before_all_offset LogOffset.before_all() @@ -44,13 +63,24 @@ defmodule Electric.Shapes.Api do # when using shapes api @behaviour Access + @doc false + def options_schema do + @schema + end + + def configure!(opts) do + {api, _unused_opts} = configure(opts) + api + end + def configure(opts) do - api = %__MODULE__{configured: true} |> struct(opts) |> validate_encoder!() + {valid, extra} = Keyword.split_with(opts, fn {k, _} -> MapSet.member?(@option_keys, k) end) + + options = NimbleOptions.validate!(valid, @schema) - used_keys = api |> Map.from_struct() |> Map.keys() - unused_opts = Keyword.drop(opts, used_keys) + api = %__MODULE__{configured: true} |> struct(options) |> validate_encoder!() - {api, unused_opts} + {api, extra} end def plug_opts(opts) do diff --git a/packages/sync-service/lib/electric/shapes/api/params.ex b/packages/sync-service/lib/electric/shapes/api/params.ex index e1f461bab5..8a68ad25e1 100644 --- a/packages/sync-service/lib/electric/shapes/api/params.ex +++ b/packages/sync-service/lib/electric/shapes/api/params.ex @@ -10,6 +10,27 @@ defmodule Electric.Shapes.Api.Params do @tmp_compaction_flag :experimental_compaction @primary_key false + defmodule ColumnList do + use Ecto.Type + + def type, do: {:array, :string} + + def cast([_ | _] = columns) do + {:ok, Enum.map(columns, &to_string/1)} + end + + def cast(columns) when is_binary(columns) do + with {:error, reason} <- Electric.Plug.Utils.parse_columns_param(columns) do + {:error, message: reason} + end + end + + def cast(_), do: :error + + def load([_ | _] = columns), do: {:ok, columns} + + def dump([_ | _] = columns), do: {:ok, columns} + end embedded_schema do field(:table, :string) @@ -17,7 +38,7 @@ defmodule Electric.Shapes.Api.Params do field(:handle, :string) field(:live, :boolean, default: false) field(:where, :string) - field(:columns, :string) + field(:columns, ColumnList) field(:shape_definition, :string) field(:replica, Ecto.Enum, values: [:default, :full], default: :default) field(@tmp_compaction_flag, :boolean, default: false) @@ -30,7 +51,6 @@ defmodule Electric.Shapes.Api.Params do |> cast_params() |> validate_required([:table, :offset]) |> cast_offset() - |> cast_columns() |> validate_handle_with_offset() |> validate_live_with_offset() |> cast_root_table(inspector: api.inspector) @@ -50,7 +70,6 @@ defmodule Electric.Shapes.Api.Params do %{changes: %{table: _table}} = changeset -> changeset |> validate_required([:table]) - |> cast_columns() |> cast_root_table(inspector: api.inspector) |> apply_action(:validate) |> convert_error(api) @@ -68,9 +87,7 @@ defmodule Electric.Shapes.Api.Params do defp cast_params(params) do %__MODULE__{} - |> cast(params, __schema__(:fields) -- [:shape_definition], - message: fn _, _ -> "must be %{type}" end - ) + |> cast(params, __schema__(:fields) -- [:shape_definition]) end defp convert_error({:ok, params}, _api), do: {:ok, params} @@ -100,21 +117,6 @@ defmodule Electric.Shapes.Api.Params do end end - def cast_columns(%Ecto.Changeset{valid?: false} = changeset), do: changeset - - def cast_columns(%Ecto.Changeset{} = changeset) do - case fetch_field!(changeset, :columns) do - nil -> - changeset - - columns -> - case Electric.Plug.Utils.parse_columns_param(columns) do - {:ok, parsed_cols} -> put_change(changeset, :columns, parsed_cols) - {:error, reason} -> add_error(changeset, :columns, reason) - end - end - end - def validate_handle_with_offset(%Ecto.Changeset{valid?: false} = changeset), do: changeset diff --git a/packages/sync-service/test/electric/shapes/api_test.exs b/packages/sync-service/test/electric/shapes/api_test.exs index a11df57e80..e9b7d9a394 100644 --- a/packages/sync-service/test/electric/shapes/api_test.exs +++ b/packages/sync-service/test/electric/shapes/api_test.exs @@ -319,6 +319,22 @@ defmodule Electric.Shapes.ApiTest do } end + test "returns error for invalid column spec", ctx do + assert {:error, %{status: 400} = response} = + Api.validate( + ctx.api, + %{ + table: "public.users", + offset: "-1", + columns: ",,," + } + ) + + assert response_body(response) == %{ + columns: ["Invalid zero-length delimited identifier"] + } + end + test "honours replica query param for shape", ctx do test_shape_handle = "test-shape-without-deltas" next_offset = LogOffset.increment(@first_offset)