From 7fc87343f3758025d76a635a73c6939370346437 Mon Sep 17 00:00:00 2001 From: Trevor Brown Date: Mon, 29 Apr 2024 16:10:30 -0400 Subject: [PATCH] Address PR feedback --- lib/broadway_kafka/brod_client.ex | 14 ++++++++++---- lib/broadway_kafka/kafka_client.ex | 4 ++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/lib/broadway_kafka/brod_client.ex b/lib/broadway_kafka/brod_client.ex index fae2212..6171dd7 100644 --- a/lib/broadway_kafka/brod_client.ex +++ b/lib/broadway_kafka/brod_client.ex @@ -109,10 +109,12 @@ defmodule BroadwayKafka.BrodClient do @impl true def ack(group_coordinator, generation_id, topic, partition, offset, config) do - :brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset) + if group_coordinator do + :brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset) - if config.offset_commit_on_ack do - :brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}]) + if config.offset_commit_on_ack do + :brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}]) + end end :ok @@ -188,7 +190,11 @@ defmodule BroadwayKafka.BrodClient do @impl true def update_topics(group_coordinator, topics) do - :brod_group_coordinator.update_topics(group_coordinator, topics) + if group_coordinator do + :brod_group_coordinator.update_topics(group_coordinator, topics) + end + + :ok end defp start_link_group_coordinator(stage_pid, client_id, callback_module, config) do diff --git a/lib/broadway_kafka/kafka_client.ex b/lib/broadway_kafka/kafka_client.ex index 0976c56..4ae70fd 100644 --- a/lib/broadway_kafka/kafka_client.ex +++ b/lib/broadway_kafka/kafka_client.ex @@ -15,7 +15,7 @@ defmodule BroadwayKafka.KafkaClient do } @typep offset_reset_policy :: :earliest | :latest - @typep brod_group_coordinator :: pid() + @typep brod_group_coordinator :: pid() | nil @callback init(opts :: any) :: {:ok, config} | {:error, any} @callback setup( @@ -26,7 +26,7 @@ defmodule BroadwayKafka.KafkaClient do ) :: {:ok, group_coordinator :: brod_group_coordinator()} | {:error, any} @callback ack( - group_coordinator :: brod_group_coordinator, + group_coordinator :: brod_group_coordinator(), generation_id :: integer, topic :: binary, partition :: integer,