Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include Open Telemetry tracing #451

Merged
merged 1 commit into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
Expand Down
38 changes: 38 additions & 0 deletions src/KafkaFlow.Abstractions/Configuration/IGlobalEvents.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace KafkaFlow.Configuration
{
/// <summary>
/// Provides access to events fired by the internals of the library
/// </summary>
public interface IGlobalEvents
{
/// <summary>
/// Gets the message consume completed event
/// </summary>
IEvent<MessageEventContext> MessageConsumeCompleted { get; }

/// <summary>
/// Gets the message consume error event
/// </summary>
IEvent<MessageErrorEventContext> MessageConsumeError { get; }

/// <summary>
/// Gets the message consume started event
/// </summary>
IEvent<MessageEventContext> MessageConsumeStarted { get; }

/// <summary>
/// Gets the message produce completed event
/// </summary>
IEvent<MessageEventContext> MessageProduceCompleted { get; }

/// <summary>
/// Gets the message produce error event
/// </summary>
IEvent<MessageErrorEventContext> MessageProduceError { get; }

/// <summary>
/// Gets the message produce started event
/// </summary>
IEvent<MessageEventContext> MessageProduceStarted { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,12 @@ public interface IKafkaConfigurationBuilder
/// <returns></returns>
IKafkaConfigurationBuilder UseLogHandler<TLogHandler>()
where TLogHandler : ILogHandler;

/// <summary>
/// Subscribe the global events defined in <see cref="IGlobalEvents"/>
/// </summary>
/// <param name="observers">A handle to subscribe the events</param>
/// <returns></returns>
IKafkaConfigurationBuilder SubscribeGlobalEvents(Action<IGlobalEvents> observers);
}
}
32 changes: 32 additions & 0 deletions src/KafkaFlow.Abstractions/IEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace KafkaFlow

Check warning on line 1 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / build

File is required to end with a single newline character [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]
{
using System;
using System.Threading.Tasks;

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
public interface IEvent
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<Task> handler);
}

/// <summary>
/// Represents an Event to be subscribed.
/// </summary>
/// <typeparam name="TArg">The argument expected by the event.</typeparam>
public interface IEvent<out TArg>
joelfoliveira marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// Subscribes to the event.
/// </summary>
/// <param name="handler">The handler to be called when the event is fired.</param>
/// <returns>Event subscription reference</returns>
IEventSubscription Subscribe(Func<TArg, Task> handler);
}
}

Check warning on line 32 in src/KafkaFlow.Abstractions/IEvent.cs

View workflow job for this annotation

GitHub Actions / Test deployment

File is required to end with a single newline character
13 changes: 13 additions & 0 deletions src/KafkaFlow.Abstractions/IEventSubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace KafkaFlow
{
/// <summary>
/// Represents an Event subscription.
/// </summary>
public interface IEventSubscription
{
/// <summary>
/// Cancels the subscription to the event.
/// </summary>
void Cancel();
}
}
6 changes: 6 additions & 0 deletions src/KafkaFlow.Abstractions/IMessageContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow
{
using System;
using System.Collections.Generic;

/// <summary>
/// A context that contains the message and metadata
Expand All @@ -27,6 +28,11 @@ public interface IMessageContext
/// </summary>
IProducerContext ProducerContext { get; }

/// <summary>
/// Gets the items
/// </summary>
IDictionary<string, object> Items { get; }

/// <summary>
/// Creates a new <see cref="IMessageContext"/> with the new message
/// </summary>
Expand Down
26 changes: 26 additions & 0 deletions src/KafkaFlow.Abstractions/MessageErrorEventContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace KafkaFlow
{
using System;

/// <summary>
/// Represents the errors in message context used in the events
/// </summary>
public class MessageErrorEventContext : MessageEventContext
{
/// <summary>
/// Initializes a new instance of the <see cref="MessageErrorEventContext"/> class.
/// </summary>
/// <param name="messageContext">The message context</param>
joelfoliveira marked this conversation as resolved.
Show resolved Hide resolved
/// <param name="exception">The event exception</param>
public MessageErrorEventContext(IMessageContext messageContext, Exception exception)
: base(messageContext)
{
this.Exception = exception;
}

/// <summary>
/// Gets the exception
/// </summary>
public Exception Exception { get; }
}
}
22 changes: 22 additions & 0 deletions src/KafkaFlow.Abstractions/MessageEventContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace KafkaFlow
{
/// <summary>
/// Represents a message context used in the events
/// </summary>
public class MessageEventContext
{
/// <summary>
/// Initializes a new instance of the <see cref="MessageEventContext"/> class.
/// </summary>
/// <param name="messageContext">The message context</param>
public MessageEventContext(IMessageContext messageContext)
{
this.MessageContext = messageContext;
}

/// <summary>
/// Gets the message context
/// </summary>
public IMessageContext MessageContext { get; }
}
}
3 changes: 3 additions & 0 deletions src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public BatchConsumeMessageContext(
{
this.ConsumerContext = consumer;
this.Message = new Message(null, batchMessage);
this.Items = new Dictionary<string, object>();
}

public Message Message { get; }
Expand All @@ -21,6 +22,8 @@ public BatchConsumeMessageContext(

public IProducerContext ProducerContext => null;

public IDictionary<string, object> Items { get; }

public IMessageContext SetMessage(object key, object value) =>
throw new NotSupportedException($"{nameof(BatchConsumeMessageContext)} does not allow change the message");

Expand Down
8 changes: 5 additions & 3 deletions src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
};

services.AddKafka(
kafka => kafka
.UseLogHandler<TraceLogHandler>()
kafka =>
{
kafka.UseLogHandler<TraceLogHandler>()
.AddCluster(
cluster => cluster
.WithBrokers(kafkaBrokers.Split(';'))
Expand Down Expand Up @@ -332,7 +333,8 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection
.DefaultTopic(GzipTopicName)
.AddMiddlewares(
middlewares => middlewares
.AddCompressor<GzipMessageCompressor>()))));
.AddCompressor<GzipMessageCompressor>())));
});
filipeesch marked this conversation as resolved.
Show resolved Hide resolved

services.AddSingleton<JsonProducer>();
services.AddSingleton<JsonGzipProducer>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace KafkaFlow.IntegrationTests.Core.Exceptions
{
using System;

public class ErrorExecutingMiddlewareException : Exception
{
public ErrorExecutingMiddlewareException(string middlewareName)
: base($"Exception thrown executing {middlewareName}")
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace KafkaFlow.IntegrationTests.Core.Exceptions
{
using System;

public class PartitionAssignmentException : Exception
{
private const string ExceptionMessage = "Partition assignment hasn't occurred yet.";

public PartitionAssignmentException()
: base(ExceptionMessage)
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow.IntegrationTests.Core.Producers
{
internal class JsonProducer2
{
}
}
Loading
Loading