Skip to content

Commit

Permalink
Implement exponential backoffs for critical functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Guarino committed Aug 8, 2018
1 parent 35db823 commit fe4d588
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 2 deletions.
16 changes: 14 additions & 2 deletions lib/exlasticsearch/repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule ExlasticSearch.Repo do
url: "https://elasticsearch.url.io:9200"
"""
use Scrivener
use ExlasticSearch.Retry.Decorator
alias ExlasticSearch.{Indexable, Query, Response}
alias Elastix.{Index, Mapping, Document, Bulk, Search, HTTP}
require Logger
Expand Down Expand Up @@ -93,14 +94,18 @@ defmodule ExlasticSearch.Repo do
protocol prior to insertion
"""
@spec index(struct) :: response
@decorate retry()
def index(%{__struct__: model} = struct) do
id = Indexable.id(struct)
document = Indexable.document(struct)
es_url() |> Document.index(model.__es_index__(), model.__doc_type__(), id, document)

es_url()
|> Document.index(model.__es_index__(), model.__doc_type__(), id, document)
|> mark_failure()
end

@doc """
Gets an ES document by id
Gets an ES document by _id
"""
@spec get(struct) :: response
def get(%{__struct__: model} = struct) do
Expand Down Expand Up @@ -130,9 +135,11 @@ defmodule ExlasticSearch.Repo do
Removes `struct` from the index of its model
"""
@spec delete(struct) :: response
@decorate retry()
def delete(%{__struct__: model} = struct) do
es_url()
|> Document.delete(model.__es_index__(), model.__doc_type__(), Indexable.id(struct))
|> mark_failure()
end


Expand All @@ -157,6 +164,7 @@ defmodule ExlasticSearch.Repo do

es_url()
|> Bulk.post(bulk_request, [], opts)
|> mark_failure()
end

def index_stream(stream, parallelism \\ 10, demand \\ 10) do
Expand Down Expand Up @@ -192,4 +200,8 @@ defmodule ExlasticSearch.Repo do
end
end
defp decode(response, _, _), do: response

defp mark_failure({:ok, %HTTPoison.Response{body: %{"_shards" => %{"successful" => 0}}} = result}), do: {:error, result}
defp mark_failure({:ok, %HTTPoison.Response{body: %{"errors" => true}} = result}), do: {:error, result}
defp mark_failure(result), do: result
end
18 changes: 18 additions & 0 deletions lib/exlasticsearch/retry/decorator.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule ExlasticSearch.Retry.Decorator do
@moduledoc """
Decorator for applying retry strategies to a function. Configure with
```
config :exlasticsearch, :retry, strategy: MyStrategy, additional_opts
```
"""
use Decorator.Define, retry: 0
@config Application.get_env(:exlasticsearch, :retry, [])

def retry(body, _ctx) do
{strategy, config} = Keyword.pop(@config, :strategy, ExlasticSearch.Retry.ExponentialBackoff)
quote do
unquote(strategy).retry(fn -> unquote(body) end, unquote(config))
end
end
end
32 changes: 32 additions & 0 deletions lib/exlasticsearch/retry/exponential_backoff.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule ExlasticSearch.Retry.ExponentialBackoff do
@moduledoc """
Retry Strategy implementation utilizing exponential backoffs
"""
@behaviour ExlasticSearch.RetryStrategy

def retry(fun, opts) do
initial = Keyword.get(opts, :initial, 1)
max = Keyword.get(opts, :max, 3)
jitter = Keyword.get(opts, :jitter, 4)

do_retry(fun, max, initial, jitter, 0)
end

defp do_retry(fun, max, _, _, max), do: fun.()
defp do_retry(fun, max, initial, jitter, retry) do
case fun.() do
{:ok, result} -> {:ok, result}
{:error, _} ->
sleep(initial, retry, jitter)
|> :timer.sleep()

do_retry(fun, max, initial, jitter, retry + 1)
end
end

defp sleep(initial, retry, jitter) do
jitter = :rand.uniform(jitter)
exp = :math.pow(2, retry) |> round()
jitter + (initial * exp)
end
end
8 changes: 8 additions & 0 deletions lib/exlasticsearch/retry_strategy.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
defmodule ExlasticSearch.RetryStrategy do
@moduledoc """
Behavior for retrying a 0-arity function according to some strategy
"""
@type response :: {:ok, any} | {:error, any}
@type callable :: (-> {:ok, any} | {:error, any})
@callback retry(fnc :: callable, opts :: list) :: response
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ defmodule Exlasticsearch.MixProject do
{:elastix, "~> 0.5.0"},
{:ecto, "~> 2.1.0"},
{:scrivener_ecto, "~> 1.0"},
{:decorator, "~> 1.2"},
{:ex_doc, ">= 0.0.0", only: :dev}
]
end
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
%{
"certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"},
"decimal": {:hex, :decimal, "1.5.0", "b0433a36d0e2430e3d50291b1c65f53c37d56f83665b43d79963684865beab68", [:mix], [], "hexpm"},
"decorator": {:hex, :decorator, "1.2.3", "258681ae943e57bd92d821ea995e3994b4e0b62ae8404b5d892cb8b23b55b050", [:mix], [], "hexpm"},
"earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [:mix], [], "hexpm"},
"ecto": {:hex, :ecto, "2.1.6", "29b45f393c2ecd99f83e418ea9b0a2af6078ecb30f401481abac8a473c490f84", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: true]}, {:decimal, "~> 1.2", [hex: :decimal, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.8.0", [hex: :mariaex, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}, {:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.13.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, repo: "hexpm", optional: true]}], "hexpm"},
"elastix": {:hex, :elastix, "0.5.0", "ff7b9b88c8bf3ba473f7f4bca76c658b2ca9fa59c7988aaea411ed5ed4c4cbd1", [:mix], [{:httpoison, ">= 0.7.0", [hex: :httpoison, repo: "hexpm", optional: false]}, {:poison, "~> 3.1", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"},
Expand Down
38 changes: 38 additions & 0 deletions test/exlasticsearch/repo_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule ExlasticSearch.RepoTest do
use ExUnit.Case, async: true
alias ExlasticSearch.{
Repo,
TestModel
}

setup_all do
Repo.create_index(TestModel)
Repo.create_mapping(TestModel)
:ok
end

describe "#index" do
test "It will index an element in es" do
model = %ExlasticSearch.TestModel{id: Ecto.UUID.generate()}
{:ok, _} = Repo.index(model)

assert exists?(model)
end
end

describe "#bulk" do
test "It will bulk index/delete from es" do
model = %ExlasticSearch.TestModel{id: Ecto.UUID.generate()}
{:ok, _} = Repo.bulk([{:index, model}])

assert exists?(model)
end
end

defp exists?(model) do
case Repo.get(model) do
{:ok, %{found: true}} -> true
_ -> false
end
end
end
12 changes: 12 additions & 0 deletions test/support/test_model.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,16 @@ defmodule ExlasticSearch.TestModel do

mapping :user, properties: %{ext_name: %{type: :text}}
end
end

defimpl ExlasticSearch.Indexable, for: ExlasticSearch.TestModel do
def id(%{id: id}), do: id

def document(struct) do
struct
|> Map.from_struct()
|> Map.take(@for.__mappings__())
end

def preload(struct), do: struct
end

0 comments on commit fe4d588

Please sign in to comment.