From db16ed0f5305a25e30f72ef937e8efd627c0e3a3 Mon Sep 17 00:00:00 2001 From: Philip Sampaio Date: Tue, 9 Jan 2024 16:24:10 -0300 Subject: [PATCH] Add a second pipeline to demonstrate navigation (#30) --- dev.exs | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/dev.exs b/dev.exs index 990258a..a17446b 100644 --- a/dev.exs +++ b/dev.exs @@ -59,6 +59,45 @@ defmodule Demo.Pipeline do end end +defmodule Demo.SecondPipeline do + use Broadway + + def start_link(opts) do + Broadway.start_link(__MODULE__, + name: __MODULE__, + producer: [ + module: {Broadway.DummyProducer, opts}, + concurrency: 1 + ], + processors: [ + default: [concurrency: 2] + ], + batchers: [ + default: [batch_size: 6, concurrency: 2, batch_timeout: 1000], + ] + ) + end + + @impl true + def handle_message(_, %Broadway.Message{} = message, _) do + Broadway.Message.update_data(message, fn data -> + hex = Base.encode16(:crypto.strong_rand_bytes(64)) + + String.upcase(data <> hex) + end) + |> Broadway.Message.put_batcher(:default) + end + + @impl true + def handle_batch(:default, messages, _, _) do + Enum.map(messages, fn message -> + Broadway.Message.update_data(message, fn data -> + String.downcase(data) + end) + end) + end +end + defmodule FakeProducer do use GenServer @@ -74,6 +113,7 @@ defmodule FakeProducer do def handle_info(:publish, _timer) do for i <- 1..1234, do: Broadway.test_message(Demo.Pipeline, "hello #{i}") + for i <- 1..100, do: Broadway.test_message(Demo.SecondPipeline, "hello #{i}") timer = Process.send_after(self(), :publish, 100) @@ -164,6 +204,7 @@ Task.start(fn -> children = [ {Phoenix.PubSub, [name: Demo.PubSub, adapter: Phoenix.PubSub.PG2]}, Demo.Pipeline, + Demo.SecondPipeline, FakeProducer, DemoWeb.Endpoint ]