Skip to content

Commit

Permalink
brod supports sasl credentials from file (#135)
Browse files Browse the repository at this point in the history
`brod` supports having credentials in a file (which is good since they do not stay in the producer state and they are not exposed upon inspection).

https://github.com/kafka4beam/brod/blob/master/README.md?plain=1#L425-L428
  • Loading branch information
rewritten authored Jan 3, 2024
1 parent 6803054 commit 3f2ab90
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
26 changes: 18 additions & 8 deletions lib/broadway_kafka/brod_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,25 @@ defmodule BroadwayKafka.BrodClient do
defp validate_option(:sasl, value = {:callback, _callback_module, _opts}),
do: {:ok, value}

defp validate_option(:sasl, {mechanism, username, password} = value)
when mechanism in [:plain, :scram_sha_256, :scram_sha_512] and
is_binary(username) and
is_binary(password) do
{:ok, value}
end

defp validate_option(:sasl, {mechanism, path} = value)
when mechanism in [:plain, :scram_sha_256, :scram_sha_512] and
is_binary(path) do
{:ok, value}
end

defp validate_option(:sasl, value) do
with {mechanism, username, password}
when mechanism in [:plain, :scram_sha_256, :scram_sha_512] and
is_binary(username) and
is_binary(password) <- value do
{:ok, value}
else
_value -> validation_error(:sasl, "a tuple of SASL mechanism, username and password", value)
end
validation_error(
:sasl,
"a tuple of SASL mechanism, username and password, or mechanism and path",
value
)
end

defp validate_option(:query_api_versions, value) when not is_boolean(value),
Expand Down
19 changes: 13 additions & 6 deletions test/brod_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,13 @@ defmodule BroadwayKafka.BrodClientTest do

assert BrodClient.init(opts) ==
{:error,
"expected :sasl to be a tuple of SASL mechanism, username and password, got: :an_atom"}
"expected :sasl to be a tuple of SASL mechanism, username and password, or mechanism and path, got: :an_atom"}

opts = put_in(@opts, [:client_config, :sasl], {:an_atom, "username", "password"})

assert BrodClient.init(opts) ==
{:error,
"expected :sasl to be a tuple of SASL mechanism, username and password, got: {:an_atom, \"username\", \"password\"}"}
"expected :sasl to be a tuple of SASL mechanism, username and password, or mechanism and path, got: {:an_atom, \"username\", \"password\"}"}

opts = put_in(@opts, [:client_config, :sasl], {:plain, "username", "password"})

Expand All @@ -273,6 +273,15 @@ defmodule BroadwayKafka.BrodClientTest do
sasl: {:plain, "username", "password"}
]
}} = BrodClient.init(opts)

opts = put_in(@opts, [:client_config, :sasl], {:plain, "filepath"})

assert {:ok, [],
%{
client_config: [
sasl: {:plain, "filepath"}
]
}} = BrodClient.init(opts)
end

test ":sasl is an optional tuple of :callback, SASL Authentication Plugin module and opts" do
Expand Down Expand Up @@ -390,8 +399,7 @@ defmodule BroadwayKafka.BrodClientTest do
%{
shared_client: true,
shared_client_id: :"my_prefix.Elixir.my_broadway_name.SharedClient"
}} =
BrodClient.init(opts)
}} = BrodClient.init(opts)

assert [
%{
Expand All @@ -414,8 +422,7 @@ defmodule BroadwayKafka.BrodClientTest do
%{
shared_client: false,
shared_client_id: nil
}} =
BrodClient.init(opts)
}} = BrodClient.init(opts)
end
end

Expand Down

0 comments on commit 3f2ab90

Please sign in to comment.