Skip to content

Commit

Permalink
Bug 37512819 - [37512368->22.06.12] Topics: Subscribers are not prope…
Browse files Browse the repository at this point in the history
…rly unsubscribed when the service senior dies

(merge 14.1.1.2206 -> ce/22.06 113841)

[git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v22.06/": change = 113842]
  • Loading branch information
thegridman committed Jan 29, 2025
1 parent c3ba388 commit 5b033bf
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ public void onUpdate(PagedTopicSubscription subscription)
}
else if (isActive() && f_fDurable)
{
Logger.finest("Disconnecting Subscriber (null channel set) " + this);
Logger.finest("Disconnecting Subscriber (null channel set) " + PagedTopicSubscriberConnector.this);
SubscriberEvent event = new SubscriberEvent(PagedTopicSubscriberConnector.this, SubscriberEvent.Type.Unsubscribed, PagedTopicSubscription.NO_CHANNELS);
event.dispatch(f_listeners);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public void shouldRecoverAfterCleanStorageRestart() throws Exception
AtomicBoolean fSubscribed = new AtomicBoolean(false);
AtomicInteger cPublished = new AtomicInteger(0);
AtomicInteger cReceived = new AtomicInteger(0);
int cPubMax = 101;

Map<Message, Subscriber.Element<Message>> mapReceived = new ConcurrentHashMap<>();
Map<Message, Publisher.Status> mapPublished = new ConcurrentHashMap<>();
Expand All @@ -206,7 +207,7 @@ public void shouldRecoverAfterCleanStorageRestart() throws Exception
{
try
{
for (int i = 0; i < 101 && fPublish.get(); i++)
for (int i = 0; i < cPubMax && fPublish.get(); i++)
{
Message message = new Message(i, "Message-" + i);
publisher.publish(message)
Expand Down Expand Up @@ -269,8 +270,9 @@ public void shouldRecoverAfterCleanStorageRestart() throws Exception
try (Subscriber<Message> subscriber = topic.createSubscriber(inGroup(sGroup),
optComplete, withIdentifyingName(sName)))
{
int nMax = (i == cSubscriber ? cPubMax : 5);
Logger.info("Created subscriber " + sName + " " + subscriber);
for (int j = 0; j < 5; j++)
for (int j = 0; j < nMax; j++)
{
CompletableFuture<Subscriber.Element<Message>> future = null;
try
Expand Down

0 comments on commit 5b033bf

Please sign in to comment.