Skip to content

Commit

Permalink
Cleaning
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Nov 8, 2024
1 parent 181ecbb commit cdd9c32
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 43 deletions.
12 changes: 4 additions & 8 deletions src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1568,11 +1568,9 @@ void RootQueueEngine::afterQueuePurged(const bsl::string& appId,
d_queueState_p->queue()));

if (appKey.isNull()) {
// NOTE: Since in CSL mode when a consumer opens the queue with an
// unauthorized appId, we insert an item having a pair of (appId,
// nullKey) as its key to d_apps. Thus, to avoid accidentally treating
// a nullKey resulting from unauthorized appId as wildcard matching, we
// add an additional assert that the appId must be empty.
// 'mqbu::StorageKey::k_NULL_KEY' indicates the entire queue in which
// case there must be 'bmqp::ProtocolUtil::k_NULL_APP_ID'

BSLS_ASSERT_SAFE(appId == bmqp::ProtocolUtil::k_NULL_APP_ID);

d_storageIter_mp->reset();
Expand Down Expand Up @@ -1979,7 +1977,7 @@ void RootQueueEngine::registerStorage(const bsl::string& appId,
void RootQueueEngine::unregisterStorage(
const bsl::string& appId,
BSLS_ANNOTATION_UNUSED const mqbu::StorageKey& appKey,
unsigned int appOrdinal)
BSLS_ANNOTATION_UNUSED unsigned int appOrdinal)
{
// executed by the *QUEUE DISPATCHER* thread

Expand All @@ -1992,8 +1990,6 @@ void RootQueueEngine::unregisterStorage(

// we still keep the app but invalidate the authorization
iter->second->unauthorize();

(void)appOrdinal;
}

mqbi::StorageResult::Enum RootQueueEngine::evaluateAutoSubscriptions(
Expand Down
10 changes: 5 additions & 5 deletions src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,19 +379,19 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine {
virtual void
onTimer(bsls::Types::Int64 currentTimer) BSLS_KEYWORD_OVERRIDE;

/// Called after the specified `appIdKeyPair` has been dynamically
/// Called after the specified `addedAppIds` have been dynamically
/// registered.
///
/// THREAD: This method is called from the Queue's dispatcher thread.
virtual void afterAppIdRegistered(
const mqbi::Storage::AppInfos& addedAppIds) BSLS_KEYWORD_OVERRIDE;

/// Called after the specified `appIdKeyPair` has been dynamically
/// Called after the specified `removedAppIds` have been dynamically
/// unregistered.
///
/// THREAD: This method is called from the Queue's dispatcher thread.
virtual void afterAppIdUnregistered(
const mqbi::Storage::AppInfos& appIdKeyPairs) BSLS_KEYWORD_OVERRIDE;
const mqbi::Storage::AppInfos& removedAppIds) BSLS_KEYWORD_OVERRIDE;

/// Called after creation of a new storage for the specified
/// `appIdKeyPair`.
Expand Down Expand Up @@ -443,7 +443,7 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine {
virtual void
loadInternals(mqbcmd::QueueEngine* out) const BSLS_KEYWORD_OVERRIDE;

/// Log appllication subscription info for the specified `appKey` into the
/// Log application subscription info for the specified `appId` into the
/// specified `stream`.
///
/// THREAD: This method is called from the Queue's
Expand All @@ -453,7 +453,7 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine {
const bsl::string& appId) const BSLS_KEYWORD_OVERRIDE;

private:
/// Log appllication subscription info for the specified `appState` into
/// Log application subscription info for the specified `appState` into
/// the specified `stream`.
bsl::ostream& logAppSubscriptionInfo(bsl::ostream& stream,
const AppStateSp& appState) const;
Expand Down
1 change: 0 additions & 1 deletion src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1467,7 +1467,6 @@ void ClusterUtil::unregisterAppId(ClusterData* clusterData,
// QueueUpdateAdvisory to indicate that we are updating appIds for the
// entire domain.

bmqt::Uri uriii("bmq://bmq.test.mmap.priority/q1");
bmqp_ctrlmsg::QueueInfoUpdate queueUpdate;
queueUpdate.uri() = "";
queueUpdate.partitionId() = mqbs::DataStore::k_INVALID_PARTITION_ID;
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqbi/mqbi_queueengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,19 +195,19 @@ class QueueEngine {
/// THREAD: This method is called from the Queue's dispatcher thread.
virtual void onTimer(bsls::Types::Int64 currentTimer) = 0;

/// Called after the specified `appIdKeyPair` has been dynamically
/// Called after the specified `addedAppIds` have been dynamically
/// registered.
///
/// THREAD: This method is called from the Queue's dispatcher thread.
virtual void
afterAppIdRegistered(const mqbi::Storage::AppInfos& appIdKeyPairs);
afterAppIdRegistered(const mqbi::Storage::AppInfos& addedAppIds);

/// Called after the specified `appIdKeyPair` has been dynamically
/// Called after the specified `removedAppIds` have been dynamically
/// unregistered.
///
/// THREAD: This method is called from the Queue's dispatcher thread.
virtual void
afterAppIdUnregistered(const mqbi::Storage::AppInfos& appIdKeyPairs);
afterAppIdUnregistered(const mqbi::Storage::AppInfos& removedAppIds);

/// Called after creation of a new storage for the specified
/// `appIdKeyPair`.
Expand Down
45 changes: 20 additions & 25 deletions src/integration-tests/test_appids.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

pytestmark = order(3)

authorized_app_ids = ["foo", "bar", "baz"]
default_app_ids = ["foo", "bar", "baz"]
timeout = 60
max_msgs = 3

Expand All @@ -51,14 +51,14 @@ def test_open_alarm_authorize_post(cluster: Cluster):
producer = next(proxies).create_client("producer")
producer.open(tc.URI_FANOUT, flags=["write,ack"], succeed=True)

all_app_ids = authorized_app_ids + ["quux"]
all_app_ids = default_app_ids + ["quux"]

# ---------------------------------------------------------------------
# Create a consumer for each authorized substream.

consumers = {}

for app_id in authorized_app_ids:
for app_id in default_app_ids:
consumer = next(proxies).create_client(app_id)
consumers[app_id] = consumer
consumer.open(f"{tc.URI_FANOUT}?id={app_id}", flags=["read"], succeed=True)
Expand Down Expand Up @@ -111,7 +111,7 @@ def test_open_alarm_authorize_post(cluster: Cluster):

# ---------------------------------------------------------------------
# Authorize 'quux'.
set_app_ids(cluster, authorized_app_ids + ["quux"])
set_app_ids(cluster, default_app_ids + ["quux"])

# ---------------------------------------------------------------------
# Check that all substreams are alive.
Expand All @@ -133,7 +133,7 @@ def test_open_alarm_authorize_post(cluster: Cluster):

leader.dump_queue_internals(tc.DOMAIN_FANOUT, tc.TEST_QUEUE)
# pylint: disable=cell-var-from-loop; passing lambda to 'wait_until' is safe
for app_id in authorized_app_ids:
for app_id in default_app_ids:
test_logger.info(f"Check if {app_id} has seen 2 messages")
assert wait_until(
lambda: len(
Expand Down Expand Up @@ -171,7 +171,7 @@ def test_create_authorize_open_post(cluster: Cluster):

# ---------------------------------------------------------------------
# Authorize 'quux'.
set_app_ids(cluster, authorized_app_ids + ["quux"])
set_app_ids(cluster, default_app_ids + ["quux"])

# ---------------------------------------------------------------------
# Create a consumer for 'quux. This should succeed.
Expand All @@ -198,7 +198,7 @@ def test_load_domain_authorize_open_post(cluster: Cluster):

# ---------------------------------------------------------------------
# Authorize 'quux'.
set_app_ids(cluster, authorized_app_ids + ["quux"])
set_app_ids(cluster, default_app_ids + ["quux"])

# ---------------------------------------------------------------------
# Create a consumer for 'quux. This should succeed.
Expand All @@ -221,7 +221,7 @@ def _test_authorize_before_domain_loaded(cluster):

# ---------------------------------------------------------------------
# Authorize 'quux'.
set_app_ids(cluster, authorized_app_ids + ["quux"])
set_app_ids(cluster, default_app_ids + ["quux"])

# ---------------------------------------------------------------------
# Create the queue.
Expand Down Expand Up @@ -249,9 +249,9 @@ def _test_command_errors(cluster):
proxies = cluster.proxy_cycle()
next(proxies).create_client("producer")

set_app_ids(cluster, authorized_app_ids + ["quux"])
set_app_ids(cluster, default_app_ids + ["quux"])

set_app_ids(cluster, authorized_app_ids)
set_app_ids(cluster, default_app_ids)


def test_unregister_in_presence_of_queues(cluster: Cluster):
Expand All @@ -276,7 +276,7 @@ def test_unregister_in_presence_of_queues(cluster: Cluster):
# message posted while 'foo' was still valid.
foo.wait_push_event()

set_app_ids(cluster, [a for a in authorized_app_ids if a not in ["foo"]])
set_app_ids(cluster, [a for a in default_app_ids if a not in ["foo"]])

@attempt(3)
def _():
Expand Down Expand Up @@ -320,7 +320,7 @@ def _():
assert Client.e_SUCCESS == foo.close(tc.URI_FANOUT_FOO, block=True)

# Re-authorize
set_app_ids(cluster, authorized_app_ids)
set_app_ids(cluster, default_app_ids)

foo.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True)
producer.post(tc.URI_FANOUT, ["after-reauthorize"], block=True)
Expand Down Expand Up @@ -408,7 +408,7 @@ def test_unauthorized_appid_doesnt_hold_messages(cluster: Cluster):
# consume all the messages in all the authorized substreams

# pylint: disable=cell-var-from-loop; passing lambda to 'wait_until' is safe
for app_id in authorized_app_ids:
for app_id in default_app_ids:
appid_uri = f"{tc.URI_FANOUT}?id={app_id}"
consumer = next(proxies).create_client(app_id)
consumer.open(appid_uri, flags=["read"], succeed=True)
Expand Down Expand Up @@ -446,7 +446,7 @@ def test_deauthorized_appid_doesnt_hold_messages(cluster: Cluster):

# ---------------------------------------------------------------------
# unauthorize 'bar' and 'baz'
set_app_ids(cluster, [a for a in authorized_app_ids if a not in ["bar", "baz"]])
set_app_ids(cluster, [a for a in default_app_ids if a not in ["bar", "baz"]])

# ---------------------------------------------------------------------
# fill queue to capacity
Expand Down Expand Up @@ -539,21 +539,16 @@ def test_two_consumers_of_unauthorized_app(multi_node: Cluster):
leader.stop()


def set_app_ids(cluster: Cluster, app_ids: List[str]): # noqa: F811
cluster.config.domains[
tc.DOMAIN_FANOUT
].definition.parameters.mode.fanout.app_ids = app_ids # type: ignore
cluster.reconfigure_domain(tc.DOMAIN_FANOUT, succeed=True)


@tweak.cluster.cluster_attributes.is_cslmode_enabled(False)
@tweak.cluster.cluster_attributes.is_fsmworkflow(False)
def test_open_authorize_restart_from_non_FSM_to_FSM(cluster: Cluster):
leader = cluster.last_known_leader
proxies = cluster.proxy_cycle()

producer = next(proxies).create_client("producer")
producer.open(tc.URI_FANOUT, flags=["write,ack"], succeed=True)

all_app_ids = authorized_app_ids + ["quux"]
all_app_ids = default_app_ids + ["quux"]

# ---------------------------------------------------------------------
# Create a consumer for each authorized substream.
Expand All @@ -567,7 +562,7 @@ def test_open_authorize_restart_from_non_FSM_to_FSM(cluster: Cluster):

# ---------------------------------------------------------------------
# Authorize 'quux'.
set_app_ids(cluster, authorized_app_ids + ["quux"])
set_app_ids(cluster, default_app_ids + ["quux"])

# ---------------------------------------------------------------------
# Post a message.
Expand All @@ -594,7 +589,7 @@ def test_open_authorize_restart_from_non_FSM_to_FSM(cluster: Cluster):
3,
)

# Save one confirm to the storage
# Save one confirm to the storage for 'quux' only
consumers["quux"].confirm(f"{tc.URI_FANOUT}?id=quux", "+1", succeed=True)

for app_id in all_app_ids:
Expand Down Expand Up @@ -625,7 +620,7 @@ def test_open_authorize_restart_from_non_FSM_to_FSM(cluster: Cluster):
consumer.open(f"{tc.URI_FANOUT}?id={app_id}", flags=["read"], succeed=True)

# pylint: disable=cell-var-from-loop; passing lambda to 'wait_until' is safe
for app_id in authorized_app_ids:
for app_id in default_app_ids:
test_logger.info(f"Check if {app_id} has seen 2 messages")
assert wait_until(
lambda: len(
Expand Down

0 comments on commit cdd9c32

Please sign in to comment.