From 0e31c7a62916c252608d1ecd8dcb956c0e584da3 Mon Sep 17 00:00:00 2001 From: Jose Date: Thu, 19 Dec 2024 22:44:56 +0100 Subject: [PATCH 1/7] Fix initialization of DataStorm samples after session recovery - #3056 --- cpp/src/DataStorm/SessionI.cpp | 47 ++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/cpp/src/DataStorm/SessionI.cpp b/cpp/src/DataStorm/SessionI.cpp index 166e555d49f..c7f304f78d8 100644 --- a/cpp/src/DataStorm/SessionI.cpp +++ b/cpp/src/DataStorm/SessionI.cpp @@ -1134,28 +1134,37 @@ SessionI::subscriberInitialized( out << _id << ": initialized '" << element << "' from 'e" << elementId << '@' << topicId << "'"; } elementSubscriber->initialized = true; - elementSubscriber->lastId = samples.empty() ? 0 : samples.back().id; - vector> samplesI; - samplesI.reserve(samples.size()); - auto sampleFactory = element->getTopic()->getSampleFactory(); - auto keyFactory = element->getTopic()->getKeyFactory(); - for (const auto& sample : samples) + if (samples.empty()) { - assert((!key && !sample.keyValue.empty()) || key == subscriber.keys[sample.keyId].first); - - samplesI.push_back(sampleFactory->create( - _id, - elementSubscribers->name, - sample.id, - sample.event, - key ? key : keyFactory->decode(_instance->getCommunicator(), sample.keyValue), - subscriber.tags[sample.tag], - sample.value, - sample.timestamp)); - assert(samplesI.back()->key); + return {}; + } + else + { + assert(samples.back().id > elementSubscriber->lastId); + elementSubscriber->lastId = samples.back().id; + + vector> samplesI; + samplesI.reserve(samples.size()); + auto sampleFactory = element->getTopic()->getSampleFactory(); + auto keyFactory = element->getTopic()->getKeyFactory(); + for (const auto& sample : samples) + { + assert((!key && !sample.keyValue.empty()) || key == subscriber.keys[sample.keyId].first); + + samplesI.push_back(sampleFactory->create( + _id, + elementSubscribers->name, + sample.id, + sample.event, + key ? key : keyFactory->decode(_instance->getCommunicator(), sample.keyValue), + subscriber.tags[sample.tag], + sample.value, + sample.timestamp)); + assert(samplesI.back()->key); + } + return samplesI; } - return samplesI; } void From 676ea4fdf485e0c1ac5ad11a2acf908dd166fe65 Mon Sep 17 00:00:00 2001 From: Jose Date: Fri, 20 Dec 2024 10:33:01 +0100 Subject: [PATCH 2/7] Add comment explaining the lastId initialization logic --- cpp/src/DataStorm/SessionI.cpp | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cpp/src/DataStorm/SessionI.cpp b/cpp/src/DataStorm/SessionI.cpp index c7f304f78d8..e154c571f55 100644 --- a/cpp/src/DataStorm/SessionI.cpp +++ b/cpp/src/DataStorm/SessionI.cpp @@ -1135,13 +1135,21 @@ SessionI::subscriberInitialized( } elementSubscriber->initialized = true; + // If the samples collection is empty, the element subscriber's lastId remains unchanged: + // - If no samples have been received, lastId is 0. + // - If the element subscriber has been initialized before, lastId represents the ID of the latest received sample. + // + // If the samples collection is not empty: + // - It contains samples queued in the peer writer for the element that are valid according to the element's + // configuration. + // - These samples have not yet been processed by the element subscriber, according to the subscriber's lastId. if (samples.empty()) { return {}; } else { - assert(samples.back().id > elementSubscriber->lastId); + assert(samples.front().id > elementSubscriber->lastId); elementSubscriber->lastId = samples.back().id; vector> samplesI; From a2d70282fcf500662eeb195af447509e3a073ed2 Mon Sep 17 00:00:00 2001 From: Jose Date: Fri, 20 Dec 2024 11:18:53 +0100 Subject: [PATCH 3/7] Session reestablishment test --- cpp/test/DataStorm/reliability/Reader.cpp | 67 ++++++++++++++++++++++- cpp/test/DataStorm/reliability/Writer.cpp | 37 +++++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/cpp/test/DataStorm/reliability/Reader.cpp b/cpp/test/DataStorm/reliability/Reader.cpp index 0b0856ef001..5c85686c5c0 100644 --- a/cpp/test/DataStorm/reliability/Reader.cpp +++ b/cpp/test/DataStorm/reliability/Reader.cpp @@ -57,7 +57,7 @@ void ::Reader::run(int argc, char* argv[]) auto connection = node.getSessionConnection(sample.getSession()); while (!connection) { - this_thread::sleep_for(chrono::milliseconds(200)); + this_thread::sleep_for(chrono::milliseconds(10)); connection = node.getSessionConnection(sample.getSession()); } connection->close().get(); @@ -68,6 +68,71 @@ void ::Reader::run(int argc, char* argv[]) writer.update(0); writer.waitForNoReaders(); } + + { + Topic topic(node, "int2"); + auto reader = makeSingleKeyReader(topic, "element", "", config); + string session; + + // Read 100 samples from the "element" key and close the connection. + for (int i = 0; i < 100; ++i) + { + auto sample = reader.getNextUnread(); + if (sample.getValue() != i) + { + cerr << "unexpected sample: " << sample.getValue() << " expected:" << i << endl; + test(false); + } + session = sample.getSession(); + } + + auto connection = node.getSessionConnection(session); + while (!connection) + { + this_thread::sleep_for(chrono::milliseconds(10)); + connection = node.getSessionConnection(session); + } + connection->close().get(); + + // Send a sample to the writer on "reader_barrier" to let it know that the connection was closed. + // The writer will read it after the session is reestablished. + auto writerB = makeSingleKeyWriter(topic, "reader_barrier"); + writerB.waitForReaders(); + writerB.update(0); + + // Wait for the writer to acknowledge the sample send on "reader_barrier" and close the connection again. + auto readerB = makeSingleKeyReader(topic, "writer_barrier"); + [[maybe_unused]] auto _ = readerB.getNextUnread(); + + // Session was reestablish close again + connection = node.getSessionConnection(session); + while (!connection) + { + this_thread::sleep_for(chrono::milliseconds(10)); + connection = node.getSessionConnection(session); + } + connection->close().get(); + + // Let the writer know the connection was closed again, and that it can proceed with the second batch of + // samples. + writerB.update(0); + + for (int i = 0; i < 100; ++i) + { + auto sample = reader.getNextUnread(); + if (sample.getValue() != i + 100) + { + cerr << "unexpected sample: " << sample.getValue() << " expected:" << (i + 100) << endl; + test(false); + } + session = sample.getSession(); + } + + // Let the writer know we have processed all samples. + writerB.waitForReaders(); + writerB.update(0); + writerB.waitForNoReaders(); + } } DEFINE_TEST(::Reader) diff --git a/cpp/test/DataStorm/reliability/Writer.cpp b/cpp/test/DataStorm/reliability/Writer.cpp index e0517fd641b..704db8a2498 100644 --- a/cpp/test/DataStorm/reliability/Writer.cpp +++ b/cpp/test/DataStorm/reliability/Writer.cpp @@ -54,6 +54,43 @@ void ::Writer::run(int argc, char* argv[]) [[maybe_unused]] auto _ = makeSingleKeyReader(topic, "barrier").getNextUnread(); } cout << "ok" << endl; + + // Publish a batch of samples to a topic's key, follow by two consecutive session recovery events without writer + // activity on the given key. + // Then send a second batch of samples to the same topic's key and ensure the reader continue reading from when it + // left off. + cout << "testing reader multiple connection closure without writer activity... " << flush; + { + Topic topic(node, "int2"); + auto writer = makeSingleKeyWriter(topic, "element", "", config); + writer.waitForReaders(); + for (int i = 0; i < 100; ++i) + { + writer.update(i); + } + + auto readerB = makeSingleKeyReader(topic, "reader_barrier"); + + // A control sample send by the reader to let the writer know the connection was closed. The writer process this + // sample after the first session reestablishment. + auto sample = readerB.getNextUnread(); + + // Send a control sample to let the reader know session was reestablished. + auto writerB = makeSingleKeyWriter(topic, "writer_barrier"); + writerB.update(0); + + // Wait for a second control sample from the reader indicating the second session closure. The writer process + // this sample after the second session reestablishment. + sample = readerB.getNextUnread(); + + // Session has been reestablish twice without activity in "element" key. Send the second batch of samples. + for (int i = 0; i < 100; ++i) + { + writer.update(i + 100); + } + sample = readerB.getNextUnread(); + } + cout << "ok" << endl; } DEFINE_TEST(::Writer) From 476a67a567e6421c6da46cec6864ffaccbd63eae Mon Sep 17 00:00:00 2001 From: Jose Date: Fri, 20 Dec 2024 15:24:46 +0100 Subject: [PATCH 4/7] Update cpp/test/DataStorm/reliability/Reader.cpp Co-authored-by: Joe George --- cpp/test/DataStorm/reliability/Reader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/test/DataStorm/reliability/Reader.cpp b/cpp/test/DataStorm/reliability/Reader.cpp index 5c85686c5c0..3924cecdf7f 100644 --- a/cpp/test/DataStorm/reliability/Reader.cpp +++ b/cpp/test/DataStorm/reliability/Reader.cpp @@ -104,7 +104,7 @@ void ::Reader::run(int argc, char* argv[]) auto readerB = makeSingleKeyReader(topic, "writer_barrier"); [[maybe_unused]] auto _ = readerB.getNextUnread(); - // Session was reestablish close again + // Session was reestablished; close it again. connection = node.getSessionConnection(session); while (!connection) { From 6d7c0088504804b9b25221da936b48250ce1dc67 Mon Sep 17 00:00:00 2001 From: Jose Date: Fri, 20 Dec 2024 15:27:31 +0100 Subject: [PATCH 5/7] Update cpp/test/DataStorm/reliability/Writer.cpp Co-authored-by: Joe George --- cpp/test/DataStorm/reliability/Writer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/test/DataStorm/reliability/Writer.cpp b/cpp/test/DataStorm/reliability/Writer.cpp index 704db8a2498..62eb40856e7 100644 --- a/cpp/test/DataStorm/reliability/Writer.cpp +++ b/cpp/test/DataStorm/reliability/Writer.cpp @@ -71,7 +71,7 @@ void ::Writer::run(int argc, char* argv[]) auto readerB = makeSingleKeyReader(topic, "reader_barrier"); - // A control sample send by the reader to let the writer know the connection was closed. The writer process this + // A control sample sent by the reader to let the writer know the connection was closed. The writer processes this // sample after the first session reestablishment. auto sample = readerB.getNextUnread(); From 9a8226d2edeb8bf11ebde720e2dea8a0edbc3b81 Mon Sep 17 00:00:00 2001 From: Jose Date: Fri, 20 Dec 2024 16:47:17 +0100 Subject: [PATCH 6/7] Review fixes --- cpp/test/DataStorm/reliability/Reader.cpp | 1 - cpp/test/DataStorm/reliability/Writer.cpp | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/test/DataStorm/reliability/Reader.cpp b/cpp/test/DataStorm/reliability/Reader.cpp index 3924cecdf7f..94c94f9b368 100644 --- a/cpp/test/DataStorm/reliability/Reader.cpp +++ b/cpp/test/DataStorm/reliability/Reader.cpp @@ -125,7 +125,6 @@ void ::Reader::run(int argc, char* argv[]) cerr << "unexpected sample: " << sample.getValue() << " expected:" << (i + 100) << endl; test(false); } - session = sample.getSession(); } // Let the writer know we have processed all samples. diff --git a/cpp/test/DataStorm/reliability/Writer.cpp b/cpp/test/DataStorm/reliability/Writer.cpp index 62eb40856e7..01b0a5a406e 100644 --- a/cpp/test/DataStorm/reliability/Writer.cpp +++ b/cpp/test/DataStorm/reliability/Writer.cpp @@ -71,8 +71,8 @@ void ::Writer::run(int argc, char* argv[]) auto readerB = makeSingleKeyReader(topic, "reader_barrier"); - // A control sample sent by the reader to let the writer know the connection was closed. The writer processes this - // sample after the first session reestablishment. + // A control sample sent by the reader to let the writer know the connection was closed. The writer processes + // this sample after the first session reestablishment. auto sample = readerB.getNextUnread(); // Send a control sample to let the reader know session was reestablished. From 4cc3ba74ad183eca8316540d32fd35453556a990 Mon Sep 17 00:00:00 2001 From: Jose Date: Sun, 22 Dec 2024 12:57:48 +0100 Subject: [PATCH 7/7] review fixes --- cpp/test/DataStorm/reliability/Reader.cpp | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/cpp/test/DataStorm/reliability/Reader.cpp b/cpp/test/DataStorm/reliability/Reader.cpp index 94c94f9b368..d6fdf4b76cc 100644 --- a/cpp/test/DataStorm/reliability/Reader.cpp +++ b/cpp/test/DataStorm/reliability/Reader.cpp @@ -87,11 +87,7 @@ void ::Reader::run(int argc, char* argv[]) } auto connection = node.getSessionConnection(session); - while (!connection) - { - this_thread::sleep_for(chrono::milliseconds(10)); - connection = node.getSessionConnection(session); - } + test(connection); connection->close().get(); // Send a sample to the writer on "reader_barrier" to let it know that the connection was closed. @@ -106,11 +102,7 @@ void ::Reader::run(int argc, char* argv[]) // Session was reestablished; close it again. connection = node.getSessionConnection(session); - while (!connection) - { - this_thread::sleep_for(chrono::milliseconds(10)); - connection = node.getSessionConnection(session); - } + test(connection); connection->close().get(); // Let the writer know the connection was closed again, and that it can proceed with the second batch of