diff --git a/lib/broadway_kafka/brod_client.ex b/lib/broadway_kafka/brod_client.ex index e291eb3..7a29a04 100644 --- a/lib/broadway_kafka/brod_client.ex +++ b/lib/broadway_kafka/brod_client.ex @@ -297,15 +297,25 @@ defmodule BroadwayKafka.BrodClient do defp validate_option(:sasl, value = {:callback, _callback_module, _opts}), do: {:ok, value} + defp validate_option(:sasl, {mechanism, username, password} = value) + when mechanism in [:plain, :scram_sha_256, :scram_sha_512] and + is_binary(username) and + is_binary(password) do + {:ok, value} + end + + defp validate_option(:sasl, {mechanism, path} = value) + when mechanism in [:plain, :scram_sha_256, :scram_sha_512] and + is_binary(path) do + {:ok, value} + end + defp validate_option(:sasl, value) do - with {mechanism, username, password} - when mechanism in [:plain, :scram_sha_256, :scram_sha_512] and - is_binary(username) and - is_binary(password) <- value do - {:ok, value} - else - _value -> validation_error(:sasl, "a tuple of SASL mechanism, username and password", value) - end + validation_error( + :sasl, + "a tuple of SASL mechanism, username and password, or mechanism and path", + value + ) end defp validate_option(:query_api_versions, value) when not is_boolean(value), diff --git a/test/brod_client_test.exs b/test/brod_client_test.exs index 121ad05..8419d5b 100644 --- a/test/brod_client_test.exs +++ b/test/brod_client_test.exs @@ -257,13 +257,13 @@ defmodule BroadwayKafka.BrodClientTest do assert BrodClient.init(opts) == {:error, - "expected :sasl to be a tuple of SASL mechanism, username and password, got: :an_atom"} + "expected :sasl to be a tuple of SASL mechanism, username and password, or mechanism and path, got: :an_atom"} opts = put_in(@opts, [:client_config, :sasl], {:an_atom, "username", "password"}) assert BrodClient.init(opts) == {:error, - "expected :sasl to be a tuple of SASL mechanism, username and password, got: {:an_atom, \"username\", \"password\"}"} + "expected :sasl to be a tuple of SASL mechanism, username and password, or mechanism and path, got: {:an_atom, \"username\", \"password\"}"} opts = put_in(@opts, [:client_config, :sasl], {:plain, "username", "password"}) @@ -273,6 +273,15 @@ defmodule BroadwayKafka.BrodClientTest do sasl: {:plain, "username", "password"} ] }} = BrodClient.init(opts) + + opts = put_in(@opts, [:client_config, :sasl], {:plain, "filepath"}) + + assert {:ok, [], + %{ + client_config: [ + sasl: {:plain, "filepath"} + ] + }} = BrodClient.init(opts) end test ":sasl is an optional tuple of :callback, SASL Authentication Plugin module and opts" do @@ -390,8 +399,7 @@ defmodule BroadwayKafka.BrodClientTest do %{ shared_client: true, shared_client_id: :"my_prefix.Elixir.my_broadway_name.SharedClient" - }} = - BrodClient.init(opts) + }} = BrodClient.init(opts) assert [ %{ @@ -414,8 +422,7 @@ defmodule BroadwayKafka.BrodClientTest do %{ shared_client: false, shared_client_id: nil - }} = - BrodClient.init(opts) + }} = BrodClient.init(opts) end end