From 188b21d3b6becb80578e86e23e7f9a0e67ce8f88 Mon Sep 17 00:00:00 2001
From: Robert Coltheart <13191652+robertcoltheart@users.noreply.github.com>
Date: Tue, 9 Apr 2024 19:08:14 +1000
Subject: [PATCH] feat: add custom oauth bearer authentication support (#548)
Co-authored-by: kikofps <22330887+kikofps@users.noreply.github.com>
---
.../IOAuthBearerAuthenticator.cs | 26 ++++++++++
.../Configuration/SaslOauthbearerMethod.cs | 24 ++++-----
.../Configuration/SecurityInformation.cs | 10 +++-
.../OAuthBearerAuthenticator.cs | 24 +++++++++
src/KafkaFlow/Clusters/ClusterManager.cs | 19 ++++++-
.../Configuration/ClusterConfiguration.cs | 13 ++++-
src/KafkaFlow/Consumers/Consumer.cs | 49 ++++++++++++-------
src/KafkaFlow/Producers/MessageProducer.cs | 14 ++++++
8 files changed, 144 insertions(+), 35 deletions(-)
create mode 100644 src/KafkaFlow.Abstractions/Authentication/IOAuthBearerAuthenticator.cs
create mode 100644 src/KafkaFlow/Authentication/OAuthBearerAuthenticator.cs
diff --git a/src/KafkaFlow.Abstractions/Authentication/IOAuthBearerAuthenticator.cs b/src/KafkaFlow.Abstractions/Authentication/IOAuthBearerAuthenticator.cs
new file mode 100644
index 000000000..21953f160
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/Authentication/IOAuthBearerAuthenticator.cs
@@ -0,0 +1,26 @@
+using System.Collections.Generic;
+
+namespace KafkaFlow.Authentication;
+
+///
+/// Authentication handler for OAuth Bearer.
+///
+public interface IOAuthBearerAuthenticator
+{
+ ///
+ /// Set SASL/OAUTHBEARER token and metadata. The SASL/OAUTHBEARER token refresh callback or event handler should invoke this method upon
+ /// success. The extension keys must not include the reserved key "`auth`", and all extension keys and values must conform to the required
+ /// format as per https://tools.ietf.org/html/rfc7628#section-3.1.
+ ///
+ /// The mandatory token value to set, often (but not necessarily) a JWS compact serialization as per https://tools.ietf.org/html/rfc7515#section-3.1
+ /// When the token expires, in terms of the number of milliseconds since the epoch
+ /// The mandatory Kafka principal name associated with the token
+ /// Optional SASL extensions dictionary, to be communicated to the broker as additional key-value pairs during the initial client response as per https://tools.ietf.org/html/rfc7628#section-3.1
+ void SetToken(string tokenValue, long lifetimeMs, string principalName, IDictionary extensions = null);
+
+ ///
+ /// SASL/OAUTHBEARER token refresh failure indicator. The SASL/OAUTHBEARER token refresh callback or event handler should invoke this method upon failure.
+ ///
+ /// Mandatory human readable error reason for failing to acquire a token
+ void SetTokenFailure(string error);
+}
diff --git a/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs b/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs
index 26d64c25e..3b4177c05 100644
--- a/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs
+++ b/src/KafkaFlow.Abstractions/Configuration/SaslOauthbearerMethod.cs
@@ -1,12 +1,12 @@
-namespace KafkaFlow.Configuration
-{
- /// SaslOauthbearerMethod enum values
- public enum SaslOauthbearerMethod
- {
- /// Default
- Default,
-
- /// Oidc
- Oidc,
- }
-}
+namespace KafkaFlow.Configuration
+{
+ /// SaslOauthbearerMethod enum values
+ public enum SaslOauthbearerMethod
+ {
+ /// Default
+ Default,
+
+ /// Oidc
+ Oidc,
+ }
+}
diff --git a/src/KafkaFlow.Abstractions/Configuration/SecurityInformation.cs b/src/KafkaFlow.Abstractions/Configuration/SecurityInformation.cs
index f6b614289..2c0654282 100644
--- a/src/KafkaFlow.Abstractions/Configuration/SecurityInformation.cs
+++ b/src/KafkaFlow.Abstractions/Configuration/SecurityInformation.cs
@@ -1,3 +1,6 @@
+using KafkaFlow.Authentication;
+using System;
+
namespace KafkaFlow.Configuration;
///
@@ -249,4 +252,9 @@ public class SecurityInformation
/// importance: low
///
public string SaslOauthbearerScope { get; set; }
-}
\ No newline at end of file
+
+ ///
+ /// Gets or sets the OAuthBearerTokenRefreshHandler for custom OAuth authentication.
+ ///
+ public Action OAuthBearerTokenRefreshHandler { get; set; }
+}
diff --git a/src/KafkaFlow/Authentication/OAuthBearerAuthenticator.cs b/src/KafkaFlow/Authentication/OAuthBearerAuthenticator.cs
new file mode 100644
index 000000000..7aa57f3a1
--- /dev/null
+++ b/src/KafkaFlow/Authentication/OAuthBearerAuthenticator.cs
@@ -0,0 +1,24 @@
+using System.Collections.Generic;
+using Confluent.Kafka;
+
+namespace KafkaFlow.Authentication;
+
+internal readonly struct OAuthBearerAuthenticator : IOAuthBearerAuthenticator
+{
+ private readonly IClient _client;
+
+ public OAuthBearerAuthenticator(IClient client)
+ {
+ _client = client;
+ }
+
+ public void SetToken(string tokenValue, long lifetimeMs, string principalName, IDictionary extensions = null)
+ {
+ _client.OAuthBearerSetToken(tokenValue, lifetimeMs, principalName, extensions);
+ }
+
+ public void SetTokenFailure(string error)
+ {
+ _client.OAuthBearerSetTokenFailure(error);
+ }
+}
diff --git a/src/KafkaFlow/Clusters/ClusterManager.cs b/src/KafkaFlow/Clusters/ClusterManager.cs
index 28e7381f0..7a4b84620 100644
--- a/src/KafkaFlow/Clusters/ClusterManager.cs
+++ b/src/KafkaFlow/Clusters/ClusterManager.cs
@@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
+using KafkaFlow.Authentication;
using KafkaFlow.Configuration;
namespace KafkaFlow.Clusters;
@@ -32,8 +33,22 @@ public ClusterManager(ILogHandler logHandler, ClusterConfiguration configuration
config.ReadSecurityInformationFrom(configuration);
- return new AdminClientBuilder(config)
- .Build();
+ var adminClientBuilder = new AdminClientBuilder(config);
+
+ var security = configuration.GetSecurityInformation();
+
+ if (security?.OAuthBearerTokenRefreshHandler != null)
+ {
+ var handler = security.OAuthBearerTokenRefreshHandler;
+
+ adminClientBuilder.SetOAuthBearerTokenRefreshHandler((client, _) =>
+ {
+ var authenticator = new OAuthBearerAuthenticator(client);
+ handler(authenticator);
+ });
+ }
+
+ return adminClientBuilder.Build();
});
}
diff --git a/src/KafkaFlow/Configuration/ClusterConfiguration.cs b/src/KafkaFlow/Configuration/ClusterConfiguration.cs
index c1b3fee25..6e35340a0 100644
--- a/src/KafkaFlow/Configuration/ClusterConfiguration.cs
+++ b/src/KafkaFlow/Configuration/ClusterConfiguration.cs
@@ -14,6 +14,8 @@ public class ClusterConfiguration
private readonly List _producers = new();
private readonly List _consumers = new();
private readonly ReadOnlyCollection _topicsToCreateIfNotExist;
+ private SecurityInformation _securityInformation;
+ private bool _securityInformationLoaded;
///
/// Initializes a new instance of the class.
@@ -103,5 +105,14 @@ public void AddProducers(IEnumerable configurations) =>
/// Gets the kafka security information
///
///
- public SecurityInformation GetSecurityInformation() => _securityInformationHandler?.Invoke();
+ public SecurityInformation GetSecurityInformation()
+ {
+ if (!_securityInformationLoaded)
+ {
+ _securityInformation = _securityInformationHandler?.Invoke();
+ _securityInformationLoaded = true;
+ }
+
+ return _securityInformation;
+ }
}
diff --git a/src/KafkaFlow/Consumers/Consumer.cs b/src/KafkaFlow/Consumers/Consumer.cs
index 8dad2ab5d..099b5b482 100644
--- a/src/KafkaFlow/Consumers/Consumer.cs
+++ b/src/KafkaFlow/Consumers/Consumer.cs
@@ -5,6 +5,7 @@
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
+using KafkaFlow.Authentication;
using KafkaFlow.Configuration;
namespace KafkaFlow.Consumers;
@@ -235,25 +236,35 @@ private void EnsureConsumer()
var kafkaConfig = this.Configuration.GetKafkaConfig();
- var consumerBuilder = new ConsumerBuilder(kafkaConfig);
-
- _consumer =
- consumerBuilder
- .SetPartitionsAssignedHandler(
- (consumer, partitions) => this.FirePartitionsAssignedHandlers(consumer, partitions))
- .SetPartitionsRevokedHandler(
- (consumer, partitions) =>
- {
- this.Assignment = new List();
- this.Subscription = new List();
- _currentPartitionsOffsets.Clear();
- _flowManager.Stop();
-
- _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
- })
- .SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error)))
- .SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics)))
- .Build();
+ var consumerBuilder = new ConsumerBuilder(kafkaConfig)
+ .SetPartitionsAssignedHandler(
+ (consumer, partitions) => this.FirePartitionsAssignedHandlers(consumer, partitions))
+ .SetPartitionsRevokedHandler(
+ (consumer, partitions) =>
+ {
+ this.Assignment = new List();
+ this.Subscription = new List();
+ _currentPartitionsOffsets.Clear();
+ _flowManager.Stop();
+ _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
+ })
+ .SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error)))
+ .SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics)));
+
+ var security = this.Configuration.ClusterConfiguration.GetSecurityInformation();
+
+ if (security?.OAuthBearerTokenRefreshHandler != null)
+ {
+ var handler = security.OAuthBearerTokenRefreshHandler;
+
+ consumerBuilder.SetOAuthBearerTokenRefreshHandler((client, _) =>
+ {
+ var authenticator = new OAuthBearerAuthenticator(client);
+ handler(authenticator);
+ });
+ }
+
+ _consumer = consumerBuilder.Build();
if (this.Configuration.Topics.Any())
{
diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs
index f696214f4..13161360f 100644
--- a/src/KafkaFlow/Producers/MessageProducer.cs
+++ b/src/KafkaFlow/Producers/MessageProducer.cs
@@ -2,6 +2,7 @@
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
+using KafkaFlow.Authentication;
using KafkaFlow.Configuration;
namespace KafkaFlow.Producers;
@@ -261,6 +262,19 @@ private IProducer EnsureProducer()
}
});
+ var security = _configuration.Cluster.GetSecurityInformation();
+
+ if (security?.OAuthBearerTokenRefreshHandler != null)
+ {
+ var handler = security.OAuthBearerTokenRefreshHandler;
+
+ producerBuilder.SetOAuthBearerTokenRefreshHandler((client, _) =>
+ {
+ var authenticator = new OAuthBearerAuthenticator(client);
+ handler(authenticator);
+ });
+ }
+
return _producer = _configuration.CustomFactory(
producerBuilder.Build(),
_producerDependencyScope.Resolver);