Skip to content

Commit

Permalink
feat: Add support for :on_conflict to inserts
Browse files Browse the repository at this point in the history
This enables upserts using Ecto's `:on_conflict` option on inserts.

There a couple limitations since this isn't backed by a sql database:

  * Ecto queries aren't supported for resolving conflicts.
  * The `:conflict_target` must be the table's primary key.

Additionally, this does not add support for `insert_all`.
  • Loading branch information
jaredhoyt committed Jan 4, 2024
1 parent 6e0ed2d commit 0504dc8
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 61 deletions.
82 changes: 53 additions & 29 deletions lib/etso/adapter/behaviour/schema.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ defmodule Etso.Adapter.Behaviour.Schema do
@moduledoc false
@behaviour Ecto.Adapter.Schema

alias Etso.Adapter.TableRegistry
alias Etso.ETS.TableStructure
alias Etso.ETS
alias Etso.ETS.Table

@impl Ecto.Adapter.Schema
def autogenerate(:id), do: :erlang.unique_integer()
Expand All @@ -12,48 +12,72 @@ defmodule Etso.Adapter.Behaviour.Schema do

@impl Ecto.Adapter.Schema
def insert_all(%{repo: repo}, %{schema: schema}, _, entries, _, _, _, _) do
{:ok, ets_table} = TableRegistry.get_table(repo, schema)
ets_field_names = TableStructure.field_names(schema)
ets_changes = TableStructure.entries_to_tuples(ets_field_names, entries)
ets_result = :ets.insert_new(ets_table, ets_changes)
if ets_result, do: {length(ets_changes), nil}, else: {0, nil}
table = Table.build(repo, schema)
result = ETS.insert_all(table, entries)

case result do
:ok -> {length(entries), []}
:error -> {0, nil}
end
end

@impl Ecto.Adapter.Schema
def insert(%{repo: repo}, %{schema: schema}, fields, _, _, _) do
{:ok, ets_table} = TableRegistry.get_table(repo, schema)
ets_field_names = TableStructure.field_names(schema)
ets_changes = TableStructure.fields_to_tuple(ets_field_names, fields)
ets_result = :ets.insert_new(ets_table, ets_changes)
if ets_result, do: {:ok, []}, else: {:invalid, [unique: "primary_key"]}
def insert(%{repo: repo}, %{schema: schema}, fields, on_conflict, _, _) do
table = Table.build(repo, schema)
result = ETS.insert(table, fields)

case result do
:ok -> {:ok, []}
:error -> handle_conflict(table, fields, on_conflict)
end
end

@impl Ecto.Adapter.Schema
def update(%{repo: repo}, %{schema: schema}, fields, filters, [], _) do
{:ok, ets_table} = TableRegistry.get_table(repo, schema)
[key_name] = schema.__schema__(:primary_key)
[{^key_name, key}] = filters
ets_updates = build_ets_updates(schema, fields)
ets_result = :ets.update_element(ets_table, key, ets_updates)
if ets_result, do: {:ok, []}, else: {:error, :stale}
table = Table.build(repo, schema)
key = Keyword.fetch!(filters, table.primary_key)
result = ETS.update(table, key, fields)

case result do
:ok -> {:ok, []}
:error -> {:error, :stale}
end
end

@impl Ecto.Adapter.Schema
def delete(%{repo: repo}, %{schema: schema}, filters, _, _) do
{:ok, ets_table} = TableRegistry.get_table(repo, schema)
[key_name] = schema.__schema__(:primary_key)
[{^key_name, key}] = filters
:ets.delete(ets_table, key)
table = Table.build(repo, schema)
key = Keyword.fetch!(filters, table.primary_key)
ETS.delete(table, key)

{:ok, []}
end

defp handle_conflict(_table, _fields, {:raise, _, _}) do
{:invalid, [unique: "primary_key"]}
end

defp handle_conflict(_table, _fields, {:nothing, _, _}) do
{:ok, []}
end

defp build_ets_updates(schema, fields) do
ets_field_names = TableStructure.field_names(schema)
defp handle_conflict(_table, _fields, {%Ecto.Query{}, _, _}) do
raise ArgumentError,
"queries cannot be used to handle insert conflicts for ets tables"
end

defp handle_conflict(table, fields, {replace, _, [target]}) do
unless target == table.primary_key do
raise ArgumentError, "invalid field `#{inspect(target)}` in :conflict_target"
end

{key, fields} = Keyword.pop(fields, table.primary_key)
{fields, _} = Keyword.split(fields, replace)
result = ETS.update(table, key, fields)

for {field_name, field_value} <- fields do
position_fun = fn x -> x == field_name end
position = 1 + Enum.find_index(ets_field_names, position_fun)
{position, field_value}
case result do
:ok -> {:ok, []}
:error -> {:error, :stale}
end
end
end
52 changes: 52 additions & 0 deletions lib/etso/ets.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Etso.ETS do
@moduledoc false

import Kernel, except: [apply: 2]

alias Etso.ETS.Table

def insert(%Table{} = table, data) do
changes = changeset(table, {:insert, data})

apply(:insert_new, [table.reference, changes])
end

def insert_all(%Table{} = table, data) do
changes = changeset(table, {:insert_all, data})

apply(:insert_new, [table.reference, changes])
end

def delete(%Table{} = table, key) do
apply(:delete, [table.reference, key])
end

def update(%Table{} = table, key, data) do
changes = changeset(table, {:update, data})

apply(:update_element, [table.reference, key, changes])
end

defp apply(fun, args) do
if apply(:ets, fun, args), do: :ok, else: :error
end

defp changeset(table, {:insert, data}) do
table.fields
|> Keyword.keys()
|> Enum.map(&Keyword.get(data, &1, nil))
|> List.to_tuple()
end

defp changeset(table, {:insert_all, data}) do
for entry <- data do
changeset(table, {:insert, entry})
end
end

defp changeset(table, {:update, data}) do
for {name, value} <- data do
{table.fields[name], value}
end
end
end
4 changes: 2 additions & 2 deletions lib/etso/ets/match_specification.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ defmodule Etso.ETS.MatchSpecification do
execute the given queries with ETS with as much pushed down to ETS as possible.
The basic shape of the match head is `[$1, $2, $3, …]` where each field is a named variable, the
ordering of the fields is determined by `Etso.ETS.TableStructure`.
ordering of the fields is determined by `Etso.ETS.Table`.
Conditions are compiled according to the wheres in the underlying Ecto query, while the body is
compiled based on the selected fields in the underlying Ecto query.
"""

def build(query, params) do
{_, schema} = query.from.source
field_names = Etso.ETS.TableStructure.field_names(schema)
field_names = Etso.ETS.Table.field_names(schema)
match_head = build_head(field_names)
match_conditions = build_conditions(field_names, params, query)
match_body = [build_body(field_names, query)]
Expand Down
35 changes: 35 additions & 0 deletions lib/etso/ets/table.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
defmodule Etso.ETS.Table do
@moduledoc false

alias Etso.Adapter.TableRegistry

defstruct [:fields, :primary_key, :reference]

def build(repo, schema) do
%__MODULE__{}
|> set_fields(schema)
|> set_primary_key(schema)
|> set_reference(repo, schema)
end

def field_names(schema) do
fields = schema.__schema__(:fields)
primary_key = schema.__schema__(:primary_key)
primary_key ++ (fields -- primary_key)
end

defp set_fields(table, schema) do
fields = field_names(schema) |> Enum.with_index(1)
%{table | fields: fields}
end

defp set_primary_key(table, schema) do
[primary_key] = schema.__schema__(:primary_key)
%{table | primary_key: primary_key}
end

defp set_reference(table, repo, schema) do
{:ok, reference} = TableRegistry.get_table(repo, schema)
%{table | reference: reference}
end
end
25 changes: 0 additions & 25 deletions lib/etso/ets/table_structure.ex

This file was deleted.

70 changes: 65 additions & 5 deletions test/northwind/repo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,66 @@ defmodule Northwind.RepoTest do
Repo.delete(employee)
end

describe "inserts with :on_conflict" do
setup do
changes = %{first_name: "John", last_name: "Smith", employee_id: 300}
changeset = Model.Employee.changeset(changes)
Repo.insert(changeset)

changes = %{first_name: "Peter", last_name: "Wyatt", employee_id: 300}
[changeset: Model.Employee.changeset(changes)]
end

test "`:raise`", %{changeset: changeset} do
assert_raise Ecto.ConstraintError, fn ->
Repo.insert(changeset, on_conflict: :raise)
end

%{first_name: "John", last_name: "Smith"} = Repo.get(Model.Employee, 300)
end

test "`:nothing`", %{changeset: changeset} do
{:ok, _} = Repo.insert(changeset, on_conflict: :nothing, conflict_target: :employee_id)
%{first_name: "John", last_name: "Smith"} = Repo.get(Model.Employee, 300)
end

test "`:replace_all`", %{changeset: changeset} do
{:ok, _} = Repo.insert(changeset, on_conflict: :replace_all, conflict_target: :employee_id)
%{first_name: "Peter", last_name: "Wyatt"} = Repo.get(Model.Employee, 300)
end

test "`{:replace, fields}`", %{changeset: changeset} do
{:ok, _} =
Repo.insert(changeset,
on_conflict: {:replace, [:first_name]},
conflict_target: :employee_id
)

%{first_name: "Peter", last_name: "Smith"} = Repo.get(Model.Employee, 300)
end

test "`{:replace_all_except, fields}`", %{changeset: changeset} do
{:ok, _} =
Repo.insert(changeset,
on_conflict: {:replace_all_except, [:first_name]},
conflict_target: :employee_id
)

%{first_name: "John", last_name: "Wyatt"} = Repo.get(Model.Employee, 300)
end

test "`Ecto.Query`", %{changeset: changeset} do
assert_raise ArgumentError, fn ->
Repo.insert(changeset,
on_conflict: [set: [first_name: "Peter"]],
conflict_target: :employee_id
)
end

%{first_name: "John", last_name: "Smith"} = Repo.get(Model.Employee, 300)
end
end

test "Insert Employees" do
changes = [%{first_name: "Fred", employee_id: 100}, %{first_name: "Steven", employee_id: 200}]
nil = Repo.get(Model.Employee, 100)
Expand Down Expand Up @@ -203,20 +263,20 @@ defmodule Northwind.RepoTest do
count =
Model.Employee
|> select([e], count(e.employee_id))
|> Repo.one
|> Repo.one()

assert count ==
Model.Employee
|> Repo.all
|> length
Model.Employee
|> Repo.all()
|> length
end

test "Aggregate `count` with query" do
count =
Model.Employee
|> where([e], e.metadata["twitter"] == "@andrew_fuller")
|> select([e], count(e.employee_id))
|> Repo.one
|> Repo.one()

assert count == 1
end
Expand Down

0 comments on commit 0504dc8

Please sign in to comment.