Skip to content

Commit

Permalink
Remove some duplicated tests in kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
NooNameR authored and phatboyg committed Jan 25, 2023
1 parent 8a1d1c0 commit 513538b
Showing 1 changed file with 0 additions and 128 deletions.
128 changes: 0 additions & 128 deletions tests/MassTransit.KafkaIntegration.Tests/Receive_Specs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,79 +111,6 @@ public interface KafkaMessage
}
}


public class ConcurrentReceive_Specs :
InMemoryTestFixture
{
const string Topic = "test-concurrent";
const int NumMessages = 100;

[Test]
public async Task Should_receive_concurrently()
{
await using var provider = new ServiceCollection()
.ConfigureKafkaTestOptions(options =>
{
options.CreateTopicsIfNotExists = true;
options.TopicNames = new[] { Topic };
})
.AddMassTransitTestHarness(x =>
{
x.AddTaskCompletionSource<ConsumeContext<KafkaMessage>>();
x.AddRider(rider =>
{
rider.AddConsumer<KafkaMessageConsumer>();
rider.AddProducer<KafkaMessage>(Topic);

rider.UsingKafka((context, k) =>
{
k.TopicEndpoint<KafkaMessage>(Topic, nameof(ConcurrentReceive_Specs), c =>
{
c.AutoOffsetReset = AutoOffsetReset.Earliest;
c.CheckpointMessageCount = 10;
c.ConcurrentMessageLimit = NumMessages;

c.ConfigureConsumer<KafkaMessageConsumer>(context);
});
});
});
}).BuildServiceProvider();

var harness = provider.GetTestHarness();
await harness.Start();

ITopicProducer<KafkaMessage> producer = harness.GetProducer<KafkaMessage>();
await Task.WhenAll(Enumerable.Range(0, NumMessages).Select(_ => producer.Produce(new { }, harness.CancellationToken)));
await provider.GetTask<ConsumeContext<KafkaMessage>>();
}


class KafkaMessageConsumer :
IConsumer<KafkaMessage>
{
static int _index = NumMessages;
readonly TaskCompletionSource<ConsumeContext<KafkaMessage>> _taskCompletionSource;

public KafkaMessageConsumer(TaskCompletionSource<ConsumeContext<KafkaMessage>> taskCompletionSource)
{
_taskCompletionSource = taskCompletionSource;
}

public Task Consume(ConsumeContext<KafkaMessage> context)
{
if (Interlocked.Decrement(ref _index) <= 0)
_taskCompletionSource.TrySetResult(context);
return Task.CompletedTask;
}
}


public interface KafkaMessage
{
}
}


public class ConcurrentKeysReceive_Specs :
InMemoryTestFixture
{
Expand Down Expand Up @@ -410,61 +337,6 @@ public interface KafkaMessage
}
}


public class ReceiveWithPayload_Specs :
InMemoryTestFixture
{
const string Topic = "test-payload";

[Test]
public async Task Should_receive_with_payload()
{
await using var provider = new ServiceCollection()
.ConfigureKafkaTestOptions(options =>
{
options.CreateTopicsIfNotExists = true;
options.TopicNames = new[] { Topic };
})
.AddMassTransitTestHarness(x =>
{
x.AddTaskCompletionSource<ConsumeContext<KafkaMessage>>();
x.AddRider(rider =>
{
rider.AddConsumer<TestKafkaMessageConsumer<KafkaMessage>>();
rider.AddProducer<string, KafkaMessage>(Topic, (_, c) => c.SetKeySerializer(Serializers.Utf8));

rider.UsingKafka((context, k) =>
{
k.TopicEndpoint<string, KafkaMessage>(Topic, nameof(ReceiveWithPayload_Specs), c =>
{
c.ConfigureConsumer<TestKafkaMessageConsumer<KafkaMessage>>(context);

c.SetKeyDeserializer(Deserializers.Utf8);
});
});
});
}).BuildServiceProvider();

var harness = provider.GetRequiredService<ITestHarness>();
await harness.Start();

ITopicProducer<string, KafkaMessage> producer = harness.GetProducer<string, KafkaMessage>();
var key = NewId.NextGuid().ToString();
await producer.Produce(key, new { }, harness.CancellationToken);

var result = await provider.GetTask<ConsumeContext<KafkaMessage>>();

Assert.IsTrue(result.TryGetPayload(out KafkaConsumeContext<string> _));
Assert.AreEqual(key, result.GetKey<string>());
}


public interface KafkaMessage
{
}
}


public class MultiGroupReceive_Specs :
InMemoryTestFixture
{
Expand Down

0 comments on commit 513538b

Please sign in to comment.