Skip to content

Commit

Permalink
Add a second pipeline to demonstrate navigation (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
philss authored Jan 9, 2024
1 parent 4f4c394 commit db16ed0
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,45 @@ defmodule Demo.Pipeline do
end
end

defmodule Demo.SecondPipeline do
use Broadway

def start_link(opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {Broadway.DummyProducer, opts},
concurrency: 1
],
processors: [
default: [concurrency: 2]
],
batchers: [
default: [batch_size: 6, concurrency: 2, batch_timeout: 1000],
]
)
end

@impl true
def handle_message(_, %Broadway.Message{} = message, _) do
Broadway.Message.update_data(message, fn data ->
hex = Base.encode16(:crypto.strong_rand_bytes(64))

String.upcase(data <> hex)
end)
|> Broadway.Message.put_batcher(:default)
end

@impl true
def handle_batch(:default, messages, _, _) do
Enum.map(messages, fn message ->
Broadway.Message.update_data(message, fn data ->
String.downcase(data)
end)
end)
end
end

defmodule FakeProducer do
use GenServer

Expand All @@ -74,6 +113,7 @@ defmodule FakeProducer do

def handle_info(:publish, _timer) do
for i <- 1..1234, do: Broadway.test_message(Demo.Pipeline, "hello #{i}")
for i <- 1..100, do: Broadway.test_message(Demo.SecondPipeline, "hello #{i}")

timer = Process.send_after(self(), :publish, 100)

Expand Down Expand Up @@ -164,6 +204,7 @@ Task.start(fn ->
children = [
{Phoenix.PubSub, [name: Demo.PubSub, adapter: Phoenix.PubSub.PG2]},
Demo.Pipeline,
Demo.SecondPipeline,
FakeProducer,
DemoWeb.Endpoint
]
Expand Down

0 comments on commit db16ed0

Please sign in to comment.