-
Notifications
You must be signed in to change notification settings - Fork 119
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat!: async support on message type and schema registry resolvers
- Loading branch information
1 parent
4e37f85
commit 0057d79
Showing
25 changed files
with
399 additions
and
138 deletions.
There are no files selected for viewing
14 changes: 0 additions & 14 deletions
14
src/KafkaFlow.SchemaRegistry/AsyncSchemaRegistryTypeNameResolverWrapper.cs
This file was deleted.
Oops, something went wrong.
17 changes: 0 additions & 17 deletions
17
src/KafkaFlow.SchemaRegistry/IAsyncSchemaRegistryTypeNameResolver.cs
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
3 changes: 3 additions & 0 deletions
3
src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/AssemblyInfo.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
using System.Runtime.CompilerServices; | ||
|
||
[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")] |
4 changes: 2 additions & 2 deletions
4
src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
3 changes: 3 additions & 0 deletions
3
src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
using System.Runtime.CompilerServices; | ||
|
||
[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
21 changes: 0 additions & 21 deletions
21
src/KafkaFlow.Serializer/AsyncMessageTypeResolverWrapper.cs
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,32 +1,35 @@ | ||
namespace KafkaFlow | ||
{ | ||
using System; | ||
using System.Threading.Tasks; | ||
|
||
internal class DefaultTypeResolver : IMessageTypeResolver | ||
{ | ||
private const string MessageType = "Message-Type"; | ||
|
||
public Type OnConsume(IMessageContext context) | ||
public ValueTask<Type> OnConsumeAsync(IMessageContext context) | ||
{ | ||
var typeName = context.Headers.GetString(MessageType); | ||
|
||
return typeName is null ? | ||
null : | ||
Type.GetType(typeName); | ||
new ValueTask<Type>((Type) null) : | ||
new ValueTask<Type>(Type.GetType(typeName)); | ||
} | ||
|
||
public void OnProduce(IMessageContext context) | ||
public ValueTask OnProduceAsync(IMessageContext context) | ||
{ | ||
if (context.Message.Value is null) | ||
{ | ||
return; | ||
return default(ValueTask); | ||
} | ||
|
||
var messageType = context.Message.Value.GetType(); | ||
|
||
context.Headers.SetString( | ||
MessageType, | ||
$"{messageType.FullName}, {messageType.Assembly.GetName().Name}"); | ||
|
||
return default(ValueTask); | ||
} | ||
} | ||
} |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.