diff --git a/.github/workflows/deploy-website.yml b/.github/workflows/deploy-website.yml
index 129761e90..7fa32abe6 100644
--- a/.github/workflows/deploy-website.yml
+++ b/.github/workflows/deploy-website.yml
@@ -36,12 +36,6 @@ jobs:
- run: xmldocmd-docusaurus ./drop/KafkaFlow.Admin.dll website/docs/reference/KafkaFlow.Admin --type-folders
shell: bash
- - run: xmldocmd-docusaurus ./drop/KafkaFlow.BatchConsume.dll website/docs/reference/KafkaFlow.BatchConsume --type-folders
- shell: bash
-
- - run: xmldocmd-docusaurus ./drop/KafkaFlow.Compressor.dll website/docs/reference/KafkaFlow.Compressor --type-folders
- shell: bash
-
- run: xmldocmd-docusaurus ./drop/KafkaFlow.Extensions.Hosting.dll website/docs/reference/KafkaFlow.Extensions.Hosting --type-folders
shell: bash
@@ -57,9 +51,6 @@ jobs:
- run: xmldocmd-docusaurus ./drop/KafkaFlow.SchemaRegistry.dll website/docs/reference/KafkaFlow.SchemaRegistry --type-folders
shell: bash
- - run: xmldocmd-docusaurus ./drop/KafkaFlow.Serializer.dll website/docs/reference/KafkaFlow.Serializer --type-folders
- shell: bash
-
- run: xmldocmd-docusaurus ./drop/KafkaFlow.Serializer.JsonCore.dll website/docs/reference/KafkaFlow.Serializer.JsonCore --type-folders
shell: bash
@@ -78,9 +69,6 @@ jobs:
- run: xmldocmd-docusaurus ./drop/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.dll website/docs/reference/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf --type-folders
shell: bash
- - run: xmldocmd-docusaurus ./drop/KafkaFlow.TypedHandler.dll website/docs/reference/KafkaFlow.TypedHandler --type-folders
- shell: bash
-
- run: xmldocmd-docusaurus ./drop/KafkaFlow.Unity.dll website/docs/reference/KafkaFlow.Unity --type-folders
shell: bash
diff --git a/.github/workflows/test-deploy-website.yml b/.github/workflows/test-deploy-website.yml
index b2b49043d..04584aec9 100644
--- a/.github/workflows/test-deploy-website.yml
+++ b/.github/workflows/test-deploy-website.yml
@@ -30,12 +30,6 @@ jobs:
- run: xmldocmd-docusaurus ./drop/KafkaFlow.Admin.dll website/docs/reference/KafkaFlow.Admin --type-folders
shell: bash
- - run: xmldocmd-docusaurus ./drop/KafkaFlow.BatchConsume.dll website/docs/reference/KafkaFlow.BatchConsume --type-folders
- shell: bash
-
- - run: xmldocmd-docusaurus ./drop/KafkaFlow.Compressor.dll website/docs/reference/KafkaFlow.Compressor --type-folders
- shell: bash
-
- run: xmldocmd-docusaurus ./drop/KafkaFlow.Extensions.Hosting.dll website/docs/reference/KafkaFlow.Extensions.Hosting --type-folders
shell: bash
@@ -51,9 +45,6 @@ jobs:
- run: xmldocmd-docusaurus ./drop/KafkaFlow.SchemaRegistry.dll website/docs/reference/KafkaFlow.SchemaRegistry --type-folders
shell: bash
- - run: xmldocmd-docusaurus ./drop/KafkaFlow.Serializer.dll website/docs/reference/KafkaFlow.Serializer --type-folders
- shell: bash
-
- run: xmldocmd-docusaurus ./drop/KafkaFlow.Serializer.JsonCore.dll website/docs/reference/KafkaFlow.Serializer.JsonCore --type-folders
shell: bash
@@ -72,9 +63,6 @@ jobs:
- run: xmldocmd-docusaurus ./drop/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.dll website/docs/reference/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf --type-folders
shell: bash
- - run: xmldocmd-docusaurus ./drop/KafkaFlow.TypedHandler.dll website/docs/reference/KafkaFlow.TypedHandler --type-folders
- shell: bash
-
- run: xmldocmd-docusaurus ./drop/KafkaFlow.Unity.dll website/docs/reference/KafkaFlow.Unity --type-folders
shell: bash
diff --git a/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj b/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj
index f51dbe58e..6336785ab 100644
--- a/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj
+++ b/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj
@@ -22,11 +22,9 @@
-
-
diff --git a/samples/KafkaFlow.Sample.BatchOperations/PrintConsoleMiddleware.cs b/samples/KafkaFlow.Sample.BatchOperations/PrintConsoleMiddleware.cs
index 317098b43..438689b7a 100644
--- a/samples/KafkaFlow.Sample.BatchOperations/PrintConsoleMiddleware.cs
+++ b/samples/KafkaFlow.Sample.BatchOperations/PrintConsoleMiddleware.cs
@@ -1,7 +1,7 @@
using System;
using System.Linq;
using System.Threading.Tasks;
-using KafkaFlow.BatchConsume;
+using KafkaFlow.Batching;
namespace KafkaFlow.Sample.BatchOperations;
diff --git a/samples/KafkaFlow.Sample.BatchOperations/Program.cs b/samples/KafkaFlow.Sample.BatchOperations/Program.cs
index 1515286ed..65fc08fc2 100644
--- a/samples/KafkaFlow.Sample.BatchOperations/Program.cs
+++ b/samples/KafkaFlow.Sample.BatchOperations/Program.cs
@@ -1,7 +1,6 @@
using System;
using System.Linq;
using KafkaFlow;
-using KafkaFlow.BatchConsume;
using KafkaFlow.Producers;
using KafkaFlow.Sample.BatchOperations;
using KafkaFlow.Serializer;
@@ -35,8 +34,8 @@
.WithWorkersCount(1)
.AddMiddlewares(
middlewares => middlewares
- .AddSerializer()
- .BatchConsume(10, TimeSpan.FromSeconds(10))
+ .AddDeserializer()
+ .AddBatching(10, TimeSpan.FromSeconds(10))
.Add()
)
)
diff --git a/samples/KafkaFlow.Sample.ConsumerThrottling/KafkaFlow.Sample.ConsumerThrottling.csproj b/samples/KafkaFlow.Sample.ConsumerThrottling/KafkaFlow.Sample.ConsumerThrottling.csproj
index 25d305383..ba75e35c2 100644
--- a/samples/KafkaFlow.Sample.ConsumerThrottling/KafkaFlow.Sample.ConsumerThrottling.csproj
+++ b/samples/KafkaFlow.Sample.ConsumerThrottling/KafkaFlow.Sample.ConsumerThrottling.csproj
@@ -22,8 +22,6 @@
-
-
diff --git a/samples/KafkaFlow.Sample.ConsumerThrottling/Program.cs b/samples/KafkaFlow.Sample.ConsumerThrottling/Program.cs
index 3e6699c70..d6396e9dc 100644
--- a/samples/KafkaFlow.Sample.ConsumerThrottling/Program.cs
+++ b/samples/KafkaFlow.Sample.ConsumerThrottling/Program.cs
@@ -48,7 +48,7 @@ The ThrottleConsumer mechanism works by checking the lag of "consumerA" every 1
.WithWorkersCount(1)
.AddMiddlewares(
middlewares => middlewares
- .AddSerializer()
+ .AddDeserializer()
.Add()
)
)
@@ -68,7 +68,7 @@ The ThrottleConsumer mechanism works by checking the lag of "consumerA" every 1
.AddAction(a => a.AboveThreshold(10).ApplyDelay(1_000))
.AddAction(a => a.AboveThreshold(20).ApplyDelay(5_000))
.AddAction(a => a.AboveThreshold(30).ApplyDelay(10_000)))
- .AddSerializer()
+ .AddDeserializer()
.Add()
)
)
diff --git a/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj b/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj
index e4a1914e1..20946db23 100644
--- a/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj
+++ b/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj
@@ -26,7 +26,6 @@
-
diff --git a/samples/KafkaFlow.Sample.FlowControl/Program.cs b/samples/KafkaFlow.Sample.FlowControl/Program.cs
index 87a783ce1..26ed362fd 100644
--- a/samples/KafkaFlow.Sample.FlowControl/Program.cs
+++ b/samples/KafkaFlow.Sample.FlowControl/Program.cs
@@ -35,7 +35,7 @@
.WithWorkersCount(1)
.AddMiddlewares(
m => m
- .AddSingleTypeSerializer()
+ .AddSingleTypeDeserializer()
.Add()
)
);
diff --git a/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj b/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj
index f6f6d1e42..7a55c9fd7 100644
--- a/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj
+++ b/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj
@@ -14,8 +14,6 @@
-
-
diff --git a/samples/KafkaFlow.Sample.PauseConsumerOnError/MessageHandler.cs b/samples/KafkaFlow.Sample.PauseConsumerOnError/MessageHandler.cs
index 114dae019..77b597265 100644
--- a/samples/KafkaFlow.Sample.PauseConsumerOnError/MessageHandler.cs
+++ b/samples/KafkaFlow.Sample.PauseConsumerOnError/MessageHandler.cs
@@ -1,4 +1,4 @@
-using KafkaFlow.TypedHandler;
+using KafkaFlow.Middlewares.TypedHandler;
namespace KafkaFlow.Sample.PauseConsumerOnError;
diff --git a/samples/KafkaFlow.Sample.PauseConsumerOnError/Program.cs b/samples/KafkaFlow.Sample.PauseConsumerOnError/Program.cs
index 478aa838b..35a2ce5b6 100644
--- a/samples/KafkaFlow.Sample.PauseConsumerOnError/Program.cs
+++ b/samples/KafkaFlow.Sample.PauseConsumerOnError/Program.cs
@@ -2,7 +2,6 @@
using KafkaFlow.Producers;
using KafkaFlow.Sample.PauseConsumerOnError;
using KafkaFlow.Serializer;
-using KafkaFlow.TypedHandler;
using Microsoft.Extensions.DependencyInjection;
var services = new ServiceCollection();
@@ -33,7 +32,7 @@
middlewares =>
middlewares
.Add()
- .AddSerializer()
+ .AddDeserializer()
.AddTypedHandlers(h => h.AddHandler())
)
)
diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler.cs b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler.cs
index f7f70e702..0893f1225 100644
--- a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler.cs
+++ b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler.cs
@@ -1,20 +1,21 @@
-namespace KafkaFlow.Sample.SchemaRegistry.Handlers;
-
-using System;
-using System.Threading.Tasks;
-using global::SchemaRegistry;
-using TypedHandler;
-
-public class AvroMessageHandler : IMessageHandler
+namespace KafkaFlow.Sample.SchemaRegistry.Handlers
{
- public Task Handle(IMessageContext context, AvroLogMessage message)
+ using System;
+ using System.Threading.Tasks;
+ using KafkaFlow.Middlewares.TypedHandler;
+ using global::SchemaRegistry;
+
+ public class AvroMessageHandler : IMessageHandler
{
- Console.WriteLine(
- "Partition: {0} | Offset: {1} | Message: {2} | Avro",
- context.ConsumerContext.Partition,
- context.ConsumerContext.Offset,
- message.Severity.ToString());
+ public Task Handle(IMessageContext context, AvroLogMessage message)
+ {
+ Console.WriteLine(
+ "Partition: {0} | Offset: {1} | Message: {2} | Avro",
+ context.ConsumerContext.Partition,
+ context.ConsumerContext.Offset,
+ message.Severity.ToString());
- return Task.CompletedTask;
+ return Task.CompletedTask;
+ }
}
}
\ No newline at end of file
diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler2.cs b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler2.cs
index ab35a60e3..65660f5f3 100644
--- a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler2.cs
+++ b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler2.cs
@@ -3,7 +3,7 @@
using System;
using System.Threading.Tasks;
using global::SchemaRegistry;
-using TypedHandler;
+using KafkaFlow.Middlewares.TypedHandler;
public class AvroMessageHandler2 : IMessageHandler
{
diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/JsonMessageHandler.cs b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/JsonMessageHandler.cs
index f17b98a8b..39b446434 100644
--- a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/JsonMessageHandler.cs
+++ b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/JsonMessageHandler.cs
@@ -3,7 +3,7 @@
using System;
using System.Threading.Tasks;
using global::SchemaRegistry;
-using TypedHandler;
+using KafkaFlow.Middlewares.TypedHandler;
public class JsonMessageHandler : IMessageHandler
{
diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/ProtobufMessageHandler.cs b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/ProtobufMessageHandler.cs
index aee7c1cd9..e1158c9c8 100644
--- a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/ProtobufMessageHandler.cs
+++ b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/ProtobufMessageHandler.cs
@@ -1,7 +1,7 @@
using System;
using System.Threading.Tasks;
-using KafkaFlow.TypedHandler;
+using KafkaFlow.Middlewares.TypedHandler;
using SchemaRegistry;
namespace KafkaFlow.Sample.SchemaRegistry.Handlers;
diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj b/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj
index 220139cdf..ddef38718 100644
--- a/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj
+++ b/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj
@@ -24,8 +24,6 @@
-
-
diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs b/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs
index bc539818d..926879042 100644
--- a/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs
+++ b/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs
@@ -5,7 +5,6 @@
using KafkaFlow;
using KafkaFlow.Producers;
using KafkaFlow.Sample.SchemaRegistry.Handlers;
-using KafkaFlow.TypedHandler;
using Microsoft.Extensions.DependencyInjection;
using SchemaRegistry;
@@ -74,7 +73,7 @@
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
- .AddSchemaRegistryAvroSerializer()
+ .AddSchemaRegistryAvroDeserializer()
.AddTypedHandlers(
handlers => handlers
.AddHandler()
@@ -103,7 +102,7 @@
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
- .AddSchemaRegistryProtobufSerializer()
+ .AddSchemaRegistryProtobufDeserializer()
.AddTypedHandlers(handlers => handlers.AddHandler())
)
)
diff --git a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj
index dcf23e1b3..a6743af44 100644
--- a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj
+++ b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj
@@ -19,13 +19,10 @@
-
-
-
diff --git a/samples/KafkaFlow.Sample/PrintConsoleHandler.cs b/samples/KafkaFlow.Sample/PrintConsoleHandler.cs
index 11276440c..57a1a48b8 100644
--- a/samples/KafkaFlow.Sample/PrintConsoleHandler.cs
+++ b/samples/KafkaFlow.Sample/PrintConsoleHandler.cs
@@ -1,6 +1,6 @@
using System;
using System.Threading.Tasks;
-using KafkaFlow.TypedHandler;
+using KafkaFlow.Middlewares.TypedHandler;
namespace KafkaFlow.Sample;
diff --git a/samples/KafkaFlow.Sample/Program.cs b/samples/KafkaFlow.Sample/Program.cs
index 874962de1..b0be4f2c7 100644
--- a/samples/KafkaFlow.Sample/Program.cs
+++ b/samples/KafkaFlow.Sample/Program.cs
@@ -4,7 +4,6 @@
using KafkaFlow.Producers;
using KafkaFlow.Sample;
using KafkaFlow.Serializer;
-using KafkaFlow.TypedHandler;
using Microsoft.Extensions.DependencyInjection;
var services = new ServiceCollection();
@@ -33,7 +32,7 @@
.WithWorkersCount(3)
.AddMiddlewares(
middlewares => middlewares
- .AddSerializer()
+ .AddDeserializer()
.AddTypedHandlers(h => h.AddHandler())
)
)
diff --git a/src/KafkaFlow.Abstractions/IMessageCompressor.cs b/src/KafkaFlow.Abstractions/ICompressor.cs
similarity index 55%
rename from src/KafkaFlow.Abstractions/IMessageCompressor.cs
rename to src/KafkaFlow.Abstractions/ICompressor.cs
index 53fffb40f..8b1d6a1b5 100644
--- a/src/KafkaFlow.Abstractions/IMessageCompressor.cs
+++ b/src/KafkaFlow.Abstractions/ICompressor.cs
@@ -3,7 +3,7 @@ namespace KafkaFlow
///
/// Used to create a message compressor
///
- public interface IMessageCompressor
+ public interface ICompressor
{
///
/// Compress the given message
@@ -11,12 +11,5 @@ public interface IMessageCompressor
/// The message to be compressed
/// The compressed message
byte[] Compress(byte[] message);
-
- ///
- /// Decompress the given message
- ///
- /// The message to be decompressed
- /// The decompressed message
- byte[] Decompress(byte[] message);
}
}
diff --git a/src/KafkaFlow.Abstractions/IDecompressor.cs b/src/KafkaFlow.Abstractions/IDecompressor.cs
new file mode 100644
index 000000000..69e140ffb
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/IDecompressor.cs
@@ -0,0 +1,15 @@
+namespace KafkaFlow
+{
+ ///
+ /// Used to create a message decompressor
+ ///
+ public interface IDecompressor
+ {
+ ///
+ /// Decompress the given message
+ ///
+ /// The message to be decompressed
+ /// The decompressed message
+ byte[] Decompress(byte[] message);
+ }
+}
diff --git a/src/KafkaFlow.Abstractions/IDeserializer.cs b/src/KafkaFlow.Abstractions/IDeserializer.cs
new file mode 100644
index 000000000..0ecfd2c86
--- /dev/null
+++ b/src/KafkaFlow.Abstractions/IDeserializer.cs
@@ -0,0 +1,21 @@
+namespace KafkaFlow
+{
+ using System;
+ using System.IO;
+ using System.Threading.Tasks;
+
+ ///
+ /// Used to implement a message serializer
+ ///
+ public interface IDeserializer
+ {
+ ///
+ /// Deserializes the given message
+ ///
+ /// A stream to read the data to be deserialized
+ /// The type to be created
+ /// An object containing metadata
+ /// The deserialized message
+ Task