Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug Report]: Scope Middleware is Disposed of in OnConsumeCompleted or OnConsumeError Global Events #588

Open
1 task done
c5racing opened this issue Aug 5, 2024 · 1 comment
Labels
bug Something isn't working

Comments

@c5racing
Copy link

c5racing commented Aug 5, 2024

Prerequisites

  • I have searched issues to ensure it has not already been reported

Description

Using the setup below, I am able to resolve services in MessageConsumeStarted such as

using (var scope = context.DependencyResolver.CreateScope()) { //Code Here }

If I attempt to use the same code in OnConsumeCompleted or OnConsumeError, I get an error the following error:

System.AggregateException: One or more errors occurred. (Instances cannot be resolved and nested lifetimes cannot be created from this LifetimeScope as it (or one of its parent scopes) has already been disposed.)

In my message handlers, I'm injecting my DBContext for EF, so it's important that I'm able register these as scoped. I can't use Transient as I set different parameters that need to be available vi scoped DI down the pipeline.

services.AddKafkaFlowHostedService(kafka => kafka
                    .UseMicrosoftLog()
                    .SubscribeGlobalEvents(observers =>
                    {
                        observers.MessageConsumeStarted.Subscribe(async eventContext => await ActivityLogConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext));
                        observers.MessageConsumeCompleted.Subscribe(async eventContext => await ActivityLogConsumerEventsHandler.OnConsumeCompleted(eventContext.MessageContext));
                        observers.MessageConsumeError.Subscribe(async eventContext => await ActivityLogConsumerEventsHandler.OnConsumeError(eventContext.MessageContext, eventContext.Exception));
                    })
                    .AddCluster(cluster => cluster
                    .WithBrokers(brokers)
                    .CreateTopicIfNotExists("sample-topic", 1, 1)
                    .AddConsumer(consumer => consumer
                    .Topic("sample-topic")
                    .WithGroupId("test-group")
                    .WithWorkersCount(1)
                    .WithAutoOffsetReset(AutoOffsetReset.Earliest)
                    .WithBufferSize(200)
                    .WithAutoCommitIntervalMs(2000)
                    .WithMaxPollIntervalMs(null)
                    .WithPartitionsRevokedHandler((resolver, partitions) =>
                    {
                        var logger = resolver.Resolve<ILogHandler>();

                        partitions.ForEach(partition =>
                        {
                            logger?.Warning($"Partition is being revoked: [{partition}]", null);
                        });
                    })
                    .AddMiddlewares(middlewares => middlewares
                    .AddSingleTypeDeserializer<NewtonsoftJsonDeserializer>(typeof(BaseEntity))
                    .AddTypedHandlers(h => h.AddHandler<UpdateDbForLanguage>().WithHandlerLifetime(InstanceLifetime.Scoped))
                    ))
                    .EnableAdminMessages("kafkaflow.admin")
            ));
        });

Steps to reproduce

  1. Create Scope Consumer
  2. Create 3 Global Events, MessageConsumeStarted, MessageConsumeCompleted, and MessageConsumeError
  3. Observe dependencies can be resolved in MessageConsumeStarted but not in MessageConsumeCompleted or MessageConsumeError

Expected behavior

All Global Events can resolve dependencies

Actual behavior

In the Global Event Handlers, OnConsumeCompleted or OnConsumeError, Dependancies cannot be resolved.

Using var scope = context.DependencyResolver.CreateScope() or context.DependencyResolver.Resolve<BaseEntity>(); results in the folowwing error:

System.AggregateException: One or more errors occurred. (Instances cannot be resolved and nested lifetimes cannot be created from this LifetimeScope as it (or one of its parent scopes) has already been disposed.)

KafkaFlow version

3.0.10

@c5racing c5racing added the bug Something isn't working label Aug 5, 2024
@tomaszprasolek
Copy link

I have similar problem. I have the error:

System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'IServiceProvider'.

on MessageConsumeError

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Development

No branches or pull requests

2 participants