Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Stratus3D committed Apr 29, 2024
1 parent 9f09706 commit 7fc8734
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
14 changes: 10 additions & 4 deletions lib/broadway_kafka/brod_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,12 @@ defmodule BroadwayKafka.BrodClient do

@impl true
def ack(group_coordinator, generation_id, topic, partition, offset, config) do
:brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset)
if group_coordinator do
:brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset)

if config.offset_commit_on_ack do
:brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}])
if config.offset_commit_on_ack do
:brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}])
end
end

:ok
Expand Down Expand Up @@ -188,7 +190,11 @@ defmodule BroadwayKafka.BrodClient do

@impl true
def update_topics(group_coordinator, topics) do
:brod_group_coordinator.update_topics(group_coordinator, topics)
if group_coordinator do
:brod_group_coordinator.update_topics(group_coordinator, topics)
end

:ok
end

defp start_link_group_coordinator(stage_pid, client_id, callback_module, config) do
Expand Down
4 changes: 2 additions & 2 deletions lib/broadway_kafka/kafka_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule BroadwayKafka.KafkaClient do
}

@typep offset_reset_policy :: :earliest | :latest
@typep brod_group_coordinator :: pid()
@typep brod_group_coordinator :: pid() | nil

@callback init(opts :: any) :: {:ok, config} | {:error, any}
@callback setup(
Expand All @@ -26,7 +26,7 @@ defmodule BroadwayKafka.KafkaClient do
) ::
{:ok, group_coordinator :: brod_group_coordinator()} | {:error, any}
@callback ack(
group_coordinator :: brod_group_coordinator,
group_coordinator :: brod_group_coordinator(),
generation_id :: integer,
topic :: binary,
partition :: integer,
Expand Down

0 comments on commit 7fc8734

Please sign in to comment.