Akka.Streams: SubFlow DSL is broken and requires lots of casting to be workable #7512

Open
Aaronontheweb opened this issue Mar 4, 2025 · 0 comments
Labels
akka-streams DX Developer experience issues - papercuts, footguns, and other non-bug problems.

@Aaronontheweb
Member

Version Information
Version of Akka.NET? v1.5.38
Which Akka.NET Modules? Akka.Streams

Describe the bug

Here's a small reproduction sample using the GroupBy operator in Akka.Streams:

// Namespaces the sample relies on (LINQPad normally supplies these through the query's namespace imports).
using Akka;                 // NotUsed
using Akka.Actor;           // ActorSystem
using Akka.Streams;
using Akka.Streams.Dsl;     // Source, Flow, SubFlow, IRunnableGraph
using Akka.Util.Internal;   // AsInstanceOf<T>()

async Task Main()
{
	var system = ActorSystem.Create("TestSystem");

	// Generate a stable set of data records.
	var records = GenerateRecords(numEntities: 50, numPeriods: 10, recordsPerPeriod: 3);

	// Build the stream graph.
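	// Group into substreams keyed by a hash bucket of EntityId.
	// GetHashCode() % 5 can be negative, so keys fall in -4..4 (up to 9 substreams); maxSubstreams is 10.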
	var stream = Source
		.From(records)
		.GroupBy(10, r => r.EntityId.GetHashCode() % 5)
		.Via(Flow.Create<DataRecord>()
			// Simulate some async processing.
			.SelectAsync(10, async record =>
			{
				await Task.Delay(10); // simulate async work
				return record;
			})
			// Aggregate the records for each group into a list.
			.Aggregate(new List<DataRecord>(), (list, record) =>
			{
				list.Add(record);
				return list;
			})
			// Within each substream, group records by EventDate and keep the record(s) with the latest UpdateDate.
			.SelectMany(recordList =>
				recordList
					.GroupBy(r => r.EventDate)
					.SelectMany(g =>
					{
						var maxUpdate = g.Max(r => r.UpdateDate);
						return g.Where(r => r.UpdateDate == maxUpdate);
					}))
			// Group the resulting records into batches of 25.
			.Grouped(25)
			// Simulate asynchronous batch processing.
			.SelectAsync(5, async group =>
			{
				await Task.Delay(10); // simulate async batch work
				return group.Count();
			})
			.Async()
		).AsInstanceOf<SubFlow<int, NotUsed, IRunnableGraph<NotUsed>>>()
		.MergeSubstreams()
		.AsInstanceOf<Source<int, NotUsed>>()
		// Sum up the counts from each batch.
		.RunAggregate(0, (total, batchCount) => total + batchCount, system);

	// Wait for the stream to complete and output the total count.
	var finalCount = await stream;
	Console.WriteLine($"Total count: {finalCount}");

	await system.Terminate();
}

// A stable set of generated data records.
// Generates a record for each combination of entity, event date, and a number of updates per date.
// Using a fixed seed ensures the data remains stable between runs.
IEnumerable<DataRecord> GenerateRecords(int numEntities, int numPeriods, int recordsPerPeriod)
{
	var baseDate = DateTimeOffset.UtcNow.Date;
	var random = new Random(42); // fixed seed for stability

	for (int entityIndex = 0; entityIndex < numEntities; entityIndex++)
	{
		var entityId = $"Entity{entityIndex:D3}";
		for (int periodIndex = 0; periodIndex < numPeriods; periodIndex++)
		{
			var eventDate = baseDate.AddDays(-periodIndex);
			// Generate multiple records for the same event date with increasing update times.
			for (int recordIndex = 0; recordIndex < recordsPerPeriod; recordIndex++)
			{
				var updateDate = eventDate.AddMinutes(recordIndex * 10);
				// Generate a measurement value between 0 and 100.
				var measurement = random.NextDouble() * 100;
				yield return new DataRecord(entityId, eventDate, updateDate, measurement);
			}
		}
	}
}

// You can define other methods, fields, classes and namespaces here.
public sealed record DataRecord(string EntityId, DateTimeOffset EventDate, DateTimeOffset UpdateDate, double Measurement);

You can run this in LINQPad here: https://share.linqpad.net/bkgp72uf.linq

Expected behavior

I should be able to call .MergeSubstreams() and then .RunAggregate(0, (total, batchCount) => total + batchCount, system) without first casting the result of each step back to its concrete type (SubFlow<...> or Source<...>).

Actual behavior

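As the sample shows, both steps currently require a cast: the result of .Via(...) has to be cast back to SubFlow<int, NotUsed, IRunnableGraph<NotUsed>> before .MergeSubstreams() can be called, and the result of .MergeSubstreams() has to be cast to Source<int, NotUsed> before .RunAggregate(...) can be called.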
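
To show where the concrete type gets lost, here is a minimal sketch that writes out the static type at every step. It assumes the same Akka.Streams version as the sample and swaps the DataRecord pipeline for a trivial int stream; the variable names (grouped, afterVia, merged) are illustrative only.

```csharp
using System;
using System.Linq;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

var system = ActorSystem.Create("TypeDemo");

// GroupBy on a Source yields the concrete SubFlow type.
SubFlow<int, NotUsed, IRunnableGraph<NotUsed>> grouped =
    Source.From(Enumerable.Range(1, 10)).GroupBy(2, x => x % 2);

// Via is typed as the IFlow interface, so SubFlow-only members
// such as MergeSubstreams() are no longer visible on the result.
IFlow<int, NotUsed> afterVia =
    grouped.Via(Flow.Create<int>().Select(x => x * 2));

// Cast 1: back to SubFlow so that MergeSubstreams() can be called.
IFlow<int, NotUsed> merged =
    ((SubFlow<int, NotUsed, IRunnableGraph<NotUsed>>)afterVia).MergeSubstreams();

// Cast 2: to Source so that the stream can be run.
var total = await ((Source<int, NotUsed>)merged)
    .RunAggregate(0, (sum, x) => sum + x, system);

Console.WriteLine($"Total: {total}");
await system.Terminate();
```

Neither cast changes what runs; both only restore type information that the declared return types discard.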

Additional context

Fixing this might be considered a significant breaking API change, even though it really just narrows the declared return types from the interface IFlow<...> to the concrete SubFlow<...>. I'm not sure what the best way to package this is, but it's an issue that needs to be addressed.
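
One additive option, sketched here under assumptions rather than proposed as the fix: leave the existing Via and MergeSubstreams signatures untouched and add extension methods that do the casts internally. The names SubFlowCastExtensions, ViaSubFlow and MergeSubstreamsAsSource are hypothetical and do not exist in Akka.Streams. The casts are the same ones the sample already performs, so runtime behavior should be unchanged.

```csharp
using System;
using System.Linq;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;

var system = ActorSystem.Create("HelperDemo");

// Call site without explicit casts, using the hypothetical helpers below.
var total = await Source.From(Enumerable.Range(1, 10))
    .GroupBy(2, x => x % 2)
    .ViaSubFlow(Flow.Create<int>().Select(x => x * 2))
    .MergeSubstreamsAsSource()
    .RunAggregate(0, (sum, x) => sum + x, system);

Console.WriteLine($"Total: {total}");
await system.Terminate();

// Hypothetical helpers: not part of Akka.Streams.
public static class SubFlowCastExtensions
{
    // Same as SubFlow.Via, but keeps the concrete SubFlow type.
    public static SubFlow<TNext, TMat, TClosed> ViaSubFlow<TOut, TNext, TMat, TMat2, TClosed>(
        this SubFlow<TOut, TMat, TClosed> subFlow,
        IGraph<FlowShape<TOut, TNext>, TMat2> flow)
        => (SubFlow<TNext, TMat, TClosed>)subFlow.Via(flow);

    // Only defined for substreams created from a Source, where the merged
    // result is cast to Source exactly as in the sample.
    public static Source<TOut, TMat> MergeSubstreamsAsSource<TOut, TMat>(
        this SubFlow<TOut, TMat, IRunnableGraph<TMat>> subFlow)
        => (Source<TOut, TMat>)subFlow.MergeSubstreams();
}
```

Because these would be new methods rather than changed return types, code already compiled against the current signatures would keep working. The trade-off is that this only hides the casts; the interface-typed returns on the existing methods stay as they are.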
