diff --git a/README.md b/README.md
index 042efef9..303584eb 100644
--- a/README.md
+++ b/README.md
@@ -116,10 +116,10 @@ static async System.Threading.Tasks.Task Main(string[] args)
 | Sliding window                                               |              X                     |                        | No plan for now                            |
 | Session window                                               |              X                     |                        | No plan for now                            |
 | Cache                                                        |              X                     |         X              | EA 1.6.0                                   |
-| Suppress(..)                                                 |              X                     |                        | No plan for now                            |
+| Suppress(..)                                                 |              X                     |         X              | Planned for 1.7.0                          |
 | Interactive Queries                                          |              X                     |                        | No plan for now                            |
 | State store batch restoring                                  |              X                     |                        | No plan for now                            |
-| Exactly Once (v1 and v2)                                     |              X                     |         X              | EOS V1 supported, EOS V2 not supported yet |
+| Exactly Once v2                                              |              X                     |         X              |                                            |
 
 # Community Support
 
diff --git a/core/Crosscutting/ByteBuffer.cs b/core/Crosscutting/ByteBuffer.cs
index 3deeb3c9..a869ad73 100644
--- a/core/Crosscutting/ByteBuffer.cs
+++ b/core/Crosscutting/ByteBuffer.cs
@@ -88,6 +88,31 @@ public ByteBuffer PutInt(int @int)
 
         #region ReadOperation
 
+        public long GetLong()
+        {
+            var bytes = reader.ReadBytes(sizeof(long));
+            
+            if (BitConverter.IsLittleEndian && bigEndian)
+                bytes = bytes.Reverse().ToArray();
+
+            return BitConverter.ToInt64(bytes, 0);
+        }
+
+        public int GetInt()
+        {
+            var bytes = reader.ReadBytes(sizeof(int));
+            
+            if (BitConverter.IsLittleEndian && bigEndian)
+                bytes = bytes.Reverse().ToArray();
+            
+            return BitConverter.ToInt32(bytes, 0);
+        }
+
+        public byte[] GetBytes(int size)
+        {
+            return reader.ReadBytes(size);
+        }
+        
         public long GetLong(int offset)
         {
             reader.BaseStream.Seek(offset, SeekOrigin.Begin);
@@ -116,6 +141,13 @@ public byte[] GetBytes(int offset, int size)
             return reader.ReadBytes(size);
         }
 
+        /// <summary>
+        /// Read an int size prefix, then that many bytes; a -1 prefix denotes a null array.
+        /// </summary>
+        public byte[] GetNullableSizePrefixedArray()
+        {
+            var size = GetInt();
+            if (size == -1)
+                return null;
+            return GetBytes(size);
+        }
         #endregion
 
         public byte[] ToArray()
diff --git a/core/Metrics/Internal/ProcessorNodeMetrics.cs b/core/Metrics/Internal/ProcessorNodeMetrics.cs
index f97fe5c6..6f000846 100644
--- a/core/Metrics/Internal/ProcessorNodeMetrics.cs
+++ b/core/Metrics/Internal/ProcessorNodeMetrics.cs
@@ -28,7 +28,6 @@ internal class ProcessorNodeMetrics
         internal static readonly string RETRY_AVG_DESCRIPTION = StreamMetricsRegistry.RATE_DESCRIPTION_PREFIX + RETRY_DESCRIPTION;
         internal static readonly string RETRY_MAX_DESCRIPTION = StreamMetricsRegistry.TOTAL_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
         
-        // NOT USE FOR MOMENT
         public static Sensor SuppressionEmitSensor(string threadId,
             TaskId taskId,
             string processorNodeId,
diff --git a/core/ProcessorContext.cs b/core/ProcessorContext.cs
index 35f6cce7..a322390e 100644
--- a/core/ProcessorContext.cs
+++ b/core/ProcessorContext.cs
@@ -24,7 +24,7 @@ public class ProcessorContext
         internal virtual SerDesContext SerDesContext { get; }
         internal virtual IStreamConfig Configuration { get; }
         internal virtual IRecordContext RecordContext { get; private set; }
-        internal IRecordCollector RecordCollector { get; private set; }
+        internal virtual IRecordCollector RecordCollector { get; private set; }
         internal virtual IStateManager States { get; }
         internal virtual bool FollowMetadata { get; set; }
         
diff --git a/core/Processors/IRecordContext.cs b/core/Processors/IRecordContext.cs
index 556a35f7..96a6d2e2 100644
--- a/core/Processors/IRecordContext.cs
+++ b/core/Processors/IRecordContext.cs
@@ -39,5 +39,16 @@ public interface IRecordContext
         /// </summary>
         /// <param name="headers">new headers</param>
         void SetHeaders(Headers headers);
+        
+        /// <summary>
+        /// Estimate of the amount of memory used to store this record context
+        /// </summary>
+        long MemorySizeEstimate { get; }
+
+        /// <summary>
+        /// Serialize all the metadata into a byte array
+        /// </summary>
+        /// <returns>A byte array containing all the serialized metadata fields</returns>
+        byte[] Serialize();
     }
 }
diff --git a/core/Processors/Internal/RecordContext.cs b/core/Processors/Internal/RecordContext.cs
index 4aa76a05..1477051e 100644
--- a/core/Processors/Internal/RecordContext.cs
+++ b/core/Processors/Internal/RecordContext.cs
@@ -1,4 +1,7 @@
-using Confluent.Kafka;
+using System.Collections.Generic;
+using System.Text;
+using Confluent.Kafka;
+using Streamiz.Kafka.Net.Crosscutting;
 
 namespace Streamiz.Kafka.Net.Processors.Internal
 {
@@ -42,5 +45,77 @@ public void SetHeaders(Headers headers)
         {
             Headers = headers;
         }
+
+        public long MemorySizeEstimate
+            => sizeof(int) + // partition
+               sizeof(long) + // offset
+               sizeof(long) + // timestamp
+               Topic.Length + // topic length
+               Headers.GetEstimatedSize(); // headers size
+
+        public byte[] Serialize()
+        {
+            // Encode the topic up front: Topic.Length counts UTF-16 chars and would
+            // under-estimate the buffer size for non-ASCII topic names.
+            var topicBytes = Encoding.UTF8.GetBytes(Topic);
+            
+            var size = sizeof(int) + // partition
+                       sizeof(long) + // offset
+                       sizeof(long) + // timestamp
+                       sizeof(int) + // topic length
+                       topicBytes.Length + // topic
+                       sizeof(int); // number of headers
+            
+            List<(byte[], byte[])> byteHeaders = new();
+            
+            foreach(var header in Headers)
+            {
+                var keyBytes = Encoding.UTF8.GetBytes(header.Key);
+                size += 2 * sizeof(int); // size of key and value
+                size += keyBytes.Length;
+                if (header.GetValueBytes() != null)
+                    size += header.GetValueBytes().Length;
+                byteHeaders.Add((keyBytes, header.GetValueBytes()));
+            }
+
+            var buffer = ByteBuffer.Build(size, true);
+            buffer.PutInt(Partition)
+                .PutLong(Offset)
+                .PutLong(Timestamp)
+                .PutInt(topicBytes.Length)
+                .Put(topicBytes)
+                .PutInt(byteHeaders.Count);
+
+            foreach (var byteHeader in byteHeaders)
+            {
+                buffer.PutInt(byteHeader.Item1.Length).Put(byteHeader.Item1);
+                if (byteHeader.Item2 != null)
+                    buffer.PutInt(byteHeader.Item2.Length).Put(byteHeader.Item2);
+                else
+                    buffer.PutInt(-1);
+            }
+
+            return buffer.ToArray();
+        }
+
+        internal static IRecordContext Deserialize(ByteBuffer buffer)
+        {
+            var partition = buffer.GetInt();
+            var offset = buffer.GetLong();
+            var timestamp = buffer.GetLong();
+            var topic = Encoding.UTF8.GetString(buffer.GetBytes(buffer.GetInt()));
+            var headerCount = buffer.GetInt();
+            Headers headers = new Headers();
+            
+            for (int i = 0; i < headerCount; ++i)
+            {
+                var headerKeyLength = buffer.GetInt();
+                var headerKey = buffer.GetBytes(headerKeyLength);
+                byte[] headerValue = null;
+                var headerValueLength = buffer.GetInt();
+
+                if (headerValueLength >= 0) // -1 marks a null header value; 0 is a valid empty value
+                    headerValue = buffer.GetBytes(headerValueLength);
+                
+                headers.Add(Encoding.UTF8.GetString(headerKey), headerValue);
+            }
+            
+            return new RecordContext(headers, offset, timestamp, partition, topic);
+        } 
     }
 }
diff --git a/core/Processors/KTableSuppressProcessor.cs b/core/Processors/KTableSuppressProcessor.cs
new file mode 100644
index 00000000..3f3bc621
--- /dev/null
+++ b/core/Processors/KTableSuppressProcessor.cs
@@ -0,0 +1,107 @@
+using System;
+using System.Threading;
+using Streamiz.Kafka.Net.Errors;
+using Streamiz.Kafka.Net.Metrics;
+using Streamiz.Kafka.Net.Metrics.Internal;
+using Streamiz.Kafka.Net.SerDes;
+using Streamiz.Kafka.Net.State.Suppress;
+using Streamiz.Kafka.Net.Table;
+using Streamiz.Kafka.Net.Table.Internal;
+
+namespace Streamiz.Kafka.Net.Processors
+{
+    internal class KTableSuppressProcessor<K, V> : AbstractProcessor<K, Change<V>>
+    {
+        private readonly long maxRecords;
+        private readonly long maxBytes;
+        private readonly long suppressionDurationMs;
+        private readonly BUFFER_FULL_STRATEGY bufferFullStrategy;
+        private readonly bool safeToDropTombstone;
+        private readonly string storeName;
+        private readonly Func<ProcessorContext, K, long> timeDefinition;
+        
+        private Sensor suppressionEmitSensor;
+        private long observedStreamTime = -1;
+        private ITimeOrderedKeyValueBuffer<K, V, Change<V>> buffer;
+        private readonly ISerDes<K> keySerdes;
+        private readonly ISerDes<V> valueSerdes;
+
+        private bool OverCapacity => buffer.BufferSize > maxBytes || buffer.NumRecords > maxRecords;
+
+        public KTableSuppressProcessor(Suppressed<K, V> suppressed, string storeName)
+        {
+            maxRecords = suppressed.BufferConfig.MaxRecords;
+            maxBytes = suppressed.BufferConfig.MaxBytes;
+            bufferFullStrategy = suppressed.BufferConfig.BufferFullStrategy;
+            suppressionDurationMs = (long)suppressed.SuppressionTime.TotalMilliseconds;
+            safeToDropTombstone = suppressed.SafeToDropTombstones;
+            this.storeName = storeName;
+            timeDefinition = suppressed.TimeDefinition;
+            keySerdes = suppressed.KeySerdes;
+            valueSerdes = suppressed.ValueSerdes;
+        }
+
+        public override void Init(ProcessorContext context)
+        {
+            base.Init(context);
+            suppressionEmitSensor = ProcessorNodeMetrics.SuppressionEmitSensor(
+                Thread.CurrentThread.Name,
+                context.Id,
+                Name,
+                context.Metrics);
+            
+            buffer = (ITimeOrderedKeyValueBuffer<K, V, Change<V>>)context.GetStateStore(storeName);
+            buffer.SetSerdesIfNull(keySerdes, valueSerdes);
+        }
+
+        public override void Process(K key, Change<V> value)
+        {
+            observedStreamTime = Math.Max(observedStreamTime, Context.Timestamp);
+            Buffer(key, value);
+            EnforceConstraints();
+        }
+
+        private void Buffer(K key, Change<V> value)
+        {
+            var bufferTime = timeDefinition(Context, key);
+            buffer.Put(bufferTime, key, value, Context.RecordContext);
+        }
+
+        private void EnforceConstraints()
+        {
+            long expiryTime = observedStreamTime - suppressionDurationMs;
+            buffer.EvictWhile(() => buffer.MinTimestamp <= expiryTime, Emit);
+
+            if (OverCapacity)
+            {
+                switch (bufferFullStrategy)
+                {
+                    case BUFFER_FULL_STRATEGY.EMIT:
+                        buffer.EvictWhile(() => OverCapacity, Emit);
+                        break;
+                    case BUFFER_FULL_STRATEGY.SHUTDOWN:
+                        throw new StreamsException(
+                            $"{Name} buffer exceeded its max capacity. Currently [{buffer.NumRecords}/{maxRecords}] records and [{buffer.BufferSize}/{maxBytes}] bytes.");
+                }
+            }
+        }
+
+        private void Emit(K key, Change<V> value, IRecordContext context)
+        {
+            if (value.NewValue != null || !safeToDropTombstone)
+            {
+                var currentRecordContext = Context.RecordContext;
+                try
+                {
+                    Context.SetRecordMetaData(context);
+                    Forward(key, value);
+                    suppressionEmitSensor.Record();
+                }
+                finally
+                {
+                    Context.SetRecordMetaData(currentRecordContext);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/State/Cache/CacheEntryValue.cs b/core/State/Cache/CacheEntryValue.cs
index e5d32313..5e2c31f7 100644
--- a/core/State/Cache/CacheEntryValue.cs
+++ b/core/State/Cache/CacheEntryValue.cs
@@ -24,10 +24,6 @@ internal CacheEntryValue(byte[] value, Headers headers, long offset, long timest
 
         public long Size =>
             (Value != null ? Value.LongLength : 0) +
-            sizeof(int) + // partition
-            sizeof(long) + //offset
-            sizeof(long) + // timestamp
-            Context.Topic.Length + // topic length
-            Context.Headers.GetEstimatedSize(); // headers size
+            Context.MemorySizeEstimate;
     }
 }
\ No newline at end of file
diff --git a/core/State/Suppress/ITimeOrderedKeyValueBuffer.cs b/core/State/Suppress/ITimeOrderedKeyValueBuffer.cs
new file mode 100644
index 00000000..a1b1e767
--- /dev/null
+++ b/core/State/Suppress/ITimeOrderedKeyValueBuffer.cs
@@ -0,0 +1,19 @@
+using System;
+using Streamiz.Kafka.Net.Processors;
+using Streamiz.Kafka.Net.SerDes;
+using Streamiz.Kafka.Net.State.Suppress.Internal;
+
+namespace Streamiz.Kafka.Net.State.Suppress
+{
+    internal interface ITimeOrderedKeyValueBuffer<K, V, T> : IStateStore
+    {
+        long NumRecords { get; }
+        long BufferSize { get; }
+        long MinTimestamp { get; }
+        
+        bool Put(long timestamp, K key, T value, IRecordContext recordContext);
+        Maybe<ValueAndTimestamp<V>> PriorValueForBuffered(K key);
+        void EvictWhile(Func<bool> predicate, Action<K, T, IRecordContext> evictHandler);
+        void SetSerdesIfNull(ISerDes<K> contextKeySerdes, ISerDes<V> contextValueSerdes);
+    }
+}
\ No newline at end of file
diff --git a/core/State/Suppress/InMemoryTimeOrderedKeyValueChangeBufferBuilder.cs b/core/State/Suppress/InMemoryTimeOrderedKeyValueChangeBufferBuilder.cs
new file mode 100644
index 00000000..83ce2041
--- /dev/null
+++ b/core/State/Suppress/InMemoryTimeOrderedKeyValueChangeBufferBuilder.cs
@@ -0,0 +1,26 @@
+using Streamiz.Kafka.Net.SerDes;
+using Streamiz.Kafka.Net.State.Suppress.Internal;
+
+namespace Streamiz.Kafka.Net.State.Suppress
+{
+    internal class InMemoryTimeOrderedKeyValueChangeBufferBuilder<K, V>
+        : AbstractStoreBuilder<K, V, InMemoryTimeOrderedKeyValueChangeBuffer<K, V>>
+    {
+        public InMemoryTimeOrderedKeyValueChangeBufferBuilder(string name, ISerDes<K> keySerde, ISerDes<V> valueSerde) 
+            : base(name, keySerde, valueSerde)
+        {
+            WithCachingDisabled(); // explicitly disable the cache
+        }
+
+        // Not used
+        public override bool IsWindowStore => false;
+        public override long RetentionMs => -1;
+        
+        public override InMemoryTimeOrderedKeyValueChangeBuffer<K, V> Build() =>
+            new(
+                Name,
+                LoggingEnabled,
+                keySerdes,
+                valueSerdes);
+    }
+}
\ No newline at end of file
diff --git a/core/State/Suppress/Internal/BufferKey.cs b/core/State/Suppress/Internal/BufferKey.cs
new file mode 100644
index 00000000..bc790daf
--- /dev/null
+++ b/core/State/Suppress/Internal/BufferKey.cs
@@ -0,0 +1,30 @@
+using System;
+using System.Collections.Generic;
+using Streamiz.Kafka.Net.Crosscutting;
+
+namespace Streamiz.Kafka.Net.State.Suppress.Internal
+{
+    internal class BufferKeyComparer : IComparer<BufferKey>
+    {
+        public int Compare(BufferKey x, BufferKey y)
+            => x.CompareTo(y);
+    }
+    
+    internal class BufferKey : IComparable<BufferKey>
+    {
+        public long Time { get; }
+        public Bytes Key { get; }
+
+        public BufferKey(long time, Bytes key)
+        {
+            Time = time;
+            Key = key;
+        }
+
+        public int CompareTo(BufferKey other)
+        {
+            var compared = Time.CompareTo(other.Time);
+            return compared == 0 ? Key.CompareTo(other.Key) : compared;
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/State/Suppress/Internal/BufferValue.cs b/core/State/Suppress/Internal/BufferValue.cs
new file mode 100644
index 00000000..3b535123
--- /dev/null
+++ b/core/State/Suppress/Internal/BufferValue.cs
@@ -0,0 +1,123 @@
+using System;
+using System.Linq;
+using Streamiz.Kafka.Net.Crosscutting;
+using Streamiz.Kafka.Net.Processors;
+
+namespace Streamiz.Kafka.Net.State.Suppress.Internal
+{
+    internal class BufferValue
+    {
+        private const int NULL_VALUE = -1;
+        private const int OLD_PREV_DUPLICATE = -2;
+        
+        internal IRecordContext RecordContext { get; }
+        internal byte[] PriorValue { get; }
+        internal byte[] OldValue { get; }
+        internal byte[] NewValue { get; }
+
+        internal BufferValue(byte[] priorValue, byte[] oldValue, byte[] newValue, IRecordContext recordContext)
+        {
+            OldValue = oldValue;
+            NewValue = newValue;
+            RecordContext = recordContext;
+
+            if (ReferenceEquals(priorValue, oldValue))
+                this.PriorValue = oldValue;
+            else
+                this.PriorValue = priorValue;
+        }
+
+        public override bool Equals(object obj)
+        {
+            // Null-safe array comparison: two nulls are equal, a single null is not
+            bool BytesEqual(byte[] x, byte[] y)
+                => ReferenceEquals(x, y) || (x != null && y != null && x.SequenceEqual(y));
+
+            return obj is BufferValue value &&
+                   BytesEqual(PriorValue, value.PriorValue) &&
+                   BytesEqual(OldValue, value.OldValue) &&
+                   BytesEqual(NewValue, value.NewValue);
+        }
+
+        public override int GetHashCode()
+        {
+            #if NETSTANDARD2_0
+                uint hashCode1 = NewValue != null ? (uint) NewValue.GetHashCode() : 0U;
+                uint hashCode2 = PriorValue != null ? (uint) PriorValue.GetHashCode() : 0U;
+                uint hashCode3 = OldValue != null ? (uint) OldValue.GetHashCode() : 0U;
+                return (int)(hashCode1 ^ hashCode2 ^ hashCode3);
+            #else
+                return HashCode.Combine(NewValue, PriorValue, OldValue);
+            #endif
+        }
+
+        public long MemoryEstimatedSize()
+        {
+            return (PriorValue?.Length ?? 0) +
+                   (OldValue == null || PriorValue == OldValue ? 0 : OldValue.Length) +
+                   (NewValue?.Length ?? 0) +
+                   RecordContext.MemorySizeEstimate;
+        }
+
+        public ByteBuffer Serialize(int endPadding)
+        {
+            void AddValue(ByteBuffer buffer, byte[] value)
+            {
+                if (value == null) {
+                    buffer.PutInt(NULL_VALUE);
+                } else {
+                    buffer.PutInt(value.Length);
+                    buffer.Put(value);
+                }
+            }
+            
+            int sizeOfValueLength = sizeof(Int32);
+
+            int sizeOfPriorValue = PriorValue?.Length ?? 0;
+            int sizeOfOldValue = OldValue == null || PriorValue == OldValue ? 0 : OldValue.Length;
+            int sizeOfNewValue = NewValue?.Length ?? 0;
+
+            byte[] serializedContext = RecordContext.Serialize();
+            
+            var buffer = ByteBuffer.Build(
+                serializedContext.Length +
+                sizeOfValueLength + sizeOfPriorValue +
+                sizeOfValueLength + sizeOfOldValue +
+                sizeOfValueLength + sizeOfNewValue +
+                endPadding, true);
+
+            buffer.Put(serializedContext);
+            AddValue(buffer, PriorValue);
+
+            if (OldValue == null)
+                buffer.PutInt(NULL_VALUE);
+            else if (ReferenceEquals(PriorValue, OldValue))
+                buffer.PutInt(OLD_PREV_DUPLICATE);
+            else
+            {
+                buffer.PutInt(OldValue.Length);
+                buffer.Put(OldValue);
+            }
+            
+            AddValue(buffer, NewValue);
+
+            return buffer;
+        }
+
+        public static BufferValue Deserialize(ByteBuffer buffer)
+        {
+            var context = Processors.Internal.RecordContext.Deserialize(buffer);
+            var priorValue = buffer.GetNullableSizePrefixedArray();
+            byte[] oldValue;
+            
+            var oldValueLength = buffer.GetInt();
+            if (oldValueLength == OLD_PREV_DUPLICATE)
+                oldValue = priorValue;
+            else if (oldValueLength == NULL_VALUE)
+                oldValue = null;
+            else
+                oldValue = buffer.GetBytes(oldValueLength);
+
+            var newValue = buffer.GetNullableSizePrefixedArray();
+            
+            return new BufferValue(priorValue, oldValue, newValue, context);
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/core/State/Suppress/Internal/InMemoryTimeOrderedKeyValueChangeBuffer.cs b/core/State/Suppress/Internal/InMemoryTimeOrderedKeyValueChangeBuffer.cs
new file mode 100644
index 00000000..30a407f0
--- /dev/null
+++ b/core/State/Suppress/Internal/InMemoryTimeOrderedKeyValueChangeBuffer.cs
@@ -0,0 +1,331 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using Confluent.Kafka;
+using Streamiz.Kafka.Net.Crosscutting;
+using Streamiz.Kafka.Net.Errors;
+using Streamiz.Kafka.Net.Metrics;
+using Streamiz.Kafka.Net.Metrics.Internal;
+using Streamiz.Kafka.Net.Processors;
+using Streamiz.Kafka.Net.SerDes;
+using Streamiz.Kafka.Net.Table.Internal;
+
+namespace Streamiz.Kafka.Net.State.Suppress.Internal
+{
+    internal class InMemoryTimeOrderedKeyValueChangeBuffer<K, V> : ITimeOrderedKeyValueBuffer<K, V, Change<V>>
+    {
+        public long NumRecords => index.Count;
+        public long BufferSize { get; private set; } = 0L;
+        public long MinTimestamp { get; private set; } = Int64.MaxValue;
+        public string Name { get; }
+        public bool Persistent => false;
+        public bool IsLocally => true;
+        public bool IsOpen { get; private set; }
+
+        private Sensor bufferSizeSensor;
+        private Sensor bufferCountSensor;
+
+        private const string metricScope = "in-memory-suppress";
+        private string changelogTopic;
+        private ProcessorContext processorContext;
+        private readonly bool loggingEnabled;
+        private ISerDes<K> keySerdes;
+        private ISerDes<V> valueSerdes;
+        private Func<Change<V>, Change<byte[]>> fullChangeSerializer;
+        private Func<Change<byte[]>, Change<V>> fullChangeDeserializer;
+
+        private Dictionary<Bytes, BufferKey> index = new();
+        private SortedDictionary<BufferKey, BufferValue> sortedValues = new(new BufferKeyComparer());
+        private HashSet<Bytes> dirtyKeys = new(new BytesComparer());
+
+        private Func<Bytes, BufferValue, long> ComputeRecordSizeFc => (key, value) =>
+        {
+            long size = 0L;
+            size += sizeof(long); // buffer time
+            size += key.Get.Length;
+            if (value != null)
+                size += value.MemoryEstimatedSize();
+            return size;
+        };
+        
+        public InMemoryTimeOrderedKeyValueChangeBuffer(
+            string storeName, 
+            bool loggingEnabled,
+            ISerDes<K> keySerdes,
+            ISerDes<V> valueSerdes)
+        {
+            Name = storeName;
+            this.loggingEnabled = loggingEnabled;
+            this.keySerdes = keySerdes;
+            this.valueSerdes = valueSerdes;
+
+            fullChangeSerializer = (change) =>
+            {
+                byte[] oldBytes = change.OldValue == null
+                    ? null
+                    : valueSerdes.Serialize(change.OldValue,
+                        new SerializationContext(MessageComponentType.Value, changelogTopic));
+                byte[] newBytes = change.NewValue == null
+                    ? null
+                    : valueSerdes.Serialize(change.NewValue,
+                        new SerializationContext(MessageComponentType.Value, changelogTopic));
+                return new Change<byte[]>(oldBytes, newBytes);
+            };
+
+            fullChangeDeserializer = (change) =>
+            {
+                V oldValue = change.OldValue == null
+                    ? default
+                    : valueSerdes.Deserialize(change.OldValue,
+                        new SerializationContext(MessageComponentType.Value, changelogTopic));
+                V newValue = change.NewValue == null
+                    ? default
+                    : valueSerdes.Deserialize(change.NewValue,
+                        new SerializationContext(MessageComponentType.Value, changelogTopic));
+
+                return new Change<V>(oldValue, newValue);
+            };
+        }
+        
+        public void SetSerdesIfNull(ISerDes<K> contextKeySerdes, ISerDes<V> contextValueSerdes) {
+            keySerdes ??= contextKeySerdes;
+            valueSerdes ??= contextValueSerdes;
+        }
+        
+        public bool Put(long timestamp, K key, Change<V> value, IRecordContext recordContext)
+        {
+            if (value == null)
+                throw new ArgumentNullException(nameof(value));
+            if (recordContext == null)
+                throw new ArgumentNullException(nameof(recordContext));
+            
+            var serializedKey = Bytes.Wrap(keySerdes.Serialize(key,
+                new SerializationContext(MessageComponentType.Key, changelogTopic)));
+
+            var serializedChange = fullChangeSerializer(value);
+            
+            var buffered = GetBuffered(serializedKey);
+            var serializedPriorValue = buffered == null ? serializedChange.OldValue : buffered.PriorValue;
+            
+            CleanPut(timestamp, 
+                serializedKey, 
+                new BufferValue(serializedPriorValue, serializedChange.OldValue, serializedChange.NewValue, recordContext));
+
+            if (loggingEnabled)
+                dirtyKeys.Add(serializedKey);
+            UpdateBufferMetrics();
+            
+            return true;
+        }
+
+        public Maybe<ValueAndTimestamp<V>> PriorValueForBuffered(K key)
+        {
+            var serializedKey = Bytes.Wrap(keySerdes.Serialize(key,
+                new SerializationContext(MessageComponentType.Key, changelogTopic)));
+            
+            if (index.ContainsKey(serializedKey))
+            {
+                var serializedValue = InternalPriorValueForBuffered(serializedKey);
+                var deserializedValue = valueSerdes.Deserialize(serializedValue,
+                    new SerializationContext(MessageComponentType.Value, changelogTopic));
+
+                return Maybe<ValueAndTimestamp<V>>.Defined(ValueAndTimestamp<V>.Make(deserializedValue, -1));
+            }
+            
+            return Maybe<ValueAndTimestamp<V>>.Undefined();
+        }
+
+        public void EvictWhile(Func<bool> predicate, Action<K, Change<V>, IRecordContext> evictHandler)
+        {
+            int evictions = 0;
+            if (predicate())
+            {
+                List<BufferKey> keyToRemove = new();
+
+                using var enumerator = sortedValues.GetEnumerator();
+                bool @continue = enumerator.MoveNext();
+                
+                while (@continue && predicate())
+                {
+                    if (enumerator.Current.Key.Time != MinTimestamp)
+                        throw new IllegalStateException(
+                            $"minTimestamp [{MinTimestamp}] did not match the actual min timestamp {enumerator.Current.Key.Time}");
+                    
+                    K key = keySerdes.Deserialize(enumerator.Current.Key.Key.Get,
+                        new SerializationContext(MessageComponentType.Key, changelogTopic));
+                    BufferValue bufferValue = enumerator.Current.Value;
+                    Change<V> value =
+                        fullChangeDeserializer(new Change<byte[]>(bufferValue.OldValue, bufferValue.NewValue));
+                    
+                    evictHandler(key, value, bufferValue.RecordContext);
+                    keyToRemove.Add(enumerator.Current.Key);
+                    index.Remove(enumerator.Current.Key.Key);
+
+                    if (loggingEnabled)
+                        dirtyKeys.Add(enumerator.Current.Key.Key);
+
+                    BufferSize -= ComputeRecordSizeFc(enumerator.Current.Key.Key, bufferValue);
+                    
+                    ++evictions;
+                    
+                    @continue = enumerator.MoveNext();
+                    MinTimestamp = @continue ? enumerator.Current.Key.Time : long.MaxValue;
+                }
+                
+                if(keyToRemove.Any())
+                    sortedValues.RemoveAll(keyToRemove);
+            }
+            
+            if(evictions > 0)
+                UpdateBufferMetrics();
+        }
+        
+        public void Init(ProcessorContext context, IStateStore root)
+        {
+            keySerdes.Initialize(context.SerDesContext);
+            valueSerdes.Initialize(context.SerDesContext);
+            
+            changelogTopic = context.ChangelogFor(Name);
+            processorContext = context;
+
+            bufferSizeSensor = StateStoreMetrics.SuppressionBufferSizeSensor(
+                context.Id,
+                metricScope,
+                Name,
+                context.Metrics);
+            
+            bufferCountSensor = StateStoreMetrics.SuppressionBufferCountSensor(
+                context.Id,
+                metricScope,
+                Name,
+                context.Metrics);
+            
+            context.Register(root, Restore);
+            UpdateBufferMetrics();
+            IsOpen = true;
+        }
+
+        public void Flush()
+        {
+            if (loggingEnabled)
+            {
+                foreach (var dirtyKey in dirtyKeys)
+                {
+                    var bufferKey = index.Get(dirtyKey);
+                    if (bufferKey == null)
+                        LogTombstone(dirtyKey);
+                    else
+                        LogValue(dirtyKey, bufferKey, sortedValues[bufferKey]);
+                }
+                dirtyKeys.Clear();
+            }
+        }
+
+        public void Close()
+        {
+            IsOpen = false;
+            index.Clear();
+            sortedValues.Clear();
+            dirtyKeys.Clear();
+            BufferSize = 0;
+            MinTimestamp = Int64.MaxValue;
+            UpdateBufferMetrics();
+            processorContext.Metrics.RemoveStoreSensors(Thread.CurrentThread.Name, 
+                processorContext.Id.ToString(),
+                Name);
+        }
+        
+        #region Private
+        
+        private void Restore(Bytes key, byte[] value, long timestamp)
+        {
+            if (value == null)
+            {
+                if (index.ContainsKey(key))
+                {
+                    var bufferedKey = index[key];
+                    index.Remove(key);
+
+                    var bufferValue = sortedValues.Get(bufferedKey);
+                    if (bufferValue != null)
+                    {
+                        sortedValues.Remove(bufferedKey);
+                        BufferSize -= ComputeRecordSizeFc(bufferedKey.Key, bufferValue);
+                    }
+
+                    if (bufferedKey.Time == MinTimestamp)
+                        MinTimestamp = !sortedValues.Any() ? long.MaxValue : sortedValues.First().Key.Time;
+                }
+            }
+            else
+            {
+                var bufferValueAndTime = ByteBuffer.Build(value, true);
+                var bufferValue = BufferValue.Deserialize(bufferValueAndTime);
+                var time = bufferValueAndTime.GetLong();
+                CleanPut(time, key, bufferValue);
+            }
+            UpdateBufferMetrics();
+        }
+
+        private void UpdateBufferMetrics()
+        {
+            var now = DateTime.Now.GetMilliseconds();
+            bufferSizeSensor.Record(BufferSize, now);
+            bufferCountSensor.Record(index.Count, now);
+        }
+
+        private void LogValue(Bytes key, BufferKey bufferKey, BufferValue bufferValue)
+        {
+            var buffer = bufferValue.Serialize(sizeof(long));
+            buffer.PutLong(bufferKey.Time);
+            
+            var array = buffer.ToArray();
+            processorContext.Log(Name, key, array, bufferKey.Time);
+        }
+
+        private void LogTombstone(Bytes key)
+            => processorContext.Log(Name, key, null, DateTime.Now.GetMilliseconds());
+        
+        private void CleanPut(long time, Bytes key, BufferValue bufferValue)
+        {
+            BufferKey previousKey = index.Get(key);
+            if (previousKey == null)
+            {
+                BufferKey nextKey = new BufferKey(time, key);
+                index.Add(key, nextKey);
+                sortedValues.Add(nextKey, bufferValue);
+                MinTimestamp = Math.Min(MinTimestamp, time);
+                BufferSize += ComputeRecordSizeFc(key, bufferValue);
+            }
+            else
+            {
+                BufferValue removedValue = sortedValues.Get(previousKey);
+                sortedValues.Remove(previousKey);
+                sortedValues.Add(previousKey, bufferValue);
+                BufferSize =
+                    BufferSize
+                    + ComputeRecordSizeFc(key, bufferValue)
+                    - (removedValue == null ? 0 : ComputeRecordSizeFc(key, removedValue));
+            }
+        }
+
+        private BufferValue GetBuffered(Bytes key)
+        {
+            BufferKey bufferKey = index.Get(key);
+            return bufferKey == null ? null : sortedValues.Get(bufferKey);
+        }
+        
+        private byte[] InternalPriorValueForBuffered(Bytes key) {
+            BufferKey bufferKey = index.Get(key);
+            if (bufferKey == null) {
+                throw new ArgumentException($"Key [{key}] is not in the buffer.");
+            }
+
+            BufferValue bufferValue = sortedValues.Get(bufferKey);
+            return bufferValue.PriorValue;
+        }
+        
+        #endregion
+    }
+}
\ No newline at end of file
diff --git a/core/State/Suppress/Internal/Maybe.cs b/core/State/Suppress/Internal/Maybe.cs
new file mode 100644
index 00000000..d68274c3
--- /dev/null
+++ b/core/State/Suppress/Internal/Maybe.cs
@@ -0,0 +1,33 @@
+using Streamiz.Kafka.Net.Errors;
+
+namespace Streamiz.Kafka.Net.State.Suppress.Internal
+{
+    internal sealed class Maybe<T>
+        where T : class
+    {
+        private T nullableValue;
+
+        public bool IsDefined { get; private set; }
+
+        public T Value
+        {
+            get
+            {
+                if (IsDefined)
+                    return nullableValue;
+                
+                throw new IllegalStateException("Value is not defined");
+            }
+            private set => nullableValue = value;
+        }
+        
+        public Maybe(T nullableValue, bool isDefined)
+        {
+            Value = nullableValue;
+            IsDefined = isDefined;
+        }
+
+        public static Maybe<T> Defined(T value) => new Maybe<T>(value, true);
+        public static Maybe<T> Undefined() => new Maybe<T>(null, false);
+    }
+}
\ No newline at end of file
diff --git a/core/State/Windowed.cs b/core/State/Windowed.cs
index a8cfe34e..1b06ec28 100644
--- a/core/State/Windowed.cs
+++ b/core/State/Windowed.cs
@@ -12,7 +12,23 @@ namespace Streamiz.Kafka.Net.State
     /// Thus, a windowed <see cref="IKTable{K, V}"/> has type <code><see cref="Windowed{K}"/>, V</code>.
     /// </summary>
     /// <typeparam name="K">type of key</typeparam>
-    public class Windowed<K>
+    public abstract class Windowed
+    {
+        /// <summary>
+        /// Return the window containing the values associated with this key.
+        /// </summary>
+        public abstract Window Window { get; }
+    }
+    
+    /// <summary>
+    /// The result key type of a windowed stream aggregation.
+    /// If a <see cref="IKStream{K, V}"/> gets grouped and aggregated using a window-aggregation the resulting <see cref="IKTable{K, V}"/>is a
+    /// so-called "windowed <see cref="IKTable{K, V}"/>" with a combined key type that encodes the corresponding aggregation window and
+    /// the original record key.
+    /// Thus, a windowed <see cref="IKTable{K, V}"/> has type <code><see cref="Windowed{K}"/>, V</code>.
+    /// </summary>
+    /// <typeparam name="K">type of key</typeparam>
+    public class Windowed<K> : Windowed
     {
         /// <summary>
         /// Constructor with Key and Window data
@@ -33,7 +49,7 @@ public Windowed(K key, Window window)
         /// <summary>
         /// Return the window containing the values associated with this key.
         /// </summary>
-        public Window Window { get; }
+        public override Window Window { get; }
 
         /// <summary>
         /// 
diff --git a/core/Table/IKTable.cs b/core/Table/IKTable.cs
index e8c6771d..6fe4fc70 100644
--- a/core/Table/IKTable.cs
+++ b/core/Table/IKTable.cs
@@ -949,5 +949,14 @@ public interface IKTable<K, V>
         /// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
         /// <returns>a <see cref="IKTable{K, VR}"/> that contains join-records for each key and values computed by the given value joiner, one for each matched record-pair with the same key plus one for each non-matching record both KTables</returns>
         IKTable<K, VR> OuterJoin<VT, VR>(IKTable<K, VT> table, Func<V, VT, VR> valueJoiner, Materialized<K, VR, IKeyValueStore<Bytes, byte[]>> materialized, string named = null);
+
+        /// <summary>
+        /// Suppress some updates from this changelog stream, determined by the supplied <paramref name="suppressed"/> configuration.
+        /// This controls what updates downstream table and stream operations will receive.
+        /// </summary>
+        /// <param name="suppressed">Configuration object determining what, if any, updates to suppress</param>
+        /// <param name="named">A <see cref="string"/> config used to name the processor in the topology. Default : null</param>
+        /// <returns>A new KTable with the desired suppression characteristics.</returns>
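+        /// <example>
+        /// For example, to emit only the final result per window (a sketch; <c>windowedTable</c> is an illustrative windowed count table):
+        /// <code>
+        /// windowedTable.Suppress(SuppressedBuilder.UntilWindowClose&lt;Windowed&lt;string&gt;, long&gt;(
+        ///     TimeSpan.FromSeconds(30), StrictBufferConfig.Unbounded()));
+        /// </code>
+        /// </example>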
+        IKTable<K, V> Suppress(Suppressed<K, V> suppressed, string named = null);
     }
 }
\ No newline at end of file
diff --git a/core/Table/Internal/Graph/KTableSuppress.cs b/core/Table/Internal/Graph/KTableSuppress.cs
new file mode 100644
index 00000000..02e142ad
--- /dev/null
+++ b/core/Table/Internal/Graph/KTableSuppress.cs
@@ -0,0 +1,74 @@
+using Streamiz.Kafka.Net.Processors;
+using Streamiz.Kafka.Net.State;
+using Streamiz.Kafka.Net.State.Suppress;
+
+namespace Streamiz.Kafka.Net.Table.Internal.Graph
+{
+    internal class KTableSuppress<K, S, V> : IKTableProcessorSupplier<K, V, V>
+    {
+        private class KTableSuppressValueGetter : IKTableValueGetter<K, V>
+        {
+            private readonly IKTableValueGetter<K, V> _parentKTableValueGetter;
+            private readonly string _storeName;
+            private ITimeOrderedKeyValueBuffer<K,V,Change<V>> buffer;
+
+            public KTableSuppressValueGetter(IKTableValueGetterSupplier<K,V> parentKTableValueGetterSupplier, string storeName)
+            {
+                _parentKTableValueGetter = parentKTableValueGetterSupplier.Get();
+                _storeName = storeName;
+            }
+
+            public void Init(ProcessorContext context)
+            {
+                _parentKTableValueGetter.Init(context);
+                buffer = (ITimeOrderedKeyValueBuffer<K, V, Change<V>>)context.GetStateStore(_storeName);
+            }
+
+            public ValueAndTimestamp<V> Get(K key)
+            {
+                var container = buffer.PriorValueForBuffered(key);
+                return container.IsDefined ? container.Value : _parentKTableValueGetter.Get(key);
+            }
+
+            public void Close()
+            {
+                _parentKTableValueGetter.Close();
+            }
+        }
+        
+        private readonly Suppressed<K, V> _suppressed;
+        private readonly string _storeName;
+        private readonly KTable<K, S, V> _parent;
+
+        public KTableSuppress(
+            Suppressed<K, V> suppressed,
+            string storeName,
+            KTable<K, S, V> parentTable)
+        {
+            _suppressed = suppressed;
+            _storeName = storeName;
+            _parent = parentTable;
+            // The suppress buffer requires seeing the old values, to support the prior value view.
+            _parent.EnableSendingOldValues();
+        }
+        
+        public void EnableSendingOldValues()
+            => _parent.EnableSendingOldValues();
+
+        public IProcessor<K, Change<V>> Get()
+            => new KTableSuppressProcessor<K, V>(
+                _suppressed,
+                _storeName);
+
+        public IKTableValueGetterSupplier<K, V> View
+        {
+            get
+            {
+                var parentKTableValueGetterSupplier = _parent.ValueGetterSupplier;
+                return new GenericKTableValueGetterSupplier<K, V>(
+                    parentKTableValueGetterSupplier.StoreNames,
+                    new KTableSuppressValueGetter(parentKTableValueGetterSupplier, _storeName));
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/Table/Internal/KTable.cs b/core/Table/Internal/KTable.cs
index a3c0922d..7524fba5 100644
--- a/core/Table/Internal/KTable.cs
+++ b/core/Table/Internal/KTable.cs
@@ -11,6 +11,7 @@
 using Streamiz.Kafka.Net.Table.Internal.Graph.Nodes;
 using System;
 using System.Collections.Generic;
+using Streamiz.Kafka.Net.State.Suppress;
 
 namespace Streamiz.Kafka.Net.Table.Internal
 {
@@ -238,6 +239,46 @@ public IKTable<K, VR> OuterJoin<VT, VR>(IKTable<K, VT> table, Func<V, VT, VR> va
             => OuterJoin(table, new WrappedValueJoiner<V, VT, VR>(valueJoiner), materialized, named);
 
         #endregion
+        
+        #region Suppress
+        
+        public IKTable<K, V> Suppress(Suppressed<K, V> suppressed, string named = null)
+        {
+            var name = new Named(named).OrElseGenerateWithPrefix(builder, KTable.SUPPRESS_NAME);
+            suppressed.Name = name;
+
+            var storeName = !string.IsNullOrEmpty(named)
+                ? $"{named}-store"
+                : builder.NewStoreName(KTable.SUPPRESS_NAME);
+
+            IProcessorSupplier<K, Change<V>> processorSupplier = new KTableSuppress<K, S, V>(suppressed, storeName, this);
+
+            var storeBuilder =
+                new InMemoryTimeOrderedKeyValueChangeBufferBuilder<K, V>(storeName, KeySerdes, ValueSerdes);
+
+            if (suppressed.BufferConfig.LoggingEnabled)
+                storeBuilder.WithLoggingEnabled(suppressed.BufferConfig.Config);
+            else
+                storeBuilder.WithLoggingDisabled();
+
+            var processorNode = new StatefulProcessorNode<K, Change<V>>(name,
+                new ProcessorParameters<K, Change<V>>(processorSupplier, name),
+                storeBuilder, null);
+            
+            builder.AddGraphNode(Node, processorNode);
+
+            return new KTable<K, V, V>(
+                name,
+                KeySerdes,
+                ValueSerdes,
+                SetSourceNodes,
+                null,
+                processorSupplier,
+                processorNode,
+                builder);
+        }
+        
+        #endregion
 
         #endregion
 
diff --git a/core/Table/Suppressed.cs b/core/Table/Suppressed.cs
new file mode 100644
index 00000000..a774da02
--- /dev/null
+++ b/core/Table/Suppressed.cs
@@ -0,0 +1,358 @@
+using System;
+using System.Collections.Generic;
+using Streamiz.Kafka.Net.SerDes;
+using Streamiz.Kafka.Net.State;
+
+namespace Streamiz.Kafka.Net.Table
+{
+    /// <summary>
+    /// <see cref="Suppressed{K,V}"/> builder
+    /// </summary>
+    public static class SuppressedBuilder
+    {
+        /// <summary>
+        /// Configure the suppression to emit only the "final results" from the window.
+        /// <para>
+        /// By default all Streams operators emit results whenever new results are available. This includes windowed operations.
+        /// </para>
+        /// <para>
+        /// Using a <see cref="StrictBufferConfig"/> configuration will instead emit just one result per key for each window, guaranteeing
+        /// to deliver only the final result. This option is suitable for use cases in which the business logic
+        /// requires a hard guarantee that only the final result is propagated. For example, sending alerts.
+        /// </para>
+        /// To accomplish this, the operator will buffer events from the window until the window close (that is,
+        /// until the end-time passes, and additionally until the grace period expires). Since windowed operators
+        /// are required to reject out-of-order events for a window whose grace period is expired, there is an additional
+        /// guarantee that the final results emitted from this suppression will match any queryable state upstream.
+        /// <para>
+        /// Using an <see cref="EagerConfig"/> configuration defines how much space to use for buffering intermediate results.
+        /// It may emit multiple values for the same window, depending on your throughput (# of keys or # of bytes).
+        /// This is appropriate when you want to limit the number of downstream records and bound the memory usage of your application.
+        /// </para>
+        /// </summary>
+        /// <param name="gracePeriod">Define the grace period where your window is considered expired</param>
+        /// <param name="bufferConfig">A configuration specifying how much space to use for buffering intermediate results.</param>
+        /// <typeparam name="K">Key type</typeparam>
+        /// <typeparam name="V">Value type</typeparam>
+        /// <returns>a suppression configuration</returns>
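+        /// <example>
+        /// A minimal end-to-end sketch (assuming a <c>StreamBuilder</c> named <c>builder</c>; topic names and window sizes are illustrative):
+        /// <code>
+        /// var finalCounts = builder.Stream&lt;string, string&gt;("input")
+        ///     .GroupByKey()
+        ///     .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(1)))
+        ///     .Count()
+        ///     .Suppress(SuppressedBuilder.UntilWindowClose&lt;Windowed&lt;string&gt;, long&gt;(
+        ///         TimeSpan.FromSeconds(30), StrictBufferConfig.Unbounded()));
+        /// </code>
+        /// </example>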
+        public static Suppressed<K, V> UntilWindowClose<K, V>(TimeSpan gracePeriod, IBufferConfig bufferConfig)
+            where K : Windowed => new(
+            null, 
+            gracePeriod, 
+            bufferConfig, 
+            (_, key) => key.Window.EndMs,
+            true);
+            
+        /// <summary>
+        /// Configure the suppression to wait a specific amount of time after receiving a record before emitting it further downstream.
+        /// If another record for the same key arrives in the meantime, it replaces the first record in the buffer but does not restart the timer.
+        /// </summary>
+        /// <param name="timeToWaitMoreEvents">The amount of time to wait, per record, for new events.</param>
+        /// <param name="bufferConfig">A configuration specifying how much space to use for buffering intermediate results.</param>
+        /// <typeparam name="K">Key type</typeparam>
+        /// <typeparam name="V">Value type</typeparam>
+        /// <returns>a suppression configuration</returns>
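+        /// <example>
+        /// A sketch that waits five minutes for later updates to the same key before forwarding the latest one (<c>table</c> is an illustrative KTable):
+        /// <code>
+        /// table.Suppress(SuppressedBuilder.UntilTimeLimit&lt;string, long&gt;(
+        ///     TimeSpan.FromMinutes(5), EagerConfig.EmitEarlyWhenFull(1000)));
+        /// </code>
+        /// </example>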
+        public static Suppressed<K, V> UntilTimeLimit<K, V>(TimeSpan timeToWaitMoreEvents, IBufferConfig bufferConfig) 
+            => new(
+                null, 
+                timeToWaitMoreEvents, 
+                bufferConfig, 
+                (context, _) => context.Timestamp,
+                false);
+    }
+
+    /// <summary>
+    /// Buffer Full Strategy
+    /// </summary>
+    public enum BUFFER_FULL_STRATEGY
+    {
+        /// <summary>
+        /// Emit early the results
+        /// </summary>
+        EMIT,
+        /// <summary>
+        /// Stop the stream
+        /// </summary>
+        SHUTDOWN
+    }
+    
+    /// <summary>
+    /// Determine the buffer rules for the suppress processor in terms of records, cache size, and the strategy to apply when the buffer is full
+    /// </summary>
+    public interface IBufferConfig
+    {
+        /// <summary>
+        /// Define how many records the buffer can accept
+        /// </summary>
+        long MaxRecords{ get; }
+        /// <summary>
+        /// Define how many bytes the buffer can store
+        /// </summary>
+        long MaxBytes { get; }
+        /// <summary>
+        /// Define the buffer full strategy policy
+        /// </summary>
+        BUFFER_FULL_STRATEGY BufferFullStrategy { get; }
+        /// <summary>
+        /// Enable the change logging
+        /// </summary>
+        bool LoggingEnabled { get; }
+        /// <summary>
+        /// Explicit config for the backup changelog topic
+        /// </summary>
+        IDictionary<string, string> Config { get; }
+    }
+    
+    /// <summary>
+    /// Marker class for a buffer configuration that is "strict" in the sense that it will strictly enforce the time bound and never emit early.
+    /// </summary>
+    public class StrictBufferConfig : IBufferConfig
+    {
+        /// <summary>
+        /// Define how many records the buffer can accept
+        /// </summary>
+        public long MaxRecords { get; }
+        /// <summary>
+        /// Define how many bytes the buffer can store
+        /// </summary>
+        public long MaxBytes { get; }
+        /// <summary>
+        /// Define the buffer full strategy policy as "SHUTDOWN"
+        /// </summary>
+        public BUFFER_FULL_STRATEGY BufferFullStrategy => BUFFER_FULL_STRATEGY.SHUTDOWN;
+        /// <summary>
+        /// Enable the change logging
+        /// </summary>
+        public bool LoggingEnabled { get; private set; }
+        /// <summary>
+        /// Explicit config for the backup changelog topic
+        /// </summary>
+        public IDictionary<string, string> Config { get; private set;}
+        
+        private StrictBufferConfig(long maxRecords, long maxBytes)
+        {
+            MaxRecords = maxRecords;
+            MaxBytes = maxBytes;
+            LoggingEnabled = true;
+            Config = new Dictionary<string, string>();
+        }
+        
+        private StrictBufferConfig()
+        {
+            MaxRecords = Int64.MaxValue;
+            MaxBytes = Int64.MaxValue;
+            LoggingEnabled = true;
+            Config = new Dictionary<string, string>();
+        }
+        
+        /// <summary>
+        /// Create a buffer unconstrained by size (either keys or bytes).
+        /// As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
+        /// If there isn't enough resource available to meet the demand, the application will encounter an
+        /// <see cref="OutOfMemoryException"/> and shut down (not guaranteed to be a graceful exit). 
+        /// This is a convenient option if you doubt that your buffer will be that large, but also don't
+        /// wish to pick particular constraints, such as in testing.
+        /// This buffer is "strict" in the sense that it will enforce the time bound or crash.
+        /// It will never emit early.
+        /// </summary>
+        /// <returns>an unbounded strict buffer configuration</returns>
+        public static StrictBufferConfig Unbounded() => new();
+        
+        /// <summary>
+        /// Create a buffer to gracefully shut down the application when the number of different keys exceeds the maximum.
+        /// </summary>
+        /// <param name="maxRecords">Maximum number of different keys</param>
+        /// <returns>a strict buffer configuration bounded by number of records</returns>
+        public static StrictBufferConfig Bounded(long maxRecords) => new(maxRecords, Int64.MaxValue);
+        
+        /// <summary>
+        /// Create a buffer to gracefully shut down the application when the number of bytes used by the buffer exceeds the maximum.
+        /// </summary>
+        /// <param name="maxBytes">Maximum number of bytes</param>
+        /// <returns>a strict buffer configuration bounded by size in bytes</returns>
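+        /// <example>
+        /// For instance (a sketch; the <c>CacheSize.OfMb</c> factory is assumed from the existing cache feature):
+        /// <code>
+        /// // Shut down the application if the buffer grows past 64 MB.
+        /// var bufferConfig = StrictBufferConfig.Bounded(CacheSize.OfMb(64));
+        /// </code>
+        /// </example>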
+        public static StrictBufferConfig Bounded(CacheSize maxBytes) => new(Int64.MaxValue, maxBytes.CacheSizeBytes);
+        
+        /// <summary>
+        /// Enable the logging feature
+        /// </summary>
+        /// <param name="config">Optional configuration for the changelog topic</param>
+        /// <returns>this instance, for chaining</returns>
+        public StrictBufferConfig WithLoggingEnabled(IDictionary<string, string> config)
+        {
+            Config = config ?? new Dictionary<string, string>();
+            LoggingEnabled = true;
+            return this;
+        }
+
+        /// <summary>
+        /// Disable the logging feature
+        /// </summary>
+        /// <returns>this instance, for chaining</returns>
+        public StrictBufferConfig WithLoggingDisabled()
+        {
+            Config = null;
+            LoggingEnabled = false;
+            return this;
+        }
+    }
+
+    /// <summary>
+    /// Marker class for a buffer configuration that will strictly enforce size constraints (bytes and/or number of records) on the buffer, so it is suitable for reducing duplicate
+    /// results downstream, but does not promise to eliminate them entirely.
+    /// </summary>
+    public class EagerConfig : IBufferConfig
+    {
+        /// <summary>
+        /// Define how many records the buffer can accept
+        /// </summary>
+        public long MaxRecords { get; }
+        /// <summary>
+        /// Define how many bytes the buffer can store
+        /// </summary>
+        public long MaxBytes { get; }
+        /// <summary>
+        /// Define the buffer full strategy policy as "EMIT"
+        /// </summary>
+        public BUFFER_FULL_STRATEGY BufferFullStrategy => BUFFER_FULL_STRATEGY.EMIT;
+        /// <summary>
+        /// Enable the change logging
+        /// </summary>
+        public bool LoggingEnabled { get; private set; }
+        /// <summary>
+        /// Explicit config for the backup changelog topic
+        /// </summary>
+        public IDictionary<string, string> Config { get; private set; }
+
+        private EagerConfig(long maxRecords, long maxBytes)
+        {
+            MaxRecords = maxRecords;
+            MaxBytes = maxBytes;
+            LoggingEnabled = true;
+            Config = new Dictionary<string, string>();
+        }
+
+        /// <summary>
+        /// Create a size-constrained buffer in terms of the maximum number of keys it will store.
+        /// </summary>
+        /// <param name="maxRecords">Maximum number of keys</param>
+        /// <returns>an eager buffer configuration bounded by number of keys</returns>
+        public static EagerConfig EmitEarlyWhenFull(long maxRecords) => new(maxRecords, Int64.MaxValue);
+        
+        /// <summary>
+        /// Create a size-constrained buffer in terms of the maximum number of bytes it will use.
+        /// </summary>
+        /// <param name="maxBytes">Maximum number of bytes</param>
+        /// <returns>an eager buffer configuration bounded by size in bytes</returns>
+        public static EagerConfig EmitEarlyWhenFull(CacheSize maxBytes) => new(Int64.MaxValue, maxBytes.CacheSizeBytes);
+        
+        /// <summary>
+        /// Create a size-constrained buffer in terms of the maximum number of keys it will store OR maximum number of bytes it will use.
+        /// </summary>
+        /// <param name="maxRecords">Maximum number of keys</param>
+        /// <param name="maxBytes">Maximum number of bytes</param>
+        /// <returns>an eager buffer configuration bounded by number of keys and size in bytes</returns>
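+        /// <example>
+        /// For instance (a sketch; sizes are illustrative and <c>CacheSize.OfMb</c> is assumed from the existing cache feature):
+        /// <code>
+        /// // Emit early once the buffer holds more than 10,000 keys or 32 MB.
+        /// var bufferConfig = EagerConfig.EmitEarlyWhenFull(10000, CacheSize.OfMb(32));
+        /// </code>
+        /// </example>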
+        public static EagerConfig EmitEarlyWhenFull(long maxRecords, CacheSize maxBytes) => new(maxRecords, maxBytes.CacheSizeBytes);
+
+        /// <summary>
+        /// Enable the logging feature
+        /// </summary>
+        /// <param name="config">Optional configuration for the changelog topic</param>
+        /// <returns>This <see cref="EagerConfig"/> instance, for chaining</returns>
+        public EagerConfig WithLoggingEnabled(IDictionary<string, string> config)
+        {
+            Config = config ?? new Dictionary<string, string>();
+            LoggingEnabled = true;
+            return this;
+        }
+
+        /// <summary>
+        /// Disable the logging feature
+        /// </summary>
+        /// <returns>This <see cref="EagerConfig"/> instance, for chaining</returns>
+        public EagerConfig WithLoggingDisabled()
+        {
+            Config = null;
+            LoggingEnabled = false;
+            return this;
+        }
+    }
+    
+    /// <summary>
+    /// Define the behavior of the suppress processor used in <see cref="IKTable{K,V}.Suppress"/>.
+    /// </summary>
+    /// <typeparam name="K">Type of key</typeparam>
+    /// <typeparam name="V">Type of value</typeparam>
+    public class Suppressed<K,V>
+    {
+        /// <summary>
+        /// Suppression duration; in "final results" mode, this is used as the grace period
+        /// </summary>
+        public TimeSpan SuppressionTime { get; }
+        
+        /// <summary>
+        /// Define the buffer config policy
+        /// </summary>
+        public IBufferConfig BufferConfig { get; }
+        
+        /// <summary>
+        /// Function used to determine the buffering time of each record
+        /// </summary>
+        public Func<ProcessorContext, K, long> TimeDefinition { get; }
+        
+        /// <summary>
+        /// Whether tombstones can be dropped. Be careful: it's only safe to drop tombstones for windowed KTables in "final results" mode.
+        /// </summary>
+        public bool SafeToDropTombstones { get; }
+        
+        /// <summary>
+        /// Name of the suppress processor
+        /// </summary>
+        public string Name { get; internal set; }
+        
+        /// <summary>
+        /// (Optional) Key serdes
+        /// </summary>
+        public ISerDes<K> KeySerdes { get; private set; }
+        
+        /// <summary>
+        /// (Optional) Value serdes
+        /// </summary>
+        public ISerDes<V> ValueSerdes { get; private set; }
+        
+        internal Suppressed(
+            string name,
+            TimeSpan suppressionTime,
+            IBufferConfig bufferConfig,
+            Func<ProcessorContext, K, long> timeDefinition,
+            bool safeToDropTombstones)
+        {
+            Name = name;
+            SuppressionTime = suppressionTime;
+            BufferConfig = bufferConfig;
+            TimeDefinition = timeDefinition;
+            SafeToDropTombstones = safeToDropTombstones;
+        }
+
+        /// <summary>
+        /// Define the key serdes
+        /// </summary>
+        /// <param name="keySerdes">key serdes</param>
+        /// <returns>This <see cref="Suppressed{K,V}"/> instance, for chaining</returns>
+        public Suppressed<K, V> WithKeySerdes(ISerDes<K> keySerdes)
+        {
+            KeySerdes = keySerdes;
+            return this;
+        }
+        
+        /// <summary>
+        /// Define the value serdes 
+        /// </summary>
+        /// <param name="valueSerdes">value serdes</param>
+        /// <returns>This <see cref="Suppressed{K,V}"/> instance, for chaining</returns>
+        public Suppressed<K, V> WithValueSerdes(ISerDes<V> valueSerdes)
+        {
+            ValueSerdes = valueSerdes;
+            return this;
+        }
+    }
+}
\ No newline at end of file
diff --git a/docs/stateful-processors.md b/docs/stateful-processors.md
index 3accf30e..9c56a463 100644
--- a/docs/stateful-processors.md
+++ b/docs/stateful-processors.md
@@ -476,4 +476,40 @@ builder.Stream<string, string>("input")
 |     t0     | key: key1 ; value : eventID1  |   key: key1 ; value : eventID1 ; t0    |   key: key1 ; value : eventID1           |
 |     t0+10s | key: key1 ; value : eventID1  |   key: key1 ; value : eventID1 ; t0    |   X                                      |
 |     t0+30s | key: key1 ; value : eventID2  |   key: key1 ; value : eventID2 ; t0+30s|   key: key1 ; value : eventID2           |
-|     t0+95s | key: key1 ; value : eventID2  |   key: key1 ; value : eventID2 ; t0+95s|   key: key1 ; value : eventID2           |
\ No newline at end of file
+|     t0+95s | key: key1 ; value : eventID2  |   key: key1 ; value : eventID2 ; t0+95s|   key: key1 ; value : eventID2           |
+
+## Suppress
+
+In Streamiz, windowed computations update their results continuously. As new data arrives for a window, freshly computed results are emitted downstream. For many applications this is ideal, since fresh results are always available, and Streamiz is designed to make programming continuous computations seamless. However, some applications need to take action only on the final result of a windowed computation. Common examples of this are sending alerts or delivering results to a system that doesn't support updates.
+
+Suppose that you have an hourly windowed count of events per user. If you want to send an alert when a user has fewer than three events in an hour, you have a real challenge. All users would match this condition at first, until they accrue enough events, so you cannot simply send an alert when someone matches the condition; you have to wait until you know you won't see any more events for a particular window, and then send the alert.
+
+Streamiz offers a clean way to define this logic: after defining your windowed computation, you can suppress the intermediate results, emitting the final count for each user when the window is closed.
+
+Example:
+
+``` csharp
+var builder = new StreamBuilder();
+        
+builder.Stream("input", new StringSerDes(), new StringSerDes())
+    .GroupByKey()
+    .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(1)))
+    .Count(
+        InMemoryWindows
+            .As<string, long>("count-store")
+            .WithKeySerdes(new StringSerDes())
+            .WithValueSerdes(new Int64SerDes()))
+    .Suppress(SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.FromMinutes(1),
+                StrictBufferConfig.Unbounded()))
+    .ToStream()
+    .To("output", 
+        new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds), 
+        new Int64SerDes());
+```
+
+The key parts of this program are:
+- `TumblingWindowOptions.Of(TimeSpan.FromMinutes(1))` defines a tumbling window of 1 minute.
+- `.Suppress(SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.FromMinutes(1), ...)` configures the suppression operator to emit nothing for a window until it closes (its end time plus the 1-minute grace period), and then emit the final result.
+- `StrictBufferConfig.Unbounded()` configures the buffer used for storing events until their windows close to be unconstrained; alternative buffer policies are sketched below.
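+
+If an unbounded buffer is not acceptable for your workload, you can bound it instead: a strict bound shuts the application down (a `StreamsException` is raised) rather than emitting early, whereas an eager bound emits the oldest results early to stay within its limits. A short sketch of both policies (the limits shown are illustrative):
+
+``` csharp
+// Strict: never emit early; fail if more than 10,000 records are buffered
+StrictBufferConfig.Bounded(10000);
+
+// Strict: same guarantee, but bounded by memory usage instead of record count
+StrictBufferConfig.Bounded(CacheSize.OfMb(5));
+
+// Eager: emit the oldest results early when either limit is exceeded
+EagerConfig.EmitEarlyWhenFull(10000, CacheSize.OfMb(5));
+```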
+
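+Coming back to the alerting scenario described above: once suppression guarantees that only final results flow downstream, the alert condition becomes a simple filter. Here is a sketch under those assumptions (topic names and the three-event threshold are illustrative, and the stream is assumed to be keyed by user id):
+
+``` csharp
+var builder = new StreamBuilder();
+
+builder.Stream("user-events", new StringSerDes(), new StringSerDes())
+    .GroupByKey()
+    .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromHours(1)))
+    .Count(
+        InMemoryWindows
+            .As<string, long>("hourly-count-store")
+            .WithKeySerdes(new StringSerDes())
+            .WithValueSerdes(new Int64SerDes()))
+    // emit only the final hourly count per user, once the window has closed
+    .Suppress(SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.Zero,
+        StrictBufferConfig.Unbounded()))
+    .ToStream()
+    // the count is final here, so it is now safe to alert on "fewer than three events"
+    .Filter((k, v) => v < 3)
+    .To("low-activity-alerts",
+        new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromHours(1).TotalMilliseconds),
+        new Int64SerDes());
+```
+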
+You can find more details on the `Suppressed` class in the API documentation.
\ No newline at end of file
diff --git a/launcher/sample-stream/Program.cs b/launcher/sample-stream/Program.cs
index 9b3556fb..332f4bd4 100644
--- a/launcher/sample-stream/Program.cs
+++ b/launcher/sample-stream/Program.cs
@@ -8,22 +8,18 @@
 using Streamiz.Kafka.Net.Metrics;
 using Streamiz.Kafka.Net.Metrics.Prometheus;
 using Streamiz.Kafka.Net.SerDes;
+using Streamiz.Kafka.Net.State;
 using Streamiz.Kafka.Net.State.RocksDb;
 using Streamiz.Kafka.Net.Stream;
 using Streamiz.Kafka.Net.Table;
+using RocksDb = Streamiz.Kafka.Net.Table.RocksDb;
 
 namespace sample_stream
 {
     public static class Program
     {
        public static async Task Main(string[] args)
-        {
-            var rocksDbHandler = new BoundMemoryRocksDbConfigHandler()
-                .ConfigureNumThreads(2)
-                .SetCompactionStyle(Compaction.Universal)
-                .SetCompressionType(Compression.Lz4)
-                .LimitTotalMemory(CacheSize.OfMb(40));
-            
+        {
            var config = new StreamConfig<StringSerDes, StringSerDes>{
                 ApplicationId = $"test-app",
                 BootstrapServers = "localhost:9092",
@@ -32,13 +28,9 @@ public static async Task Main(string[] args)
                 {
                     b.AddConsole();
                     b.SetMinimumLevel(LogLevel.Information);
-                }),
-                RocksDbConfigHandler = rocksDbHandler.Handle
+                })
             };
            
-            config.MetricsRecording = MetricsRecordingLevel.DEBUG;
-            config.UsePrometheusReporter(9090, true);
-                   
             var t = BuildTopology();
             var stream = new KafkaStream(t, config);
             
@@ -53,33 +45,14 @@ private static Topology BuildTopology()
         {
             var builder = new StreamBuilder();
             
-            builder.Stream<string, string>("input3")
-                .Peek((k,v,c) => Console.WriteLine($"Key : {k} Context : {c.Topic}:{c.Partition}:{c.Offset}"))
+            builder.Stream<string, string>("input")
                 .GroupByKey()
                 .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(1)))
-                .Count(RocksDbWindows.As<string, long>("count-store")
-                    .WithKeySerdes(new StringSerDes())
-                    .WithValueSerdes(new Int64SerDes()))
-                    //.WithCachingEnabled()
+                .Count()
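+                // buffer each 1-minute window's count and only emit the final result once the window closes (window end + 1-minute grace)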
+                .Suppress(SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.FromMinutes(1), StrictBufferConfig.Unbounded()))
                 .ToStream()
-                .Map((k,v, _) => new KeyValuePair<string,string>(k.ToString(), v.ToString()))
-                .To("output3",
-                    new StringSerDes(),
-                    new StringSerDes());
-
-            /*builder.Stream<string, string>("input3")
-                .Peek((k,v,c) => Console.WriteLine($"Key : {k} Context : {c.Topic}:{c.Partition}:{c.Offset}"))
-                .To("output3",
-                    new StringSerDes(),
-                    new StringSerDes());
-
-
-            /*builder.Stream<string, string>("input")
-                .DropDuplicate((key, value1, value2) => value1.Equals(value2),
-                    TimeSpan.FromMinutes(1))
-                .To(
-                    "output");//, (s, s1, arg3, arg4) => new Partition(0));
-            */
+                .To("output");
+            
             return builder.Build();
         }
     }
diff --git a/roadmap.md b/roadmap.md
index 2b8c635c..f949b496 100644
--- a/roadmap.md
+++ b/roadmap.md
@@ -10,4 +10,7 @@
 - KIP-612 : end-to-end latency metrics
 - KIP-450 : Sliding windows
 - Session Windows
-- Rename WithRecordTimestamp to AlterRecordTimestamp
\ No newline at end of file
+- Rename WithRecordTimestamp to AlterRecordTimestamp
+- Versioned State Store
+- KIP-923 : Add a grace period to stream-table join (https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join)
+- KIP-424 : Allow suppression of intermediate events based on wall clock time
\ No newline at end of file
diff --git a/test/Streamiz.Kafka.Net.Tests/Processors/KTableSuppressTests.cs b/test/Streamiz.Kafka.Net.Tests/Processors/KTableSuppressTests.cs
new file mode 100644
index 00000000..810e4f8c
--- /dev/null
+++ b/test/Streamiz.Kafka.Net.Tests/Processors/KTableSuppressTests.cs
@@ -0,0 +1,325 @@
+using System;
+using System.Globalization;
+using System.Linq;
+using System.Text;
+using Confluent.Kafka;
+using NUnit.Framework;
+using Streamiz.Kafka.Net.Errors;
+using Streamiz.Kafka.Net.Mock;
+using Streamiz.Kafka.Net.SerDes;
+using Streamiz.Kafka.Net.State;
+using Streamiz.Kafka.Net.Stream;
+using Streamiz.Kafka.Net.Table;
+
+namespace Streamiz.Kafka.Net.Tests.Processors;
+
+public class KTableSuppressTests
+{
+    private StreamConfig streamConfig;
+    private DateTime dt;
+
+    [SetUp]
+    public void Init()
+    {
+        streamConfig = new StreamConfig();
+        streamConfig.ApplicationId = "test-ktable-suppress-test";
+        
+        var c = CultureInfo.CreateSpecificCulture("fr-FR");
+        dt = DateTime.Parse("15/10/2024 08:00:00", c);
+    }
+    
+    private StreamBuilder BuildTopology(Suppressed<Windowed<string>, long> suppressed)
+    {
+        var builder = new StreamBuilder();
+        
+        builder.Stream("input", new StringSerDes(), new StringSerDes())
+            .GroupByKey()
+            .WindowedBy(TumblingWindowOptions.Of(TimeSpan.FromMinutes(1)))
+            .Count(
+                InMemoryWindows
+                    .As<string, long>("count-store")
+                    .WithKeySerdes(new StringSerDes())
+                    .WithValueSerdes(new Int64SerDes()))
+            .Suppress(suppressed
+                .WithKeySerdes(new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds))
+                .WithValueSerdes(new Int64SerDes()))
+            .ToStream()
+            .To("output", 
+                new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds), 
+                new Int64SerDes());
+
+        return builder;
+    }
+    
+    [Test]
+    public void WindowedZeroTimeLimitShouldImmediatelyEmit()
+    {
+        var builder =
+            BuildTopology(
+                SuppressedBuilder.UntilTimeLimit<Windowed<string>, long>(TimeSpan.Zero,
+                    StrictBufferConfig.Unbounded().WithLoggingEnabled(null)));
+        
+        using var driver = new TopologyTestDriver(builder.Build(), streamConfig);
+        var inputTopic = driver.CreateInputTopic("input", new StringSerDes(), new StringSerDes());
+        var outputTopic = driver.CreateOuputTopic("output",
+            TimeSpan.FromSeconds(5),
+            new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds),
+            new Int64SerDes());
+        
+        inputTopic.PipeInput("key1", "value1", DateTime.Now);
+        inputTopic.PipeInput("key1", "value2", DateTime.Now);
+
+        var records = outputTopic.ReadKeyValueList();
+        Assert.AreEqual(2, records.Count());
+    }
+
+    [Test]
+    public void IntermediateSuppressionShouldBufferAndEmitLater()
+    {
+        var builder =
+            BuildTopology(
+                SuppressedBuilder.UntilTimeLimit<Windowed<string>, long>(TimeSpan.FromSeconds(1),
+                    StrictBufferConfig.Unbounded().WithLoggingDisabled()));
+        
+        using var driver = new TopologyTestDriver(builder.Build(), streamConfig);
+        var inputTopic = driver.CreateInputTopic("input", new StringSerDes(), new StringSerDes());
+        var outputTopic = driver.CreateOuputTopic("output",
+            TimeSpan.FromSeconds(5),
+            new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds),
+            new Int64SerDes());
+        
+        inputTopic.PipeInput("key1", "value1", dt);
+        inputTopic.PipeInput("key1", "value2", dt.AddSeconds(30)); // should process and emit
+
+        var records = outputTopic.ReadKeyValueList();
+        Assert.AreEqual(1, records.Count());
+        Assert.AreEqual(2, records.First().Message.Value);
+    }
+
+    [Test]
+    public void FinalResultsSuppressionShouldBufferAndEmitAtGraceExpiration()
+    {
+        var builder =
+            BuildTopology(
+                SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.FromMinutes(1),
+                    StrictBufferConfig.Unbounded()));
+        
+        using var driver = new TopologyTestDriver(builder.Build(), streamConfig);
+        var inputTopic = driver.CreateInputTopic("input", new StringSerDes(), new StringSerDes());
+        var outputTopic = driver.CreateOuputTopic("output",
+            TimeSpan.FromSeconds(5),
+            new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds),
+            new Int64SerDes());
+        
+        inputTopic.PipeInput("key1", "value1", dt);
+        
+        // although the stream time is now 15/10/2024 08:01:00, we have to wait 1 minute (the grace period) after the window *end*
+        // before we emit "key1", so we don't emit yet.
+        inputTopic.PipeInput("key2", "value1", dt.AddMinutes(1));
+        
+        // ok, now it's time to emit "key1"
+        inputTopic.PipeInput("key3", "value1", dt.AddMinutes(2).AddSeconds(1));
+        
+        var records = outputTopic.ReadKeyValueList();
+        Assert.AreEqual(1, records.Count());
+        Assert.AreEqual("key1", records.First().Message.Key.Key);
+        Assert.AreEqual(1, records.First().Message.Value);
+    }
+
+    [Test]
+    public void FinalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd()
+    {
+        var builder =
+            BuildTopology(
+                SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.Zero,
+                    StrictBufferConfig.Unbounded()));
+        
+        using var driver = new TopologyTestDriver(builder.Build(), streamConfig);
+        var inputTopic = driver.CreateInputTopic("input", new StringSerDes(), new StringSerDes());
+        var outputTopic = driver.CreateOuputTopic("output",
+            TimeSpan.FromSeconds(5),
+            new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds),
+            new Int64SerDes());
+        
+        inputTopic.PipeInput("key1", "value1", dt);
+        
+        // the stream time is now 15/10/2024 08:01:00: the first window [08:00:00, 08:01:00) has reached its end and,
+        // with zero grace, closes immediately, so "key1" is emitted now.
+        inputTopic.PipeInput("key2", "value1", dt.AddMinutes(1));
+        
+        var records = outputTopic.ReadKeyValueList();
+        Assert.AreEqual(1, records.Count());
+        Assert.AreEqual("key1", records.First().Message.Key.Key);
+        Assert.AreEqual(1, records.First().Message.Value);
+    }
+
+    [Test]
+    public void FinalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit()
+    {
+        var builder =
+            BuildTopology(
+                SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.Zero,
+                    StrictBufferConfig.Unbounded()));
+        
+        using var driver = new TopologyTestDriver(builder.Build(), streamConfig);
+        var inputTopic = driver.CreateInputTopic("input", new StringSerDes(), new StringSerDes());
+        var outputTopic = driver.CreateOuputTopic("output",
+            TimeSpan.FromSeconds(5),
+            new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds),
+            new Int64SerDes());
+        
+        inputTopic.PipeInput("key1", "value1", dt);
+        
+        // the stream time is now 15/10/2024 08:01:00: the first window [08:00:00, 08:01:00) has reached its end and,
+        // with zero grace, closes immediately, so "key1" is emitted now.
+        inputTopic.PipeInput("key2", "value1", dt.AddMinutes(1));
+        
+        var records = outputTopic.ReadKeyValueList();
+        Assert.AreEqual(1, records.Count());
+        Assert.AreEqual("key1", records.First().Message.Key.Key);
+        Assert.AreEqual(1, records.First().Message.Value);
+    }
+
+    [Test]
+    public void SuppressShouldEmitWhenOverRecordCapacity()
+    {
+        var builder =
+            BuildTopology(
+                SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.FromDays(100),
+                    EagerConfig.EmitEarlyWhenFull(1)));
+        
+        using var driver = new TopologyTestDriver(builder.Build(), streamConfig);
+        var inputTopic = driver.CreateInputTopic("input", new StringSerDes(), new StringSerDes());
+        var outputTopic = driver.CreateOuputTopic("output",
+            TimeSpan.FromSeconds(5),
+            new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds),
+            new Int64SerDes());
+        
+        inputTopic.PipeInput("key1", "value1", dt);
+        inputTopic.PipeInput("dummy", "dummy", dt.AddMinutes(2));
+        
+        var records = outputTopic.ReadKeyValueList();
+        Assert.AreEqual(1, records.Count());
+        Assert.AreEqual("key1", records.First().Message.Key.Key);
+        Assert.AreEqual(1, records.First().Message.Value);
+    }
+
+    [Test]
+    public void SuppressShouldEmitWhenOverByteCapacity()
+    {
+        var builder =
+            BuildTopology(
+                SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.FromDays(100),
+                    EagerConfig.EmitEarlyWhenFull(CacheSize.OfB(80)).WithLoggingDisabled()));
+        
+        using var driver = new TopologyTestDriver(builder.Build(), streamConfig);
+        var inputTopic = driver.CreateInputTopic("input", new StringSerDes(), new StringSerDes());
+        var outputTopic = driver.CreateOuputTopic("output",
+            TimeSpan.FromSeconds(5),
+            new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds),
+            new Int64SerDes());
+        
+        inputTopic.PipeInput("key1", "value1", dt);
+        inputTopic.PipeInput("dummy", "dummy", dt.AddMinutes(2));
+        
+        var records = outputTopic.ReadKeyValueList();
+        Assert.AreEqual(1, records.Count());
+        Assert.AreEqual("key1", records.First().Message.Key.Key);
+        Assert.AreEqual(1, records.First().Message.Value);
+    }
+    
+    [Test]
+    public void SuppressShouldEmitWhenOverByteAndRecordCapacity()
+    {
+        var builder =
+            BuildTopology(
+                SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.FromDays(100),
+                    EagerConfig.EmitEarlyWhenFull(3, CacheSize.OfB(120)).WithLoggingEnabled(null)));
+        
+        using var driver = new TopologyTestDriver(builder.Build(), streamConfig);
+        var inputTopic = driver.CreateInputTopic("input", new StringSerDes(), new StringSerDes());
+        var outputTopic = driver.CreateOuputTopic("output",
+            TimeSpan.FromSeconds(5),
+            new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds),
+            new Int64SerDes());
+        
+        inputTopic.PipeInput("key1", "value1", dt);
+        inputTopic.PipeInput("key1", "value2", dt);
+        inputTopic.PipeInput("key1", "value3", dt);
+        inputTopic.PipeInput("dummy", "dummy", dt.AddMinutes(2));
+        
+        var records = outputTopic.ReadKeyValueList();
+        Assert.AreEqual(1, records.Count());
+        Assert.AreEqual("key1", records.First().Message.Key.Key);
+        Assert.AreEqual(3, records.First().Message.Value);
+    }
+
+    [Test]
+    public void SuppressShouldShutDownWhenOverRecordCapacity()
+    {
+        var builder =
+            BuildTopology(
+                SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.FromDays(100),
+                    StrictBufferConfig.Bounded(1)));
+        
+        using var driver = new TopologyTestDriver(builder.Build(), streamConfig);
+        var inputTopic = driver.CreateInputTopic("input", new StringSerDes(), new StringSerDes());
+        var outputTopic = driver.CreateOuputTopic("output",
+            TimeSpan.FromSeconds(5),
+            new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds),
+            new Int64SerDes());
+        
+        inputTopic.PipeInput("key1", "value1", dt);
+
+        Assert.Throws<StreamsException>(() => inputTopic.PipeInput("dummy", "dummy", dt.AddMinutes(2)));
+    }
+
+    [Test]
+    public void SuppressShouldShutDownWhenOverByteCapacity()
+    {
+        var builder =
+            BuildTopology(
+                SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.FromDays(100),
+                    StrictBufferConfig.Bounded(CacheSize.OfB(80))));
+        
+        using var driver = new TopologyTestDriver(builder.Build(), streamConfig);
+        var inputTopic = driver.CreateInputTopic("input", new StringSerDes(), new StringSerDes());
+        var outputTopic = driver.CreateOuputTopic("output",
+            TimeSpan.FromSeconds(5),
+            new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds),
+            new Int64SerDes());
+        
+        inputTopic.PipeInput("key1", "value1", dt);
+
+        Assert.Throws<StreamsException>(() => inputTopic.PipeInput("dummy", "dummy", dt.AddMinutes(2)));
+    }
+    
+    [Test]
+    public void EmitEndWindowWithHeaders()
+    {
+        var builder =
+            BuildTopology(
+                SuppressedBuilder.UntilWindowClose<Windowed<string>, long>(TimeSpan.Zero,
+                    StrictBufferConfig.Unbounded()));
+        
+        using var driver = new TopologyTestDriver(builder.Build(), streamConfig);
+        var inputTopic = driver.CreateInputTopic("input", new StringSerDes(), new StringSerDes());
+        var outputTopic = driver.CreateOuputTopic("output",
+            TimeSpan.FromSeconds(5),
+            new TimeWindowedSerDes<string>(new StringSerDes(), (long)TimeSpan.FromMinutes(1).TotalMilliseconds),
+            new Int64SerDes());
+
+        Headers headers = new Headers();
+        headers.Add("header", Encoding.UTF8.GetBytes("header-value"));
+        
+        inputTopic.PipeInput("key1", "value1", dt, headers);
+        inputTopic.PipeInput("key1", "value2", dt.AddMinutes(1));
+
+        var records = outputTopic.ReadKeyValueList().ToList();
+        Assert.AreEqual(1, records.Count);
+        Assert.AreEqual(1, records[0].Message.Headers.Count);
+        Assert.AreEqual("key1", records[0].Message.Key.Key);
+        Assert.AreEqual(1L, records[0].Message.Value);
+        Assert.AreEqual("header", records[0].Message.Headers[0].Key);
+    }
+}
\ No newline at end of file
diff --git a/test/Streamiz.Kafka.Net.Tests/Stores/InMemoryTimeOrderedKeyValueChangeBufferTests.cs b/test/Streamiz.Kafka.Net.Tests/Stores/InMemoryTimeOrderedKeyValueChangeBufferTests.cs
new file mode 100644
index 00000000..cbc52c0b
--- /dev/null
+++ b/test/Streamiz.Kafka.Net.Tests/Stores/InMemoryTimeOrderedKeyValueChangeBufferTests.cs
@@ -0,0 +1,402 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using Confluent.Kafka;
+using NUnit.Framework;
+using Streamiz.Kafka.Net.Metrics;
+using Streamiz.Kafka.Net.Processors.Internal;
+using Streamiz.Kafka.Net.SerDes;
+using Streamiz.Kafka.Net.State.Suppress.Internal;
+using Moq;
+using Streamiz.Kafka.Net.Crosscutting;
+using Streamiz.Kafka.Net.Errors;
+using Streamiz.Kafka.Net.Kafka.Internal;
+using Streamiz.Kafka.Net.Metrics.Internal;
+using Streamiz.Kafka.Net.Mock.Sync;
+using Streamiz.Kafka.Net.Processors;
+using Streamiz.Kafka.Net.Table.Internal;
+
+namespace Streamiz.Kafka.Net.Tests.Stores;
+
+public class InMemoryTimeOrderedKeyValueChangeBufferTests
+{
+    private InMemoryTimeOrderedKeyValueChangeBuffer<string, string> buffer;
+    private ProcessorContext context;
+    private StreamsProducer producer;
+    private Action<ConsumeResult<byte[],byte[]>> restoreCallback;
+
+    private void PutRecord(long streamTime,
+        long recordTimestamp,
+        string key,
+        string value)
+    {
+        RecordContext recordContext = new RecordContext(new Headers(), 0, recordTimestamp, 0, "topic");
+        context.SetRecordMetaData(recordContext);
+        buffer.Put(streamTime, key, new Change<string>(null, value), recordContext);
+    }
+
+    [SetUp]
+    public void Init()
+    {
+        buffer = new InMemoryTimeOrderedKeyValueChangeBuffer<string, string>(
+            "suppress-store",
+            true,
+            new StringSerDes(),
+            new StringSerDes());
+
+        var config = new StreamConfig();
+        config.ApplicationId = "suppress-in-memory-store";
+
+        var metricsRegistry = new StreamMetricsRegistry();
+        
+        var stateManager = new Mock<IStateManager>();
+        stateManager.Setup(s => 
+                s.Register(It.IsAny<IStateStore>(), It.IsAny<Action<ConsumeResult<byte[], byte[]>>>()))
+            .Callback((IStateStore store, Action<ConsumeResult<byte[], byte[]>> callback) =>
+            {
+                restoreCallback = callback;
+            });
+        
+        stateManager.Setup(s => s.GetRegisteredChangelogPartitionFor(It.IsAny<string>()))
+            .Returns((string stateStore) => new TopicPartition(stateStore + "-changelog", 0));
+
+        var supplier = new SyncKafkaSupplier();
+        producer = new StreamsProducer(
+            config,
+            "thread",
+            Guid.NewGuid(),
+            supplier,
+            "");
+            
+        var collector = new RecordCollector(
+            "test-collector",
+            config,
+            new TaskId {Id = 0, Partition = 0},
+            producer,
+            NoRunnableSensor.Empty);
+        
+        var context = new Mock<ProcessorContext>();
+        context.Setup(c => c.Id)
+            .Returns(new TaskId { Id = 0, Partition = 0 });
+        context.Setup(c => c.Configuration)
+            .Returns(config);
+        context.Setup(c => c.Metrics)
+            .Returns(() => metricsRegistry);
+        context.Setup(c => c.States)
+            .Returns(() => stateManager.Object);
+        context.Setup(c => c.RecordCollector)
+            .Returns(() => collector);
+
+        this.context = context.Object;
+        buffer.Init(this.context, buffer);
+    }
+
+    [TearDown]
+    public void Dispose()
+    {
+        buffer.Close();
+    }
+
+    private static BufferValue GetBufferValue(string value, long timestamp)
+    {
+        return new BufferValue(
+            null,
+            null,
+            Encoding.UTF8.GetBytes(value),
+            new RecordContext(new Headers(), 0, timestamp, 0, "topic")
+        );
+    }
+    
+    [Test]
+    public void AcceptDataTest()
+    {
+        PutRecord(0L, 0L, "key1", "value1");
+        Assert.AreEqual(1, buffer.NumRecords);
+    }
+
+    [Test]
+    public void RejectNullValuesTest()
+    {
+        RecordContext recordContext = new RecordContext(new Headers(), 0, 0L, 0, "topic");
+        Assert.Throws<ArgumentNullException>(() => buffer.Put(0L, "key", null, recordContext));
+    }
+
+    [Test]
+    public void RejectNullContextTest()
+    {
+        Assert.Throws<ArgumentNullException>(() => buffer.Put(0L, "key", new Change<string>(null, "value"), null));
+    }
+
+    [Test]
+    public void RemoveDataTest()
+    {
+        PutRecord(0L, 0L, "key1", "value1");
+        Assert.AreEqual(1, buffer.NumRecords);
+        buffer.EvictWhile(() => true, (_, _, _) => { });
+        Assert.AreEqual(0, buffer.NumRecords);
+    }
+    
+    [Test]
+    public void RespectEvictionPredicateTest()
+    {
+        PutRecord(0L, 0L, "key1", "value1");
+        PutRecord(1L, 0L, "key2", "value2");
+        Assert.AreEqual(2, buffer.NumRecords);
+        List<(string, string)> evicted = new();
+        List<(string, string)> expected = new()
+        {
+            ("key1", "value1")
+        };
+        
+        buffer.EvictWhile(() => buffer.NumRecords > 1, (k, v, _) => { evicted.Add((k, v.NewValue));});
+        
+        Assert.AreEqual(1, buffer.NumRecords);
+        Assert.AreEqual(expected, evicted);
+    }
+
+    [Test]
+    public void TrackCountTest()
+    {
+        PutRecord(0L, 0L, "key1", "value1");
+        Assert.AreEqual(1, buffer.NumRecords);
+        PutRecord(1L, 0L, "key2", "value2");
+        Assert.AreEqual(2, buffer.NumRecords);
+        PutRecord(2L, 0L, "key3", "value3");
+        Assert.AreEqual(3, buffer.NumRecords);
+    }
+    
+    [Test]
+    public void TrackSizeTest()
+    {
+        // 8 + 10 (key/value) + 25 (timestamp + tpo + headers) 
+        PutRecord(0L, 0L, "key1", "value1");
+        Assert.AreEqual(43, buffer.BufferSize);
+        // 8 + 6 (key/value) + 25 (timestamp + tpo + headers) 
+        PutRecord(1L, 0L, "key1", "v2");
+        Assert.AreEqual(39, buffer.BufferSize);
+        // 8 + 10 (key/value) + 25 (timestamp + tpo + headers) 
+        PutRecord(2L, 0L, "key3", "value3");
+        Assert.AreEqual(82, buffer.BufferSize);
+    }
+
+    [Test]
+    public void TrackMinTimestampTest()
+    {
+        PutRecord(1L, 0L, "key1", "v1");
+        Assert.AreEqual(1, buffer.MinTimestamp);
+        PutRecord(0L, 0L, "key2", "v2");
+        Assert.AreEqual(0, buffer.MinTimestamp);
+    }
+
+    [Test]
+    public void EvictOldestAndUpdateSizeAndCountAndMinTimestampTest()
+    {
+        PutRecord(1L, 0L, "key1", "12345");
+        Assert.AreEqual(42, buffer.BufferSize);
+        Assert.AreEqual(1, buffer.NumRecords);
+        Assert.AreEqual(1, buffer.MinTimestamp);
+        
+        PutRecord(0L, 0L, "key2", "123");
+        Assert.AreEqual(82, buffer.BufferSize);
+        Assert.AreEqual(2, buffer.NumRecords);
+        Assert.AreEqual(0, buffer.MinTimestamp);
+        
+        int @case = 0;
+        buffer.EvictWhile(() => true, (k, v, r) =>
+        {
+            ++@case;
+            if (@case == 1)
+            {
+                Assert.AreEqual("key2", k);
+                Assert.AreEqual(2, buffer.NumRecords);
+                Assert.AreEqual(82, buffer.BufferSize);
+                Assert.AreEqual(0, buffer.MinTimestamp);
+            }
+            else if (@case == 2)
+            {
+                Assert.AreEqual("key1", k);
+                Assert.AreEqual(1, buffer.NumRecords);
+                Assert.AreEqual(42, buffer.BufferSize);
+                Assert.AreEqual(1, buffer.MinTimestamp);
+            }
+        });
+        
+        Assert.AreEqual(2, @case);
+        Assert.AreEqual(0, buffer.NumRecords);
+        Assert.AreEqual(0, buffer.BufferSize);
+        Assert.AreEqual(long.MaxValue, buffer.MinTimestamp);
+    }
+
+    [Test]
+    public void ReturnUndefinedOnPriorValueForNotBufferedKeyTest()
+    {
+        Assert.IsFalse(buffer.PriorValueForBuffered("ASDF").IsDefined);
+    }
+
+    [Test]
+    public void ReturnPriorValueForBufferedKeyTest()
+    {
+        RecordContext recordContext = new RecordContext(new Headers(), 0, 0L, 0, "topic");
+        context.SetRecordMetaData(recordContext);
+        
+        buffer.Put(1L, "A", new Change<string>("old-value", "new-value"), recordContext);
+        buffer.Put(1L, "B", new Change<string>(null, "new-value"), recordContext);
+
+        Assert.AreEqual("old-value", buffer.PriorValueForBuffered("A").Value.Value);
+        Assert.AreEqual(-1, buffer.PriorValueForBuffered("A").Value.Timestamp);
+    }
+
+    [Test]
+    public void FlushTest()
+    {
+        PutRecord(2L, 0L, "key1", "value1");
+        PutRecord(1L, 1L, "key2", "value2");
+        PutRecord(0L, 2L, "key3", "value3");
+        
+        buffer.EvictWhile(() => buffer.MinTimestamp < 1, (k,v,c) => { });
+        buffer.Flush();
+
+        var messages = ((SyncProducer)producer.Producer)
+            .GetHistory("suppress-store-changelog")
+            .Select(m =>
+            {
+                if (m.Value == null)
+                    return (-1, null);
+                
+                var byteBuffer = ByteBuffer.Build(m.Value, true);
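+                // changelog value layout: serialized BufferValue followed by the 8-byte buffer timestamp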
+                var bufferValue = BufferValue.Deserialize(byteBuffer);
+                long timestamp = byteBuffer.GetLong();
+                return (timestamp, bufferValue);
+            })
+            .ToList();
+
+        List<(long, BufferValue)> expectedMsg = new()
+        {
+            (2, GetBufferValue("value1", 0L)),
+            (1, GetBufferValue("value2", 1L)),
+            (-1, null),
+        };
+        
+        Assert.AreEqual(expectedMsg, messages);
+    }
+
+    [Test]
+    public void RestoreTest()
+    {
+        BufferValue GetBufferValueWithHeaders(string v, long timestamp)
+        {
+            var bufferValue = GetBufferValue(v, timestamp);
+            bufferValue.RecordContext.Headers.Add("header", Encoding.UTF8.GetBytes("header-value"));
+            return bufferValue;
+        }
+        
+        var bufferValue1 = GetBufferValueWithHeaders("value1", 0L).Serialize(sizeof(long));
+        bufferValue1.PutLong(0L);
+        var bufferValue2 = GetBufferValueWithHeaders("value2", 1L).Serialize(sizeof(long));
+        bufferValue2.PutLong(1L);
+        var bufferValue3 = GetBufferValueWithHeaders("value3", 2L).Serialize(sizeof(long));
+        bufferValue3.PutLong(2L);
+
+        BufferValue fb2bis = new BufferValue(
+            Encoding.UTF8.GetBytes("value2"),
+            null,
+            Encoding.UTF8.GetBytes("value2bis"),
+            new RecordContext(new Headers(), 0, 3L, 0, "topic"));
+        var bufferValue2Bis = fb2bis.Serialize(sizeof(long));
+        bufferValue2Bis.PutLong(3L);
+
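+        // per-record header overhead: "header" key (6 bytes) + "header-value" payload (12 bytes) added by GetBufferValueWithHeaders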
+        var headerSizeBytes = 18L;
+        
+        List<ConsumeResult<byte[], byte[]>> recordsToRestore = new()
+        {
+            new ConsumeResult<byte[], byte[]>
+            {
+                // 4 + 8 + 8 + 5
+                TopicPartitionOffset = new TopicPartitionOffset("topic", 0, 0),
+                Message = new Message<byte[], byte[]>()
+                {
+                    Key = Encoding.UTF8.GetBytes("key1"), // 4
+                    Value = bufferValue1.ToArray(), // 6 + 8
+                    Timestamp = new Timestamp(0L, TimestampType.CreateTime),
+                    Headers = new()
+                },
+                IsPartitionEOF = false
+            },
+            new ConsumeResult<byte[], byte[]>
+            {
+                // 4 + 8 + 8 + 5
+                TopicPartitionOffset = new TopicPartitionOffset("topic", 0, 1),
+                Message = new Message<byte[], byte[]>()
+                {
+                    Key = Encoding.UTF8.GetBytes("key2"), // 4
+                    Value = bufferValue2.ToArray(), // 6 + 8
+                    Timestamp = new Timestamp(1L, TimestampType.CreateTime),
+                    Headers = new()
+                },
+                IsPartitionEOF = false
+            },
+            new ConsumeResult<byte[], byte[]>
+            {
+                // 4 + 8 + 8 + 5
+                TopicPartitionOffset = new TopicPartitionOffset("topic", 0, 2),
+                Message = new Message<byte[], byte[]>()
+                {
+                    Key = Encoding.UTF8.GetBytes("key3"), // 4
+                    Value = bufferValue3.ToArray(), // 6 + 8
+                    Timestamp = new Timestamp(2L, TimestampType.CreateTime),
+                    Headers = new()
+                },
+                IsPartitionEOF = false
+            }
+        };
+
+        foreach (var r in recordsToRestore)
+            restoreCallback(r);
+        
+        Assert.AreEqual(3, buffer.NumRecords);
+        Assert.AreEqual(0, buffer.MinTimestamp);
+        Assert.AreEqual(129 + 3*headerSizeBytes, buffer.BufferSize);
+
+        restoreCallback(new ConsumeResult<byte[], byte[]>
+        {
+            // 4 + 8 + 8 + 5
+            TopicPartitionOffset = new TopicPartitionOffset("topic", 0, 2),
+            Message = new Message<byte[], byte[]>()
+            {
+                Key = Encoding.UTF8.GetBytes("key3"), // 4
+                Value = null,
+                Timestamp = new Timestamp(3L, TimestampType.CreateTime),
+                Headers = new()
+            },
+            IsPartitionEOF = false
+        });
+        
+        Assert.AreEqual(2, buffer.NumRecords);
+        Assert.AreEqual(0, buffer.MinTimestamp);
+        Assert.AreEqual(86 + 2*headerSizeBytes, buffer.BufferSize);
+        
+        restoreCallback(new ConsumeResult<byte[], byte[]>
+        {
+            // 4 + 8 + 8 + 5
+            TopicPartitionOffset = new TopicPartitionOffset("topic", 0, 4),
+            Message = new Message<byte[], byte[]>()
+            {
+                Key = Encoding.UTF8.GetBytes("key2"), // 4
+                Value = bufferValue2Bis.ToArray(),
+                Timestamp = new Timestamp(3L, TimestampType.CreateTime),
+                Headers = new()
+            },
+            IsPartitionEOF = false
+        });
+
+        Assert.IsFalse(buffer.PriorValueForBuffered("key3").IsDefined);
+        Assert.Throws<IllegalStateException>(() =>
+        {
+            var value = buffer.PriorValueForBuffered("key3").Value;
+        });
+        Assert.IsTrue(buffer.PriorValueForBuffered("key1").IsDefined);
+        Assert.IsNull(buffer.PriorValueForBuffered("key1").Value);
+        Assert.IsTrue(buffer.PriorValueForBuffered("key2").IsDefined);
+        Assert.AreEqual("value2", buffer.PriorValueForBuffered("key2").Value.Value);
+    }
+}
\ No newline at end of file