Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add custom oauth bearer authentication support #548

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System.Collections.Generic;

namespace KafkaFlow.Authentication;

/// <summary>
/// Authentication handler for OAuth Bearer.
/// </summary>
public interface IOAuthBearerAuthenticator
{
/// <summary>
/// Set SASL/OAUTHBEARER token and metadata. The SASL/OAUTHBEARER token refresh callback or event handler should invoke this method upon
/// success. The extension keys must not include the reserved key "`auth`", and all extension keys and values must conform to the required
/// format as per https://tools.ietf.org/html/rfc7628#section-3.1.
/// </summary>
/// <param name="tokenValue">The mandatory token value to set, often (but not necessarily) a JWS compact serialization as per https://tools.ietf.org/html/rfc7515#section-3.1</param>
/// <param name="lifetimeMs">When the token expires, in terms of the number of milliseconds since the epoch</param>
/// <param name="principalName">The mandatory Kafka principal name associated with the token</param>
/// <param name="extensions">Optional SASL extensions dictionary, to be communicated to the broker as additional key-value pairs during the initial client response as per https://tools.ietf.org/html/rfc7628#section-3.1</param>
void SetToken(string tokenValue, long lifetimeMs, string principalName, IDictionary<string, string> extensions = null);

/// <summary>
/// SASL/OAUTHBEARER token refresh failure indicator. The SASL/OAUTHBEARER token refresh callback or event handler should invoke this method upon failure.
/// </summary>
/// <param name="error">Mandatory human readable error reason for failing to acquire a token</param>
void SetTokenFailure(string error);
}
24 changes: 12 additions & 12 deletions src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
namespace KafkaFlow.Configuration
{
/// <summary>SaslOauthbearerMethod enum values</summary>
public enum SaslOauthbearerMethod
{
/// <summary>Default</summary>
Default,

/// <summary>Oidc</summary>
Oidc,
}
}
namespace KafkaFlow.Configuration
{
/// <summary>SaslOauthbearerMethod enum values</summary>
public enum SaslOauthbearerMethod
{
/// <summary>Default</summary>
Default,

/// <summary>Oidc</summary>
Oidc,
}
}
10 changes: 9 additions & 1 deletion src/KafkaFlow.Abstractions/Configuration/SecurityInformation.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
using KafkaFlow.Authentication;

Check warning on line 1 in src/KafkaFlow.Abstractions/Configuration/SecurityInformation.cs

View workflow job for this annotation

GitHub Actions / build

Using directive for 'System' should appear before directive for 'KafkaFlow.Authentication' (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1208.md) [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]
using System;

Check warning on line 2 in src/KafkaFlow.Abstractions/Configuration/SecurityInformation.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Using directive for 'System' should appear before directive for 'KafkaFlow.Authentication' (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1208.md)

namespace KafkaFlow.Configuration;

/// <summary>
Expand Down Expand Up @@ -249,4 +252,9 @@
/// importance: low
/// </summary>
public string SaslOauthbearerScope { get; set; }
}

/// <summary>
/// Gets or sets the OAuthBearerTokenRefreshHandler for custom OAuth authentication.
/// </summary>
public Action<IOAuthBearerAuthenticator> OAuthBearerTokenRefreshHandler { get; set; }
}
24 changes: 24 additions & 0 deletions src/KafkaFlow/Authentication/OAuthBearerAuthenticator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System.Collections.Generic;
using Confluent.Kafka;

namespace KafkaFlow.Authentication;

internal readonly struct OAuthBearerAuthenticator : IOAuthBearerAuthenticator
{
private readonly IClient _client;

public OAuthBearerAuthenticator(IClient client)
{
_client = client;
}

public void SetToken(string tokenValue, long lifetimeMs, string principalName, IDictionary<string, string> extensions = null)
{
_client.OAuthBearerSetToken(tokenValue, lifetimeMs, principalName, extensions);
}

public void SetTokenFailure(string error)
{
_client.OAuthBearerSetTokenFailure(error);
}
}
19 changes: 17 additions & 2 deletions src/KafkaFlow/Clusters/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using KafkaFlow.Authentication;
using KafkaFlow.Configuration;

namespace KafkaFlow.Clusters;
Expand Down Expand Up @@ -32,8 +33,22 @@ public ClusterManager(ILogHandler logHandler, ClusterConfiguration configuration

config.ReadSecurityInformationFrom(configuration);

return new AdminClientBuilder(config)
.Build();
var adminClientBuilder = new AdminClientBuilder(config);

var security = configuration.GetSecurityInformation();

if (security?.OAuthBearerTokenRefreshHandler != null)
{
var handler = security.OAuthBearerTokenRefreshHandler;

adminClientBuilder.SetOAuthBearerTokenRefreshHandler((client, _) =>
{
var authenticator = new OAuthBearerAuthenticator(client);
handler(authenticator);
});
}

return adminClientBuilder.Build();
});
}

Expand Down
13 changes: 12 additions & 1 deletion src/KafkaFlow/Configuration/ClusterConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class ClusterConfiguration
private readonly List<IProducerConfiguration> _producers = new();
private readonly List<IConsumerConfiguration> _consumers = new();
private readonly ReadOnlyCollection<TopicConfiguration> _topicsToCreateIfNotExist;
private SecurityInformation _securityInformation;
private bool _securityInformationLoaded;

/// <summary>
/// Initializes a new instance of the <see cref="ClusterConfiguration"/> class.
Expand Down Expand Up @@ -103,5 +105,14 @@ public void AddProducers(IEnumerable<IProducerConfiguration> configurations) =>
/// Gets the kafka security information
/// </summary>
/// <returns></returns>
public SecurityInformation GetSecurityInformation() => _securityInformationHandler?.Invoke();
public SecurityInformation GetSecurityInformation()
{
if (!_securityInformationLoaded)
{
_securityInformation = _securityInformationHandler?.Invoke();
_securityInformationLoaded = true;
}

return _securityInformation;
}
}
49 changes: 30 additions & 19 deletions src/KafkaFlow/Consumers/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using KafkaFlow.Authentication;
using KafkaFlow.Configuration;

namespace KafkaFlow.Consumers;
Expand Down Expand Up @@ -235,25 +236,35 @@ private void EnsureConsumer()

var kafkaConfig = this.Configuration.GetKafkaConfig();

var consumerBuilder = new ConsumerBuilder<byte[], byte[]>(kafkaConfig);

_consumer =
consumerBuilder
.SetPartitionsAssignedHandler(
(consumer, partitions) => this.FirePartitionsAssignedHandlers(consumer, partitions))
.SetPartitionsRevokedHandler(
(consumer, partitions) =>
{
this.Assignment = new List<TopicPartition>();
this.Subscription = new List<string>();
_currentPartitionsOffsets.Clear();
_flowManager.Stop();

_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
})
.SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error)))
.SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics)))
.Build();
var consumerBuilder = new ConsumerBuilder<byte[], byte[]>(kafkaConfig)
.SetPartitionsAssignedHandler(
(consumer, partitions) => this.FirePartitionsAssignedHandlers(consumer, partitions))
.SetPartitionsRevokedHandler(
(consumer, partitions) =>
{
this.Assignment = new List<TopicPartition>();
this.Subscription = new List<string>();
_currentPartitionsOffsets.Clear();
_flowManager.Stop();
_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
})
.SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error)))
.SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics)));

var security = this.Configuration.ClusterConfiguration.GetSecurityInformation();

if (security?.OAuthBearerTokenRefreshHandler != null)
{
var handler = security.OAuthBearerTokenRefreshHandler;

consumerBuilder.SetOAuthBearerTokenRefreshHandler((client, _) =>
{
var authenticator = new OAuthBearerAuthenticator(client);
handler(authenticator);
});
}

_consumer = consumerBuilder.Build();

if (this.Configuration.Topics.Any())
{
Expand Down
14 changes: 14 additions & 0 deletions src/KafkaFlow/Producers/MessageProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using KafkaFlow.Authentication;
using KafkaFlow.Configuration;

namespace KafkaFlow.Producers;
Expand Down Expand Up @@ -261,6 +262,19 @@ private IProducer<byte[], byte[]> EnsureProducer()
}
});

var security = _configuration.Cluster.GetSecurityInformation();

if (security?.OAuthBearerTokenRefreshHandler != null)
{
var handler = security.OAuthBearerTokenRefreshHandler;

producerBuilder.SetOAuthBearerTokenRefreshHandler((client, _) =>
{
var authenticator = new OAuthBearerAuthenticator(client);
handler(authenticator);
});
}

return _producer = _configuration.CustomFactory(
producerBuilder.Build(),
_producerDependencyScope.Resolver);
Expand Down
Loading