Skip to content

Commit

Permalink
[AMQ-8354] Fix deinitialization of ReplicaBroker.
Browse files Browse the repository at this point in the history
  • Loading branch information
NikitaShupletsov committed Feb 28, 2024
1 parent 90ee78a commit 153ae2a
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void brokerServiceStarted(ReplicaRole role) {

@Override
public void stop() throws Exception {
logger.info("Stopping Source broker");
logger.info("Stopping broker replication.");
deinitialize();
super.stop();
}
Expand All @@ -109,9 +109,8 @@ public void startAfterRoleChange() throws Exception {
}

void completeBeforeRoleChange() throws Exception {
messageListener.deinitialize();
removeReplicationQueues();
deinitialize();
removeReplicationQueues();
onStopSuccess();
}

Expand All @@ -131,7 +130,7 @@ private void init(ReplicaRole role) {
messageListener = new ReplicaBrokerEventListener(this, queueProvider, periodAcknowledgeCallBack, replicaPolicy, replicaStatistics);
}

private void deinitialize() throws JMSException {
private void deinitialize() throws Exception {
if (replicationScheduledFuture != null) {
replicationScheduledFuture.cancel(true);
}
Expand All @@ -142,13 +141,19 @@ private void deinitialize() throws JMSException {
ActiveMQMessageConsumer consumer = eventConsumer.get();
ActiveMQSession session = connectionSession.get();
ActiveMQConnection brokerConnection = connection.get();
if (consumer != null) {
consumer.setMessageListener(null);
}
if (messageListener != null) {
messageListener.close();
}
if (consumer != null) {
consumer.stop();
consumer.close();
}
if (messageListener != null) {
messageListener.deinitialize();
}
if (session != null) {
session.close();
}
Expand All @@ -162,6 +167,7 @@ private void deinitialize() throws JMSException {
connection.set(null);
replicationScheduledFuture = null;
ackPollerScheduledFuture = null;
messageListener = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,13 @@ private synchronized void processMessageWithRetries(ActiveMQMessage message, Tra
}
return null;
});

ReplicaEventRetrier outerRetrier = replicaEventRetrier.get();
replicaEventRetrier.set(retrier);
try {
retrier.process();
} finally {
replicaEventRetrier.set(null);
replicaEventRetrier.set(outerRetrier);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public void process() throws InterruptedException {
Thread.sleep(sleepInterval);
}
}
if (!running.get()) {
throw new InterruptedException("Retried was stopped");
}
}

public void stop() {
Expand Down

0 comments on commit 153ae2a

Please sign in to comment.