Skip to content

Commit

Permalink
feat!: async support on message type and schema registry resolvers
Browse files Browse the repository at this point in the history
  • Loading branch information
jose-sousa-8 committed Sep 15, 2023
1 parent a30f81d commit da43538
Show file tree
Hide file tree
Showing 28 changed files with 403 additions and 143 deletions.
1 change: 1 addition & 0 deletions samples/KafkaFlow.Sample.SchemaRegistry/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using KafkaFlow;
using KafkaFlow.Producers;
using KafkaFlow.Sample.SchemaRegistry.Handlers;
using KafkaFlow.Serializer.SchemaRegistry;
using KafkaFlow.TypedHandler;
using Microsoft.Extensions.DependencyInjection;
using SchemaRegistry;
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace KafkaFlow
{
using System.Threading.Tasks;

/// <summary>
/// An interface to implement a type name resolver to messages serialized with schema registry serializers
/// </summary>
Expand All @@ -10,6 +12,6 @@ public interface ISchemaRegistryTypeNameResolver
/// </summary>
/// <param name="schemaId">Identifier of the schema</param>
/// <returns></returns>
string Resolve(int schemaId);
Task<string> ResolveAsync(int schemaId);
}
}
17 changes: 4 additions & 13 deletions src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,25 @@ namespace KafkaFlow
/// <summary>
/// The message type resolver to be used with schema registry serializers
/// </summary>
public class SchemaRegistryTypeResolver : IAsyncMessageTypeResolver
public class SchemaRegistryTypeResolver : IMessageTypeResolver
{
private static readonly ConcurrentDictionary<int, Type> Types = new();

private static readonly SemaphoreSlim Semaphore = new(1, 1);

private readonly IAsyncSchemaRegistryTypeNameResolver typeNameResolver;
private readonly ISchemaRegistryTypeNameResolver typeNameResolver;

/// <summary>
/// Initializes a new instance of the <see cref="SchemaRegistryTypeResolver"/> class.
/// </summary>
/// <param name="typeNameResolver">A instance of the <see cref="ISchemaRegistryTypeNameResolver"/> interface.</param>
public SchemaRegistryTypeResolver(ISchemaRegistryTypeNameResolver typeNameResolver)
: this(new AsyncSchemaRegistryTypeNameResolverWrapper(typeNameResolver))
{
}

/// <summary>
/// Initializes a new instance of the <see cref="SchemaRegistryTypeResolver"/> class.
/// </summary>
/// <param name="typeNameResolver">A instance of the <see cref="ISchemaRegistryTypeNameResolver"/> interface.</param>
public SchemaRegistryTypeResolver(IAsyncSchemaRegistryTypeNameResolver typeNameResolver)
{
this.typeNameResolver = typeNameResolver;
}

/// <inheritdoc />
public async Task<Type> OnConsumeAsync(IMessageContext context)
public async ValueTask<Type> OnConsumeAsync(IMessageContext context)
{
var schemaId = BinaryPrimitives.ReadInt32BigEndian(
((byte[]) context.Message.Value).AsSpan().Slice(1, 4));
Expand Down Expand Up @@ -70,6 +61,6 @@ public async Task<Type> OnConsumeAsync(IMessageContext context)
}

/// <inheritdoc />
public Task OnProduceAsync(IMessageContext context) => Task.CompletedTask;
public ValueTask OnProduceAsync(IMessageContext context) => default(ValueTask);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")]
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
namespace KafkaFlow
namespace KafkaFlow.Serializer.SchemaRegistry
{
using System.Threading.Tasks;
using Confluent.SchemaRegistry;
using Newtonsoft.Json;

internal class ConfluentAvroTypeNameResolver : IAsyncSchemaRegistryTypeNameResolver
internal class ConfluentAvroTypeNameResolver : ISchemaRegistryTypeNameResolver
{
private readonly ISchemaRegistryClient client;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")]
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
namespace KafkaFlow
namespace KafkaFlow.Serializer.SchemaRegistry
{
using System.Linq;
using System.Threading.Tasks;
using Confluent.SchemaRegistry;
using Google.Protobuf;
using Google.Protobuf.Reflection;

internal class ConfluentProtobufTypeNameResolver : IAsyncSchemaRegistryTypeNameResolver
internal class ConfluentProtobufTypeNameResolver : ISchemaRegistryTypeNameResolver
{
private readonly ISchemaRegistryClient client;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
namespace KafkaFlow
namespace KafkaFlow.Serializer.SchemaRegistry
{
using Confluent.SchemaRegistry;
using KafkaFlow.Configuration;
using KafkaFlow.Serializer.SchemaRegistry;

/// <summary>
/// No needed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
namespace KafkaFlow
namespace KafkaFlow.Serializer.SchemaRegistry
{
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using KafkaFlow.Configuration;
using KafkaFlow.Serializer.SchemaRegistry;

/// <summary>
/// No needed
Expand Down
21 changes: 0 additions & 21 deletions src/KafkaFlow.Serializer/AsyncMessageTypeResolverWrapper.cs

This file was deleted.

13 changes: 8 additions & 5 deletions src/KafkaFlow.Serializer/DefaultTypeResolver.cs
Original file line number Diff line number Diff line change
@@ -1,32 +1,35 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

internal class DefaultTypeResolver : IMessageTypeResolver
{
private const string MessageType = "Message-Type";

public Type OnConsume(IMessageContext context)
public ValueTask<Type> OnConsumeAsync(IMessageContext context)
{
var typeName = context.Headers.GetString(MessageType);

return typeName is null ?
null :
Type.GetType(typeName);
new ValueTask<Type>((Type) null) :
new ValueTask<Type>(Type.GetType(typeName));
}

public void OnProduce(IMessageContext context)
public ValueTask OnProduceAsync(IMessageContext context)
{
if (context.Message.Value is null)
{
return;
return default(ValueTask);
}

var messageType = context.Message.Value.GetType();

context.Headers.SetString(
MessageType,
$"{messageType.FullName}, {messageType.Assembly.GetName().Name}");

return default(ValueTask);
}
}
}
24 changes: 0 additions & 24 deletions src/KafkaFlow.Serializer/IAsyncMessageTypeResolver.cs

This file was deleted.

5 changes: 3 additions & 2 deletions src/KafkaFlow.Serializer/IMessageTypeResolver.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

/// <summary>
/// Used by the serializer middleware to resolve the type when consuming and store it when producing
Expand All @@ -12,12 +13,12 @@ public interface IMessageTypeResolver
/// </summary>
/// <param name="context">The <see cref="IMessageContext"/> containing the message and the metadata</param>
/// <returns></returns>
Type OnConsume(IMessageContext context);
ValueTask<Type> OnConsumeAsync(IMessageContext context);

/// <summary>
/// Stores the message type somewhere when producing
/// </summary>
/// <param name="context">The <see cref="IMessageContext"/> containing the message and the metadata</param>
void OnProduce(IMessageContext context);
ValueTask OnProduceAsync(IMessageContext context);
}
}
1 change: 1 addition & 0 deletions src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.1.3" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
</ItemGroup>

</Project>
15 changes: 1 addition & 14 deletions src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
public class SerializerConsumerMiddleware : IMessageMiddleware
{
private readonly ISerializer serializer;

private readonly IAsyncMessageTypeResolver typeResolver;
private readonly IMessageTypeResolver typeResolver;

/// <summary>
/// Initializes a new instance of the <see cref="SerializerConsumerMiddleware"/> class.
Expand All @@ -21,18 +20,6 @@ public class SerializerConsumerMiddleware : IMessageMiddleware
public SerializerConsumerMiddleware(
ISerializer serializer,
IMessageTypeResolver typeResolver)
: this(serializer, new AsyncMessageTypeResolverWrapper(typeResolver))
{
}

/// <summary>
/// Initializes a new instance of the <see cref="SerializerConsumerMiddleware"/> class.
/// </summary>
/// <param name="serializer">Instance of <see cref="ISerializer"/></param>
/// <param name="typeResolver">Instance of <see cref="IAsyncMessageTypeResolver"/></param>
public SerializerConsumerMiddleware(
ISerializer serializer,
IAsyncMessageTypeResolver typeResolver)
{
this.serializer = serializer;
this.typeResolver = typeResolver;
Expand Down
14 changes: 1 addition & 13 deletions src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class SerializerProducerMiddleware : IMessageMiddleware

private readonly ISerializer serializer;

private readonly IAsyncMessageTypeResolver typeResolver;
private readonly IMessageTypeResolver typeResolver;

/// <summary>
/// Initializes a new instance of the <see cref="SerializerProducerMiddleware"/> class.
Expand All @@ -22,18 +22,6 @@ public class SerializerProducerMiddleware : IMessageMiddleware
public SerializerProducerMiddleware(
ISerializer serializer,
IMessageTypeResolver typeResolver)
: this(serializer, new AsyncMessageTypeResolverWrapper(typeResolver))
{
}

/// <summary>
/// Initializes a new instance of the <see cref="SerializerProducerMiddleware"/> class.
/// </summary>
/// <param name="serializer">Instance of <see cref="ISerializer"/></param>
/// <param name="typeResolver">Instance of <see cref="IAsyncMessageTypeResolver"/></param>
public SerializerProducerMiddleware(
ISerializer serializer,
IAsyncMessageTypeResolver typeResolver)
{
this.serializer = serializer;
this.typeResolver = typeResolver;
Expand Down
6 changes: 4 additions & 2 deletions src/KafkaFlow.Serializer/SingleMessageTypeResolver.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

/// <summary>
/// The message type resolver to be used when all messages are the same type
Expand All @@ -19,12 +20,13 @@ public SingleMessageTypeResolver(Type messageType)
}

/// <inheritdoc/>
public Type OnConsume(IMessageContext context) => this.messageType;
public ValueTask<Type> OnConsumeAsync(IMessageContext context) => new ValueTask<Type>(this.messageType);

/// <inheritdoc/>
public void OnProduce(IMessageContext context)
public ValueTask OnProduceAsync(IMessageContext context)
{
// Do nothing
return default(ValueTask);
}
}
}
Loading

0 comments on commit da43538

Please sign in to comment.