diff --git a/.gitignore b/.gitignore index c95e4f75..77a1b76b 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,6 @@ build TestResults .idea/ -.vscode/ \ No newline at end of file +.vscode/ + +confidential \ No newline at end of file diff --git a/core/Mock/Sync/SyncAdminClient.cs b/core/Mock/Sync/SyncAdminClient.cs index f59a3c16..d6741e10 100644 --- a/core/Mock/Sync/SyncAdminClient.cs +++ b/core/Mock/Sync/SyncAdminClient.cs @@ -2,6 +2,7 @@ using Confluent.Kafka.Admin; using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using Streamiz.Kafka.Net.Mock.Kafka; @@ -10,11 +11,13 @@ namespace Streamiz.Kafka.Net.Mock.Sync internal class SyncAdminClient : BasedAdminClient { private readonly SyncProducer producer; + private readonly bool _autoCreateTopic; private AdminClientConfig config; - public SyncAdminClient(SyncProducer producer) + public SyncAdminClient(SyncProducer producer, bool autoCreateTopic) { this.producer = producer; + _autoCreateTopic = autoCreateTopic; } internal void UseConfig(AdminClientConfig config) @@ -66,20 +69,35 @@ public override void Dispose() public override Metadata GetMetadata(string topic, TimeSpan timeout) { var error = new Error(ErrorCode.NoError); - - var brokersMetadata = new List { - new BrokerMetadata(1, "localhost", 9092) - }; - - var partitionsMetadata = new List + var topics = producer.GetAllTopics(); + var brokersMetadata = new List { - new PartitionMetadata(1, 1, new int[1]{1}, new int[1]{1}, error) + new(1, "localhost", 9092) }; + + if (topics.Contains(topic)) + { + var partitionsMetadata = new List + { + new(1, 1, new int[1] { 1 }, new int[1] { 1 }, error) + }; - var topicMetadata = new TopicMetadata(topic, partitionsMetadata, error); + var topicMetadata = new TopicMetadata(topic, partitionsMetadata, error); + + return new Metadata(brokersMetadata, + new List() { topicMetadata }, + 1, "localhost"); + } + if (_autoCreateTopic) + { + // auto create topic if not exist + producer.CreateTopic(topic); + return GetMetadata(topic, timeout); + } + return new Metadata(brokersMetadata, - new List() { topicMetadata }, + new List(), 1, "localhost"); } diff --git a/core/Mock/Sync/SyncKafkaSupplier.cs b/core/Mock/Sync/SyncKafkaSupplier.cs index 03a46da4..d080c60e 100644 --- a/core/Mock/Sync/SyncKafkaSupplier.cs +++ b/core/Mock/Sync/SyncKafkaSupplier.cs @@ -14,10 +14,10 @@ internal class SyncKafkaSupplier : IKafkaSupplier protected SyncProducer producer = null; protected SyncAdminClient admin = null; - public SyncKafkaSupplier() + public SyncKafkaSupplier(bool autoCreateTopic = true) { producer = new SyncProducer(); - admin = new SyncAdminClient(producer); + admin = new SyncAdminClient(producer, autoCreateTopic); } public virtual IAdminClient GetAdmin(AdminClientConfig config) diff --git a/core/Processors/DefaultTopicManager.cs b/core/Processors/DefaultTopicManager.cs index 5fab99a6..364551cc 100644 --- a/core/Processors/DefaultTopicManager.cs +++ b/core/Processors/DefaultTopicManager.cs @@ -49,7 +49,6 @@ async Task> Run() var defaultConfig = new Dictionary(); var topicsNewCreated = new List(); var topicsToCreate = new List(); - var metadata = AdminClient.GetMetadata(timeout); // 1. get source topic partition // 2. check if changelog exist, : @@ -58,6 +57,7 @@ async Task> Run() // 3. if changelog doesn't exist, create it with partition number and configuration foreach (var t in topics) { + var metadata = AdminClient.GetMetadata(t.Key, timeout); var numberPartitions = GetNumberPartitionForTopic(metadata, t.Key); if (numberPartitions == 0) { diff --git a/core/State/RocksDb/RocksDbOptions.cs b/core/State/RocksDb/RocksDbOptions.cs index 24f3f161..5d98510d 100644 --- a/core/State/RocksDb/RocksDbOptions.cs +++ b/core/State/RocksDb/RocksDbOptions.cs @@ -1191,9 +1191,9 @@ public RocksDbOptions SetOptimizeFiltersForHits(int value) /// /// /// the instance of the current object - public RocksDbOptions SetPlainTableFactory(uint p1, int p2, double p3, ulong p4) + public RocksDbOptions SetPlainTableFactory(uint user_key_len, int bloom_bits_per_key, double hash_table_ratio, int index_sparseness, int huge_page_tlb_size, char encoding_type, bool full_scan_mode, bool store_index_in_file) { - columnFamilyOptions.SetPlainTableFactory(p1, p2, p3, p4); + columnFamilyOptions.SetPlainTableFactory(user_key_len, bloom_bits_per_key, hash_table_ratio, index_sparseness, huge_page_tlb_size, encoding_type, full_scan_mode, store_index_in_file); return this; } diff --git a/core/Streamiz.Kafka.Net.csproj b/core/Streamiz.Kafka.Net.csproj index ee73700f..8606b4d9 100644 --- a/core/Streamiz.Kafka.Net.csproj +++ b/core/Streamiz.Kafka.Net.csproj @@ -42,7 +42,7 @@ - + diff --git a/samples/sample-stream/Program.cs b/samples/sample-stream/Program.cs index d6e54372..59468230 100644 --- a/samples/sample-stream/Program.cs +++ b/samples/sample-stream/Program.cs @@ -25,7 +25,6 @@ public static async Task Main(string[] args) PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin, Partitioner = Partitioner.ConsistentRandom, Debug = "broker,topic,msg", - ClientId = "ttoot", Logger = LoggerFactory.Create((b) => { b.AddConsole(); @@ -50,7 +49,7 @@ private static Topology BuildTopology() TimeSpan windowSize = TimeSpan.FromHours(1); var builder = new StreamBuilder(); - /* builder.Stream("input") + builder.Stream("input") .GroupByKey() .WindowedBy(TumblingWindowOptions.Of(windowSize)) .Count(RocksDbWindows.As("count-store") @@ -61,9 +60,8 @@ private static Topology BuildTopology() .Map((k,v) => new KeyValuePair(k.ToString(), v.ToString())) .To("output", new StringSerDes(), - new StringSerDes());*/ + new StringSerDes()); - builder.GlobalTable("global"); return builder.Build(); } diff --git a/test/Streamiz.Kafka.Net.Tests/Private/DefaultTopicManagerTests.cs b/test/Streamiz.Kafka.Net.Tests/Private/DefaultTopicManagerTests.cs index 4551db3f..650441e0 100644 --- a/test/Streamiz.Kafka.Net.Tests/Private/DefaultTopicManagerTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Private/DefaultTopicManagerTests.cs @@ -16,7 +16,7 @@ public class DefaultTopicManagerTests [SetUp] public void Begin() { - kafkaSupplier = new SyncKafkaSupplier(); + kafkaSupplier = new SyncKafkaSupplier(false); } [TearDown] @@ -47,7 +47,8 @@ public void ApplyInternalChangelogTopics() NumberPartitions = 1 }); - var r = manager.ApplyAsync(0, topics).GetAwaiter().GetResult().ToList(); + var r = manager.ApplyAsync(0, topics) + .GetAwaiter().GetResult().ToList(); Assert.AreEqual(2, r.Count); Assert.AreEqual("topic", r[0]); diff --git a/test/Streamiz.Kafka.Net.Tests/Public/KafkaStreamTests.cs b/test/Streamiz.Kafka.Net.Tests/Public/KafkaStreamTests.cs index 681586e6..94f6c47a 100644 --- a/test/Streamiz.Kafka.Net.Tests/Public/KafkaStreamTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Public/KafkaStreamTests.cs @@ -633,7 +633,7 @@ public async Task BuildGlobalStateStore() config.PollMs = 1; var builder = new StreamBuilder(); - builder.GlobalTable("test", InMemory.As("store")); + builder.GlobalTable("test", InMemory.As("store")); var supplier = new SyncKafkaSupplier(); var producer = supplier.GetProducer(new ProducerConfig()); diff --git a/test/Streamiz.Kafka.Net.Tests/Public/RocksDbOptionsTests.cs b/test/Streamiz.Kafka.Net.Tests/Public/RocksDbOptionsTests.cs index 3a3052d7..a29c4ca1 100644 --- a/test/Streamiz.Kafka.Net.Tests/Public/RocksDbOptionsTests.cs +++ b/test/Streamiz.Kafka.Net.Tests/Public/RocksDbOptionsTests.cs @@ -65,11 +65,8 @@ public void Begin() .SetArenaBlockSize(arenaBlockSize) .SetBloomLocality(bloomLocality) .SetBytesPerSync(bytesPerSync) - // .SetCompactionFilter(IntPtr.Zero) - // .SetCompactionFilterFactory(IntPtr.Zero) .SetCompactionReadaheadSize(1200) .SetCompactionStyle(RocksDbSharp.Compaction.Level) - //.SetComparator(IntPtr.Zero) .SetCompression(RocksDbSharp.Compression.Lz4) .SetCompressionOptions(1, 2, 3, 4) .SetCompressionPerLevel(new[] { RocksDbSharp.Compression.Lz4 }, 1) @@ -80,9 +77,7 @@ public void Begin() .SetDeleteObsoleteFilesPeriodMicros(50) .SetDisableAutoCompactions(1) .SetEnableWriteThreadAdaptiveYield(true) - //.SetEnv(IntPtr.Zero) - .SetErrorIfExists() - //.SetFifoCompactionOptions(IntPtr.Zero) + .SetErrorIfExists(false) .SetHardPendingCompactionBytesLimit(1) .SetHashLinkListRep(12) .SetHashSkipListRep(56, 4, 2) @@ -140,7 +135,6 @@ public void Begin() .SetPrefixExtractor(SliceTransform.CreateNoOp()) .SetRecycleLogFileNum(1) .SetReportBgIoStats(true) - .SetPlainTableFactory(1, 23, 4, 2) .SetMaxBackgroundCompactions(1); };