Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Oct 30, 2024
1 parent f10d75f commit a0352a1
Show file tree
Hide file tree
Showing 18 changed files with 370 additions and 272 deletions.
1 change: 1 addition & 0 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2848,6 +2848,7 @@ void Cluster::onDomainReconfigured(const mqbi::Domain& domain,
oldCfgAppIds,
newCfgAppIds);

// TODO: This should be one call - one QueueUpdateAdvisory for all Apps
bsl::unordered_set<bsl::string>::const_iterator it = addedIds.cbegin();
for (; it != addedIds.cend(); ++it) {
dispatcher()->execute(
Expand Down
116 changes: 59 additions & 57 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,31 +138,31 @@ void createQueueUriKey(bmqt::Uri* out,
BSLS_ASSERT_OPT(rc == 0);
}

void afterAppIdRegisteredDispatched(
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdRegistered(
mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
}

void afterAppIdUnregisteredDispatched(
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdUnregistered(
mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
}
// void afterAppIdRegisteredDispatched(
// mqbi::Queue* queue,
// const mqbc::ClusterStateQueueInfo::AppInfos& appIdInfos)
//{
// // executed by the *QUEUE DISPATCHER* thread
//
// // PRECONDITIONS
// BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));
//
// queue->queueEngine()->afterAppIdRegistered(
// mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
// }
//
// void afterAppIdUnregisteredDispatched(
// mqbi::Queue* queue,
// const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo)
//{
// // executed by the *QUEUE DISPATCHER* thread
//
// // PRECONDITIONS
// BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));
//
// queue->queueEngine()->afterAppIdUnregistered(
// mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
// }

void handleHolderDummy(const bsl::shared_ptr<mqbi::QueueHandle>& handle)
{
Expand Down Expand Up @@ -4394,32 +4394,25 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
const int partitionId = qiter->second->partitionId();
BSLS_ASSERT_SAFE(partitionId != mqbs::DataStore::k_INVALID_PARTITION_ID);

for (AppInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue creation callback is
// invoked at replica nodes when they receive a queue creation
// record from the primary in the partition stream.

mqbi::Storage::AppInfos one(1, d_allocator_p);
one.emplace(*cit);
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue creation callback is
// invoked at replica nodes when they receive a queue creation
// record from the primary in the partition stream.

d_storageManager_p->updateQueueReplica(
partitionId,
uri,
qiter->second->key(),
one,
d_clusterState_p->domainStates()
.at(uri.qualifiedDomain())
->domain());
}
if (queue) {
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(afterAppIdRegisteredDispatched,
queue,
*cit),
queue);
}
d_storageManager_p->updateQueueReplica(partitionId,
uri,
qiter->second->key(),
addedAppIds,
d_clusterState_p->domainStates()
.at(uri.qualifiedDomain())
->domain());
}
if (queue) {
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(&mqbi::QueueEngine::afterAppIdRegistered,
queue->queueEngine(),
addedAppIds),
queue);
}

for (AppInfosCIter cit = removedAppIds.cbegin();
Expand All @@ -4434,15 +4427,24 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
qiter->second->key(),
cit->second);
}
if (queue) {
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched,
queue,
*cit),
queue);
}
}

if (queue) {
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(&mqbi::QueueEngine::afterAppIdUnregistered,
queue->queueEngine(),
removedAppIds),
queue);
}

d_storageManager_p->updateQueuePrimary(qiter->second->uri(),
qiter->second->key(),
qiter->second->partitionId(),
addedAppIds,
removedAppIds);
// No need to log in case of failure because 'updateQueuePrimary' does it
// (even in case of success FTM).

bmqu::Printer<AppInfos> printer1(&addedAppIds);
bmqu::Printer<AppInfos> printer2(&removedAppIds);
BALL_LOG_INFO << d_cluster_p->description() << ": Updated queue: " << uri
Expand Down
143 changes: 74 additions & 69 deletions src/groups/mqb/mqbblp/mqbblp_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,31 +68,32 @@ void queueHolderDummy(const bsl::shared_ptr<mqbi::Queue>& queue)
BALL_LOG_INFO << "Deleted queue '" << queue->uri().canonical() << "'";
}

void afterAppIdRegisteredDispatched(mqbi::Queue* queue,
const bsl::string& appId)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdRegistered(
mqbi::Storage::AppInfo(appId, mqbu::StorageKey()));
}

void afterAppIdUnregisteredDispatched(mqbi::Queue* queue,
const bsl::string& appId)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

// Note: Inputing nullKey here is okay since this routine will be removed
// when we switch to CSL workflow.
queue->queueEngine()->afterAppIdUnregistered(
mqbi::Storage::AppInfo(appId, mqbu::StorageKey()));
}
// void afterAppIdRegisteredDispatched(mqbi::Queue* queue,
// const bsl::string& appId)
//{
// // executed by the *QUEUE DISPATCHER* thread
//
// // PRECONDITIONS
// BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));
//
// queue->queueEngine()->afterAppIdRegistered(
// mqbi::Storage::AppInfo(appId, mqbu::StorageKey()));
// }

// void afterAppIdUnregisteredDispatched(mqbi::Queue* queue,
// const bsl::string& appId)
//{
// // executed by the *QUEUE DISPATCHER* thread
//
// // PRECONDITIONS
// BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));
//
// // Note: Inputing nullKey here is okay since this routine will be
// removed
// // when we switch to CSL workflow.
// queue->queueEngine()->afterAppIdUnregistered(
// mqbi::Storage::AppInfo(appId, mqbu::StorageKey()));
// }

/// Validates an application subscription.
bool validdateSubscriptionExpression(bsl::ostream& errorDescription,
Expand Down Expand Up @@ -491,50 +492,54 @@ int Domain::configure(bsl::ostream& errorDescription,
BSLS_ASSERT_OPT(oldConfig.has_value());
BSLS_ASSERT_OPT(d_config.has_value());

// In non-CSL mode, manually dispatch AppId registration callbacks.
if (!d_cluster_sp->isCSLModeEnabled() &&
d_config.value().mode().isFanoutValue()) {
// Compute list of added and removed App IDs.
bsl::unordered_set<bsl::string> oldCfgAppIds(
oldConfig.value().mode().fanout().appIDs().cbegin(),
oldConfig.value().mode().fanout().appIDs().cend(),
d_allocator_p);
bsl::unordered_set<bsl::string> newCfgAppIds(
d_config.value().mode().fanout().appIDs().cbegin(),
d_config.value().mode().fanout().appIDs().cend(),
d_allocator_p);

bsl::unordered_set<bsl::string> addedIds, removedIds;
mqbc::StorageUtil::loadAddedAndRemovedEntries(&addedIds,
&removedIds,
oldCfgAppIds,
newCfgAppIds);

bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex);

// Invoke callbacks for each added and removed ID on each queue.
bsl::unordered_set<bsl::string>::const_iterator it =
addedIds.cbegin();
QueueMap::const_iterator qIt;
for (; it != addedIds.cend(); it++) {
for (qIt = d_queues.cbegin(); qIt != d_queues.cend(); ++qIt) {
d_dispatcher_p->execute(
bdlf::BindUtil::bind(afterAppIdRegisteredDispatched,
qIt->second.get(),
*it),
qIt->second.get());
}
}
for (it = removedIds.cbegin(); it != removedIds.cend(); ++it) {
for (qIt = d_queues.cbegin(); qIt != d_queues.cend(); ++qIt) {
d_dispatcher_p->execute(
bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched,
qIt->second.get(),
*it),
qIt->second.get());
}
}
}
// // In non-CSL mode, manually dispatch AppId registration
// callbacks. if (!d_cluster_sp->isCSLModeEnabled() &&
// d_config.value().mode().isFanoutValue()) {
// // Compute list of added and removed App IDs.
// bsl::unordered_set<bsl::string> oldCfgAppIds(
// oldConfig.value().mode().fanout().appIDs().cbegin(),
// oldConfig.value().mode().fanout().appIDs().cend(),
// d_allocator_p);
// bsl::unordered_set<bsl::string> newCfgAppIds(
// d_config.value().mode().fanout().appIDs().cbegin(),
// d_config.value().mode().fanout().appIDs().cend(),
// d_allocator_p);
//
// bsl::unordered_set<bsl::string> addedIds, removedIds;
// mqbc::StorageUtil::loadAddedAndRemovedEntries(&addedIds,
// &removedIds,
// oldCfgAppIds,
// newCfgAppIds);
//
// bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex);
//
// // Invoke callbacks for each added and removed ID on each
// queue. bsl::unordered_set<bsl::string>::const_iterator it
// =
// addedIds.cbegin();
// QueueMap::const_iterator qIt;
// for (; it != addedIds.cend(); it++) {
// for (qIt = d_queues.cbegin(); qIt != d_queues.cend();
// ++qIt) {
// d_dispatcher_p->execute(
// bdlf::BindUtil::bind(afterAppIdRegisteredDispatched,
// qIt->second.get(),
// *it),
// qIt->second.get());
// }
// }
// for (it = removedIds.cbegin(); it != removedIds.cend();
// ++it) {
// for (qIt = d_queues.cbegin(); qIt != d_queues.cend();
// ++qIt) {
// d_dispatcher_p->execute(
// bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched,
// qIt->second.get(),
// *it),
// qIt->second.get());
// }
// }
// }

// Notify the 'cluster' of the updated configuration, so it can write
// any needed update-advisories to the CSL.
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ int LocalQueue::configure(bsl::ostream& errorDescription, bool isReconfigure)
d_allocator_p);
}

rc = d_queueEngine_mp->configure(errorDescription);
rc = d_queueEngine_mp->configure(errorDescription, isReconfigure);
if (rc != 0) {
return 10 * rc + rc_QUEUE_ENGINE_CFG_FAILURE; // RETURN
}
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_queueenginetester.h
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ QueueEngineTester::createQueueEngineHelper(mqbi::QueueEngine* engine)
BSLS_ASSERT_OPT(d_mockQueue_sp);

bmqu::MemOutStream errorDescription(d_allocator_p);
int rc = engine->configure(errorDescription);
int rc = engine->configure(errorDescription, false);
BSLS_ASSERT_OPT(rc == 0);

// Set the engine on the Queue
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,8 @@ RelayQueueEngine::~RelayQueueEngine()

// MANIPULATORS
int RelayQueueEngine::configure(
BSLS_ANNOTATION_UNUSED bsl::ostream& errorDescription)
BSLS_ANNOTATION_UNUSED bsl::ostream& errorDescription,
BSLS_ANNOTATION_UNUSED bool isReconfigure)
{
return 0;
}
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,8 @@ class RelayQueueEngine : public mqbi::QueueEngine {

/// Configure this instance. Return zero on success, non-zero value
/// otherwise and populate the specified `errorDescription`.
virtual int
configure(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE;
virtual int configure(bsl::ostream& errorDescription,
bool isReconfigure) BSLS_KEYWORD_OVERRIDE;

/// Reset the internal state of this engine. If the optionally specified
/// 'keepConfirming' is 'true', keep the data structures for CONFIRMs
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ int RemoteQueue::configureAsProxy(bsl::ostream& errorDescription,
RelayQueueEngine(d_state_p, mqbconfm::Domain(), d_allocator_p),
d_allocator_p);

rc = d_queueEngine_mp->configure(errorDescription);
rc = d_queueEngine_mp->configure(errorDescription, isReconfigure);
if (rc != 0) {
return 10 * rc + rc_QUEUE_ENGINE_CFG_FAILURE; // RETURN
}
Expand Down Expand Up @@ -246,7 +246,7 @@ int RemoteQueue::configureAsClusterMember(bsl::ostream& errorDescription,

bdlma::LocalSequentialAllocator<1024> localAllocator(d_allocator_p);
bmqu::MemOutStream errorDesc(&localAllocator);
rc = d_queueEngine_mp->configure(errorDesc);
rc = d_queueEngine_mp->configure(errorDesc, isReconfigure);
if (rc != 0) {
BMQTSK_ALARMLOG_ALARM("CLUSTER_STATE")
<< d_state_p->domain()->cluster()->name() << ": Partition ["
Expand Down
Loading

0 comments on commit a0352a1

Please sign in to comment.