Skip to content

Commit

Permalink
Update AppInsightsProducerEventsHandler.cs (#2)
Browse files Browse the repository at this point in the history
* Update AppInsightsProducerEventsHandler.cs

* Update AppInsightsProducerEventsHandler.cs

* Update ExtensionMethods.cs

* Update AppInsightsConsumerEventsHandler.cs

* Update AppInsightsProducerEventsHandler.cs

* Update AppInsightsProducerEventsHandler.cs
  • Loading branch information
dicko2 authored Dec 21, 2023
1 parent 55e9761 commit 95a514a
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ public static Task OnConsumeStarted(IMessageContext eventContextMessageContext,
return Task.CompletedTask;
}

public static Task OnConsumeError(IMessageContext eventContextMessageContext, Exception eventContextException,
TelemetryClient telemetryClient)
public static Task OnConsumeError(IMessageContext eventContextMessageContext, Exception eventContextException)
{
eventContextMessageContext.Items.TryGetValue("telemetryClient", out var telemetryClientOut);
var telemetryClient = (TelemetryClient)telemetryClientOut;
telemetryClient.TrackException(eventContextException, new Dictionary<string, string>()
{
{"topic" , eventContextMessageContext.ConsumerContext.Topic},
Expand Down Expand Up @@ -50,4 +51,4 @@ public static Task OnConsumeCompleted(IMessageContext eventContextMessageContext
theTimer.Elapsed, "200", true);
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ public class AppInsightsProducerEventsHandler
public static Task OnProducerStarted(IMessageContext eventContextMessageContext, TelemetryClient telemetryClient)
{
eventContextMessageContext.Items.Add("timer", Stopwatch.StartNew());
eventContextMessageContext.Items.Add("telemetryClient", telemetryClient);
return Task.CompletedTask;
}

public static Task OnProducerError(IMessageContext eventContextMessageContext, Exception eventContextException,
TelemetryClient telemetryClient)
public static Task OnProducerError(IMessageContext eventContextMessageContext, Exception eventContextException)
{
eventContextMessageContext.Items.TryGetValue("telemetryClient", out var telemetryClientOut);
var telemetryClient = (TelemetryClient)telemetryClientOut;
telemetryClient.TrackException(eventContextException, new Dictionary<string, string>()
{
{"topic" , eventContextMessageContext.ProducerContext.Topic},
Expand All @@ -35,8 +37,10 @@ public static Task OnProducerError(IMessageContext eventContextMessageContext, E
return Task.CompletedTask;
}

public static Task OnProducerCompleted(IMessageContext eventContextMessageContext, TelemetryClient telemetryClient)
public static Task OnProducerCompleted(IMessageContext eventContextMessageContext)
{
eventContextMessageContext.Items.TryGetValue("telemetryClient", out var telemetryClientOut);
var telemetryClient = (TelemetryClient)telemetryClientOut;
eventContextMessageContext.Items.TryGetValue("timer", out var timer);
var theTimer = (Stopwatch)timer;
theTimer.Stop();
Expand All @@ -49,4 +53,4 @@ public static Task OnProducerCompleted(IMessageContext eventContextMessageContex
theTimer.Elapsed, "200", true);
return Task.CompletedTask;
}
}
}
8 changes: 4 additions & 4 deletions src/KafkaFlow.ApplicationInsights/ExtensionMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ public static IKafkaConfigurationBuilder AddAppInsightsInstrumentation(this IKaf
{
hub.MessageConsumeStarted.Subscribe(eventContext => AppInsightsConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext, eventContext.MessageContext.DependencyResolver.Resolve<TelemetryClient>()));

hub.MessageConsumeError.Subscribe(eventContext => AppInsightsConsumerEventsHandler.OnConsumeError(eventContext.MessageContext, eventContext.Exception, eventContext.MessageContext.DependencyResolver.Resolve<TelemetryClient>()));
hub.MessageConsumeError.Subscribe(eventContext => AppInsightsConsumerEventsHandler.OnConsumeError(eventContext.MessageContext, eventContext.Exception));

hub.MessageConsumeCompleted.Subscribe(eventContext => AppInsightsConsumerEventsHandler.OnConsumeCompleted(eventContext.MessageContext));

hub.MessageProduceStarted.Subscribe(eventContext => AppInsightsProducerEventsHandler.OnProducerStarted(eventContext.MessageContext, eventContext.MessageContext.DependencyResolver.Resolve<TelemetryClient>()));

hub.MessageProduceError.Subscribe(eventContext => AppInsightsProducerEventsHandler.OnProducerError(eventContext.MessageContext, eventContext.Exception, eventContext.MessageContext.DependencyResolver.Resolve<TelemetryClient>()));
hub.MessageProduceError.Subscribe(eventContext => AppInsightsProducerEventsHandler.OnProducerError(eventContext.MessageContext, eventContext.Exception));

hub.MessageProduceCompleted.Subscribe(eventContext => AppInsightsProducerEventsHandler.OnProducerCompleted(eventContext.MessageContext, eventContext.MessageContext.DependencyResolver.Resolve<TelemetryClient>()));
hub.MessageProduceCompleted.Subscribe(eventContext => AppInsightsProducerEventsHandler.OnProducerCompleted(eventContext.MessageContext));
});

return builder;
}
}
}

0 comments on commit 95a514a

Please sign in to comment.