From ebc6db54c91f93e19e65feec5a56c067dc533777 Mon Sep 17 00:00:00 2001 From: Dario Heinisch Date: Fri, 11 Oct 2024 15:25:36 -0700 Subject: [PATCH] FEAT: Add stream_without_transaction --- lib/ecto/repo.ex | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/lib/ecto/repo.ex b/lib/ecto/repo.ex index 5214d59916..cb1d0d04c3 100644 --- a/lib/ecto/repo.ex +++ b/lib/ecto/repo.ex @@ -430,6 +430,46 @@ defmodule Ecto.Repo do end end + def stream_without_transaction(queryable, opts \\ []) do + limit = Keyword.get(opts, :limit, 500) + + base_query = queryable |> limit(^limit) + + state = case Keyword.get(opts, :next, nil) do + nil -> %{offset: 0} + next -> %{next: next, query: base_query} + end + + all_ops = Keyword.drop(opts, [:limit, :next]) + + Stream.resource( + fn -> state end, + fn state -> + {data, state} = case state do + %{next: next, query: query} -> + data = all(query, all_ops) + + query = case data do + [] -> nil + _ -> next.(base_query, data) + end + + {data, %{next: next, query: query}} + + %{offset: o} -> + data = base_query |> offset(^o) |> all(all_ops) + {data, %{offset: o + limit}} + end + + case data do + [] -> {:halt, :done} + _ -> {data, state} + end + end, + fn _ -> :ok end + ) + end + def all(queryable, opts \\ []) do repo = get_dynamic_repo()