Skip to content

Commit

Permalink
feat!: async support on message type and schema registry resolvers
Browse files Browse the repository at this point in the history
  • Loading branch information
jose-sousa-8 committed Sep 15, 2023
1 parent a30f81d commit e744fea
Show file tree
Hide file tree
Showing 25 changed files with 399 additions and 138 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace KafkaFlow
{
using System.Threading.Tasks;

/// <summary>
/// An interface to implement a type name resolver to messages serialized with schema registry serializers
/// </summary>
Expand All @@ -10,6 +12,6 @@ public interface ISchemaRegistryTypeNameResolver
/// </summary>
/// <param name="schemaId">Identifier of the schema</param>
/// <returns></returns>
string Resolve(int schemaId);
Task<string> ResolveAsync(int schemaId);
}
}
17 changes: 4 additions & 13 deletions src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,25 @@ namespace KafkaFlow
/// <summary>
/// The message type resolver to be used with schema registry serializers
/// </summary>
public class SchemaRegistryTypeResolver : IAsyncMessageTypeResolver
public class SchemaRegistryTypeResolver : IMessageTypeResolver
{
private static readonly ConcurrentDictionary<int, Type> Types = new();

private static readonly SemaphoreSlim Semaphore = new(1, 1);

private readonly IAsyncSchemaRegistryTypeNameResolver typeNameResolver;
private readonly ISchemaRegistryTypeNameResolver typeNameResolver;

/// <summary>
/// Initializes a new instance of the <see cref="SchemaRegistryTypeResolver"/> class.
/// </summary>
/// <param name="typeNameResolver">A instance of the <see cref="ISchemaRegistryTypeNameResolver"/> interface.</param>
public SchemaRegistryTypeResolver(ISchemaRegistryTypeNameResolver typeNameResolver)
: this(new AsyncSchemaRegistryTypeNameResolverWrapper(typeNameResolver))
{
}

/// <summary>
/// Initializes a new instance of the <see cref="SchemaRegistryTypeResolver"/> class.
/// </summary>
/// <param name="typeNameResolver">A instance of the <see cref="ISchemaRegistryTypeNameResolver"/> interface.</param>
public SchemaRegistryTypeResolver(IAsyncSchemaRegistryTypeNameResolver typeNameResolver)
{
this.typeNameResolver = typeNameResolver;
}

/// <inheritdoc />
public async Task<Type> OnConsumeAsync(IMessageContext context)
public async ValueTask<Type> OnConsumeAsync(IMessageContext context)
{
var schemaId = BinaryPrimitives.ReadInt32BigEndian(
((byte[]) context.Message.Value).AsSpan().Slice(1, 4));
Expand Down Expand Up @@ -70,6 +61,6 @@ public async Task<Type> OnConsumeAsync(IMessageContext context)
}

/// <inheritdoc />
public Task OnProduceAsync(IMessageContext context) => Task.CompletedTask;
public ValueTask OnProduceAsync(IMessageContext context) => default(ValueTask);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")]
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
namespace KafkaFlow
namespace KafkaFlow.Serializer.SchemaRegistry
{
using System.Threading.Tasks;
using Confluent.SchemaRegistry;
using Newtonsoft.Json;

internal class ConfluentAvroTypeNameResolver : IAsyncSchemaRegistryTypeNameResolver
internal class ConfluentAvroTypeNameResolver : ISchemaRegistryTypeNameResolver
{
private readonly ISchemaRegistryClient client;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")]
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace KafkaFlow
using Google.Protobuf;
using Google.Protobuf.Reflection;

internal class ConfluentProtobufTypeNameResolver : IAsyncSchemaRegistryTypeNameResolver
internal class ConfluentProtobufTypeNameResolver : ISchemaRegistryTypeNameResolver
{
private readonly ISchemaRegistryClient client;

Expand Down
21 changes: 0 additions & 21 deletions src/KafkaFlow.Serializer/AsyncMessageTypeResolverWrapper.cs

This file was deleted.

13 changes: 8 additions & 5 deletions src/KafkaFlow.Serializer/DefaultTypeResolver.cs
Original file line number Diff line number Diff line change
@@ -1,32 +1,35 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

internal class DefaultTypeResolver : IMessageTypeResolver
{
private const string MessageType = "Message-Type";

public Type OnConsume(IMessageContext context)
public ValueTask<Type> OnConsumeAsync(IMessageContext context)
{
var typeName = context.Headers.GetString(MessageType);

return typeName is null ?
null :
Type.GetType(typeName);
new ValueTask<Type>((Type) null) :
new ValueTask<Type>(Type.GetType(typeName));
}

public void OnProduce(IMessageContext context)
public ValueTask OnProduceAsync(IMessageContext context)
{
if (context.Message.Value is null)
{
return;
return default(ValueTask);
}

var messageType = context.Message.Value.GetType();

context.Headers.SetString(
MessageType,
$"{messageType.FullName}, {messageType.Assembly.GetName().Name}");

return default(ValueTask);
}
}
}
24 changes: 0 additions & 24 deletions src/KafkaFlow.Serializer/IAsyncMessageTypeResolver.cs

This file was deleted.

5 changes: 3 additions & 2 deletions src/KafkaFlow.Serializer/IMessageTypeResolver.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

/// <summary>
/// Used by the serializer middleware to resolve the type when consuming and store it when producing
Expand All @@ -12,12 +13,12 @@ public interface IMessageTypeResolver
/// </summary>
/// <param name="context">The <see cref="IMessageContext"/> containing the message and the metadata</param>
/// <returns></returns>
Type OnConsume(IMessageContext context);
ValueTask<Type> OnConsumeAsync(IMessageContext context);

/// <summary>
/// Stores the message type somewhere when producing
/// </summary>
/// <param name="context">The <see cref="IMessageContext"/> containing the message and the metadata</param>
void OnProduce(IMessageContext context);
ValueTask OnProduceAsync(IMessageContext context);
}
}
1 change: 1 addition & 0 deletions src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.1.3" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
</ItemGroup>

</Project>
15 changes: 1 addition & 14 deletions src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
public class SerializerConsumerMiddleware : IMessageMiddleware
{
private readonly ISerializer serializer;

private readonly IAsyncMessageTypeResolver typeResolver;
private readonly IMessageTypeResolver typeResolver;

/// <summary>
/// Initializes a new instance of the <see cref="SerializerConsumerMiddleware"/> class.
Expand All @@ -21,18 +20,6 @@ public class SerializerConsumerMiddleware : IMessageMiddleware
public SerializerConsumerMiddleware(
ISerializer serializer,
IMessageTypeResolver typeResolver)
: this(serializer, new AsyncMessageTypeResolverWrapper(typeResolver))
{
}

/// <summary>
/// Initializes a new instance of the <see cref="SerializerConsumerMiddleware"/> class.
/// </summary>
/// <param name="serializer">Instance of <see cref="ISerializer"/></param>
/// <param name="typeResolver">Instance of <see cref="IAsyncMessageTypeResolver"/></param>
public SerializerConsumerMiddleware(
ISerializer serializer,
IAsyncMessageTypeResolver typeResolver)
{
this.serializer = serializer;
this.typeResolver = typeResolver;
Expand Down
14 changes: 1 addition & 13 deletions src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class SerializerProducerMiddleware : IMessageMiddleware

private readonly ISerializer serializer;

private readonly IAsyncMessageTypeResolver typeResolver;
private readonly IMessageTypeResolver typeResolver;

/// <summary>
/// Initializes a new instance of the <see cref="SerializerProducerMiddleware"/> class.
Expand All @@ -22,18 +22,6 @@ public class SerializerProducerMiddleware : IMessageMiddleware
public SerializerProducerMiddleware(
ISerializer serializer,
IMessageTypeResolver typeResolver)
: this(serializer, new AsyncMessageTypeResolverWrapper(typeResolver))
{
}

/// <summary>
/// Initializes a new instance of the <see cref="SerializerProducerMiddleware"/> class.
/// </summary>
/// <param name="serializer">Instance of <see cref="ISerializer"/></param>
/// <param name="typeResolver">Instance of <see cref="IAsyncMessageTypeResolver"/></param>
public SerializerProducerMiddleware(
ISerializer serializer,
IAsyncMessageTypeResolver typeResolver)
{
this.serializer = serializer;
this.typeResolver = typeResolver;
Expand Down
6 changes: 4 additions & 2 deletions src/KafkaFlow.Serializer/SingleMessageTypeResolver.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow
{
using System;
using System.Threading.Tasks;

/// <summary>
/// The message type resolver to be used when all messages are the same type
Expand All @@ -19,12 +20,13 @@ public SingleMessageTypeResolver(Type messageType)
}

/// <inheritdoc/>
public Type OnConsume(IMessageContext context) => this.messageType;
public ValueTask<Type> OnConsumeAsync(IMessageContext context) => new ValueTask<Type>(this.messageType);

/// <inheritdoc/>
public void OnProduce(IMessageContext context)
public ValueTask OnProduceAsync(IMessageContext context)
{
// Do nothing
return default(ValueTask);
}
}
}
Loading

0 comments on commit e744fea

Please sign in to comment.