Skip to content

Commit

Permalink
Fix errors caught by Dialyzer (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
Stratus3D authored Apr 29, 2024
1 parent 17f624a commit 32199fe
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
16 changes: 11 additions & 5 deletions lib/broadway_kafka/brod_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ defmodule BroadwayKafka.BrodClient do
offset_reset_policy: offset_reset_policy,
begin_offset: begin_offset,
group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config],
fetch_config: Map.new(fetch_config || []),
fetch_config: Map.new(fetch_config),
client_config: client_config,
shared_client: shared_client,
shared_client_id: build_shared_client_id(opts)
Expand Down Expand Up @@ -109,10 +109,12 @@ defmodule BroadwayKafka.BrodClient do

@impl true
def ack(group_coordinator, generation_id, topic, partition, offset, config) do
:brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset)
if group_coordinator do
:brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset)

if group_coordinator && config.offset_commit_on_ack do
:brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}])
if config.offset_commit_on_ack do
:brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}])
end
end

:ok
Expand Down Expand Up @@ -188,7 +190,11 @@ defmodule BroadwayKafka.BrodClient do

@impl true
def update_topics(group_coordinator, topics) do
:brod_group_coordinator.update_topics(group_coordinator, topics)
if group_coordinator do
:brod_group_coordinator.update_topics(group_coordinator, topics)
end

:ok
end

defp start_link_group_coordinator(stage_pid, client_id, callback_module, config) do
Expand Down
7 changes: 4 additions & 3 deletions lib/broadway_kafka/kafka_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule BroadwayKafka.KafkaClient do
}

@typep offset_reset_policy :: :earliest | :latest
@typep brod_group_coordinator :: pid() | nil

@callback init(opts :: any) :: {:ok, config} | {:error, any}
@callback setup(
Expand All @@ -23,9 +24,9 @@ defmodule BroadwayKafka.KafkaClient do
callback_module :: module,
config
) ::
{:ok, group_coordinator :: pid} | {:error, any}
{:ok, group_coordinator :: brod_group_coordinator()} | {:error, any}
@callback ack(
group_coordinator :: pid,
group_coordinator :: brod_group_coordinator(),
generation_id :: integer,
topic :: binary,
partition :: integer,
Expand All @@ -51,7 +52,7 @@ defmodule BroadwayKafka.KafkaClient do
) ::
offset :: integer | no_return()

@callback update_topics(:brod.group_coordinator(), [:brod.topic()]) :: :ok
@callback update_topics(brod_group_coordinator(), [:brod.topic()]) :: :ok
@callback connected?(:brod.client()) :: boolean
@callback disconnect(:brod.client()) :: :ok
end

0 comments on commit 32199fe

Please sign in to comment.