diff --git a/tests/MassTransit.KafkaIntegration.Tests/Receive_Specs.cs b/tests/MassTransit.KafkaIntegration.Tests/Receive_Specs.cs index a275471b2bd..b1484dcafdb 100644 --- a/tests/MassTransit.KafkaIntegration.Tests/Receive_Specs.cs +++ b/tests/MassTransit.KafkaIntegration.Tests/Receive_Specs.cs @@ -111,79 +111,6 @@ public interface KafkaMessage } } - - public class ConcurrentReceive_Specs : - InMemoryTestFixture - { - const string Topic = "test-concurrent"; - const int NumMessages = 100; - - [Test] - public async Task Should_receive_concurrently() - { - await using var provider = new ServiceCollection() - .ConfigureKafkaTestOptions(options => - { - options.CreateTopicsIfNotExists = true; - options.TopicNames = new[] { Topic }; - }) - .AddMassTransitTestHarness(x => - { - x.AddTaskCompletionSource>(); - x.AddRider(rider => - { - rider.AddConsumer(); - rider.AddProducer(Topic); - - rider.UsingKafka((context, k) => - { - k.TopicEndpoint(Topic, nameof(ConcurrentReceive_Specs), c => - { - c.AutoOffsetReset = AutoOffsetReset.Earliest; - c.CheckpointMessageCount = 10; - c.ConcurrentMessageLimit = NumMessages; - - c.ConfigureConsumer(context); - }); - }); - }); - }).BuildServiceProvider(); - - var harness = provider.GetTestHarness(); - await harness.Start(); - - ITopicProducer producer = harness.GetProducer(); - await Task.WhenAll(Enumerable.Range(0, NumMessages).Select(_ => producer.Produce(new { }, harness.CancellationToken))); - await provider.GetTask>(); - } - - - class KafkaMessageConsumer : - IConsumer - { - static int _index = NumMessages; - readonly TaskCompletionSource> _taskCompletionSource; - - public KafkaMessageConsumer(TaskCompletionSource> taskCompletionSource) - { - _taskCompletionSource = taskCompletionSource; - } - - public Task Consume(ConsumeContext context) - { - if (Interlocked.Decrement(ref _index) <= 0) - _taskCompletionSource.TrySetResult(context); - return Task.CompletedTask; - } - } - - - public interface KafkaMessage - { - } - } - - public class ConcurrentKeysReceive_Specs : InMemoryTestFixture { @@ -410,61 +337,6 @@ public interface KafkaMessage } } - - public class ReceiveWithPayload_Specs : - InMemoryTestFixture - { - const string Topic = "test-payload"; - - [Test] - public async Task Should_receive_with_payload() - { - await using var provider = new ServiceCollection() - .ConfigureKafkaTestOptions(options => - { - options.CreateTopicsIfNotExists = true; - options.TopicNames = new[] { Topic }; - }) - .AddMassTransitTestHarness(x => - { - x.AddTaskCompletionSource>(); - x.AddRider(rider => - { - rider.AddConsumer>(); - rider.AddProducer(Topic, (_, c) => c.SetKeySerializer(Serializers.Utf8)); - - rider.UsingKafka((context, k) => - { - k.TopicEndpoint(Topic, nameof(ReceiveWithPayload_Specs), c => - { - c.ConfigureConsumer>(context); - - c.SetKeyDeserializer(Deserializers.Utf8); - }); - }); - }); - }).BuildServiceProvider(); - - var harness = provider.GetRequiredService(); - await harness.Start(); - - ITopicProducer producer = harness.GetProducer(); - var key = NewId.NextGuid().ToString(); - await producer.Produce(key, new { }, harness.CancellationToken); - - var result = await provider.GetTask>(); - - Assert.IsTrue(result.TryGetPayload(out KafkaConsumeContext _)); - Assert.AreEqual(key, result.GetKey()); - } - - - public interface KafkaMessage - { - } - } - - public class MultiGroupReceive_Specs : InMemoryTestFixture {