Skip to content

Commit

Permalink
feat: update message type resolver methods to return ValueTask instea…
Browse files Browse the repository at this point in the history
…d of Task
  • Loading branch information
jose-sousa committed Sep 15, 2023
1 parent 684338e commit 332fbd8
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 13 deletions.
4 changes: 2 additions & 2 deletions src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public SchemaRegistryTypeResolver(ISchemaRegistryTypeNameResolver typeNameResolv
}

/// <inheritdoc />
public async Task<Type> OnConsumeAsync(IMessageContext context)
public async ValueTask<Type> OnConsumeAsync(IMessageContext context)
{
var schemaId = BinaryPrimitives.ReadInt32BigEndian(
((byte[]) context.Message.Value).AsSpan().Slice(1, 4));
Expand Down Expand Up @@ -61,6 +61,6 @@ public async Task<Type> OnConsumeAsync(IMessageContext context)
}

/// <inheritdoc />
public Task OnProduceAsync(IMessageContext context) => Task.CompletedTask;
public ValueTask OnProduceAsync(IMessageContext context) => default(ValueTask);
}
}
12 changes: 7 additions & 5 deletions src/KafkaFlow.Serializer/DefaultTypeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ internal class DefaultTypeResolver : IMessageTypeResolver
{
private const string MessageType = "Message-Type";

public Task<Type> OnConsumeAsync(IMessageContext context)
public ValueTask<Type> OnConsumeAsync(IMessageContext context)
{
var typeName = context.Headers.GetString(MessageType);

return typeName is null ? null : Task.FromResult(Type.GetType(typeName));
return typeName is null ?
new ValueTask<Type>((Type) null) :
new ValueTask<Type>(Type.GetType(typeName));
}

public Task OnProduceAsync(IMessageContext context)
public ValueTask OnProduceAsync(IMessageContext context)
{
if (context.Message.Value is null)
{
return Task.CompletedTask;
return default(ValueTask);
}

var messageType = context.Message.Value.GetType();
Expand All @@ -27,7 +29,7 @@ public Task OnProduceAsync(IMessageContext context)
MessageType,
$"{messageType.FullName}, {messageType.Assembly.GetName().Name}");

return Task.CompletedTask;
return default(ValueTask);
}
}
}
4 changes: 2 additions & 2 deletions src/KafkaFlow.Serializer/IMessageTypeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ public interface IMessageTypeResolver
/// </summary>
/// <param name="context">The <see cref="IMessageContext"/> containing the message and the metadata</param>
/// <returns></returns>
Task<Type> OnConsumeAsync(IMessageContext context);
ValueTask<Type> OnConsumeAsync(IMessageContext context);

/// <summary>
/// Stores the message type somewhere when producing
/// </summary>
/// <param name="context">The <see cref="IMessageContext"/> containing the message and the metadata</param>
Task OnProduceAsync(IMessageContext context);
ValueTask OnProduceAsync(IMessageContext context);
}
}
1 change: 1 addition & 0 deletions src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="2.1.3" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
</ItemGroup>

</Project>
6 changes: 3 additions & 3 deletions src/KafkaFlow.Serializer/SingleMessageTypeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ public SingleMessageTypeResolver(Type messageType)
}

/// <inheritdoc/>
public Task<Type> OnConsumeAsync(IMessageContext context) => Task.FromResult(this.messageType);
public ValueTask<Type> OnConsumeAsync(IMessageContext context) => new ValueTask<Type>(this.messageType);

/// <inheritdoc/>
public Task OnProduceAsync(IMessageContext context)
public ValueTask OnProduceAsync(IMessageContext context)
{
// Do nothing
return Task.CompletedTask;
return default(ValueTask);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ namespace KafkaFlow.UnitTests.Middlewares.Serialization
using System;
using System.Threading.Tasks;
using Confluent.SchemaRegistry;

using FluentAssertions;

using Google.Protobuf;
using KafkaFlow.Serializer.SchemaRegistry;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand Down Expand Up @@ -40,7 +43,7 @@ public async Task ResolveAsync_ValidProtobufObject_ReturnsProtoFields()
var protoFields = await this.schemaRegistryTypeResolver.ResolveAsync(schemaId);

// Assert
// TODO fix returning empty
protoFields.Should().NotBeNull();
}
}
}

0 comments on commit 332fbd8

Please sign in to comment.