Skip to content

Commit

Permalink
[ANCHOR-377] Switch business event delivery to infinite retry (#1329)
Browse files Browse the repository at this point in the history
### Description

Separate the client event processor from the business event processor.
Implement infinite retries with exponential backoff when delivering
events to the business server.

### Testing

- `./gradlew test`
  • Loading branch information
JiahuiWho authored Apr 23, 2024
1 parent 2fea38d commit 76ae0b3
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 195 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.stellar.anchor.platform.event;

import static java.lang.Thread.currentThread;
import static org.stellar.anchor.util.MetricConstants.TV_BUSINESS_SERVER_CALLBACK;

import java.io.IOException;
import org.stellar.anchor.api.event.AnchorEvent;
import org.stellar.anchor.event.EventService;
import org.stellar.anchor.util.ExponentialBackoffTimer;
import org.stellar.anchor.util.Log;

/**
 * Event processor that hands each event to a {@link CallbackApiEventHandler} and keeps retrying,
 * with a backoff pause between attempts, until the handler reports success or the processing
 * thread is interrupted. There is no upper bound on the number of attempts.
 */
public class CallbackApiEventProcessor extends EventProcessor {
  private final CallbackApiEventHandler eventHandler;
  private final ExponentialBackoffTimer backoffTimer = new ExponentialBackoffTimer();

  /**
   * Creates a processor bound to the given queue and handler.
   *
   * @param name the processor name, forwarded to {@link EventProcessor}
   * @param eventQueue the queue events are read from
   * @param eventService the service used by the base class to open queue sessions
   * @param eventHandler the handler each event is delivered to
   */
  public CallbackApiEventProcessor(
      String name,
      EventService.EventQueue eventQueue,
      EventService eventService,
      CallbackApiEventHandler eventHandler) {
    super(name, eventQueue, eventService);
    this.eventHandler = eventHandler;
  }

  /**
   * Delivers a single event, retrying without limit.
   *
   * <p>The backoff timer is reset once per event. An attempt counts as failed when the handler
   * returns {@code false} or throws an {@link IOException} (which is logged). After a failed
   * attempt the method backs off and tries again. The method returns once the handler returns
   * {@code true} (the processed counter is then incremented) or once the current thread's
   * interrupt flag is set.
   *
   * @param event the event to deliver
   */
  @Override
  void handleEventWithRetry(AnchorEvent event) {
    // Each event starts from a fresh backoff schedule.
    getBackoffTimer().reset();

    while (!currentThread().isInterrupted()) {
      boolean delivered;
      try {
        delivered = eventHandler.handleEvent(event);
      } catch (IOException ioex) {
        Log.errorEx(ioex);
        delivered = false;
      }

      if (delivered) {
        incrementProcessedCounter();
        return;
      }

      pauseBeforeRetry();
    }
  }

  /** Always reports the business-server callback tag, regardless of the queue name. */
  @Override
  String toMetricTag(String queueName) {
    return TV_BUSINESS_SERVER_CALLBACK;
  }

  /** Returns the timer used to pace retries. */
  ExponentialBackoffTimer getBackoffTimer() {
    return backoffTimer;
  }

  /**
   * Waits on the backoff timer. If the wait is interrupted, the interrupt flag is restored so the
   * retry loop observes it and stops.
   */
  private void pauseBeforeRetry() {
    try {
      getBackoffTimer().backoff();
    } catch (InterruptedException e) {
      currentThread().interrupt();
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package org.stellar.anchor.platform.event;

import static java.lang.Thread.currentThread;
import static org.stellar.anchor.util.MetricConstants.*;
import static org.stellar.anchor.util.StringHelper.json;

import java.io.IOException;
import org.stellar.anchor.api.event.AnchorEvent;
import org.stellar.anchor.event.EventService;
import org.stellar.anchor.util.ExponentialBackoffTimer;
import org.stellar.anchor.util.Log;

/**
 * Event processor that delivers events through a {@link ClientStatusCallbackHandler} with a
 * bounded number of attempts.
 *
 * <p>Unlike an unbounded-retry processor, an event that still fails after {@link #MAX_RETRIES}
 * attempts is skipped. Failures reported by the handler and I/O failures share one attempt
 * counter but use separate backoff timers.
 */
public class ClientStatusCallbackProcessor extends EventProcessor {
  // The initial backoff time for connection error.
  private static final long NETWORK_INITIAL_BACKOFF_TIME_SECONDS = 1;
  // The maximum backoff time for connection error.
  private static final long NETWORK_MAX_BACKOFF_TIME_SECONDS = 30;
  // The initial backoff time for HTTP status code other than 200s or 300s.
  private static final long HTTP_STATUS_INITIAL_BACKOFF_TIME_SECONDS = 1;
  // The maximum backoff time for HTTP status code other than 200s or 300s.
  private static final long HTTP_STATUS_MAX_BACKOFF_TIME_SECONDS = 5;
  // The maximum number of delivery attempts per event. The counter is shared between handler
  // failures and connection errors; the event is skipped once it reaches this value.
  private static final long MAX_RETRIES = 3;

  private final ClientStatusCallbackHandler eventHandler;
  private final ExponentialBackoffTimer networkBackoffTimer =
      new ExponentialBackoffTimer(
          NETWORK_INITIAL_BACKOFF_TIME_SECONDS, NETWORK_MAX_BACKOFF_TIME_SECONDS);
  private final ExponentialBackoffTimer httpErrorBackoffTimer =
      new ExponentialBackoffTimer(
          HTTP_STATUS_INITIAL_BACKOFF_TIME_SECONDS, HTTP_STATUS_MAX_BACKOFF_TIME_SECONDS);

  /**
   * Creates a processor bound to the given queue and handler.
   *
   * @param name the processor name, forwarded to {@link EventProcessor}
   * @param eventQueue the queue events are read from
   * @param eventService the service used by the base class to open queue sessions
   * @param eventHandler the handler each event is delivered to
   */
  protected ClientStatusCallbackProcessor(
      String name,
      EventService.EventQueue eventQueue,
      EventService eventService,
      ClientStatusCallbackHandler eventHandler) {
    super(name, eventQueue, eventService);
    this.eventHandler = eventHandler;
  }

  /**
   * Delivers a single event with at most {@link #MAX_RETRIES} attempts.
   *
   * <p>Outcomes per attempt:
   *
   * <ul>
   *   <li>Handler returns {@code true}: the event is done and the processed counter is
   *       incremented.
   *   <li>Handler returns {@code false}: back off on the HTTP-error timer and retry, or skip the
   *       event when the attempt budget is exhausted.
   *   <li>Handler throws {@link IOException}: the exception is logged, then back off on the
   *       network timer and retry, or skip the event when the attempt budget is exhausted.
   *   <li>Any other exception raised inside the attempt: the event is passed to {@link
   *       #sendToDLQ} and not retried.
   * </ul>
   *
   * <p>A skipped event also increments the processed counter. The loop additionally stops as soon
   * as the current thread's interrupt flag is set.
   *
   * @param event the event to deliver
   */
  @Override
  void handleEventWithRetry(AnchorEvent event) {
    boolean isProcessed = false;
    int attempts = 0;
    // For every event, reset the timers.
    getHttpErrorBackoffTimer().reset();
    getNetworkBackoffTimer().reset();
    // Retry until the event is processed or the thread is interrupted.
    while (!isProcessed && !currentThread().isInterrupted()) {
      try {
        if (eventHandler.handleEvent(event)) {
          // ***** The event is processed successfully.
          isProcessed = true;
          incrementProcessedCounter();
        } else if (++attempts < MAX_RETRIES) {
          // ***** Error #2. HTTP status code other than 200s or 300s: retry.
          backoff(getHttpErrorBackoffTimer());
        } else {
          // Attempt budget exhausted, skip the event.
          isProcessed = true;
          incrementProcessedCounter();
        }
      } catch (IOException ioex) {
        // ***** Error #1. Connection error. Log it so the failure is not silently swallowed.
        Log.errorEx(ioex);
        if (++attempts < MAX_RETRIES) {
          backoff(getNetworkBackoffTimer());
        } else {
          // Attempt budget exhausted, skip the event.
          isProcessed = true;
          incrementProcessedCounter();
        }
      } catch (Exception e) {
        // ***** Error #3. uncaught exception
        sendToDLQ(event, e);
        isProcessed = true;
      }
    }
  }

  /** Always reports the status-callback tag, regardless of the queue name. */
  @Override
  String toMetricTag(String queueName) {
    return TV_STATUS_CALLBACK;
  }

  /** Returns the timer used after the handler reports a failed delivery. */
  ExponentialBackoffTimer getHttpErrorBackoffTimer() {
    return httpErrorBackoffTimer;
  }

  /** Returns the timer used after an {@link IOException} from the handler. */
  ExponentialBackoffTimer getNetworkBackoffTimer() {
    return networkBackoffTimer;
  }

  /**
   * Records an event that failed with an unexpected exception. In this class the method only logs
   * the event (as JSON) and the exception; nothing is written to a queue here.
   *
   * @param event the event that could not be processed
   * @param e the exception raised while processing it
   */
  void sendToDLQ(AnchorEvent event, Exception e) {
    Log.errorF("Failed to process event: {}", json(event));
    Log.errorEx(e);
  }

  /**
   * Waits on the given timer. If the wait is interrupted, the interrupt flag is restored so the
   * retry loop observes it and stops the processor.
   */
  private static void backoff(ExponentialBackoffTimer timer) {
    try {
      timer.backoff();
    } catch (InterruptedException e) {
      currentThread().interrupt();
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.stellar.anchor.platform.event;

import static java.lang.Thread.currentThread;
import static org.stellar.anchor.util.Log.debugF;
import static org.stellar.anchor.util.Log.infoF;
import static org.stellar.anchor.util.MetricConstants.*;

import io.micrometer.core.instrument.Metrics;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.SneakyThrows;
import org.stellar.anchor.api.event.AnchorEvent;
import org.stellar.anchor.api.exception.AnchorException;
import org.stellar.anchor.event.EventService;
import org.stellar.anchor.event.EventService.EventQueue;
import org.stellar.anchor.platform.utils.DaemonExecutors;
import org.stellar.anchor.util.Log;

/**
 * Base class for queue consumers. Each instance owns a single-thread scheduler that repeatedly
 * runs {@link #run()}, which opens a queue session, reads batches of events, and hands every
 * event to {@link #handleEventWithRetry(AnchorEvent)}.
 *
 * <p>{@link #stop()} is expected to be called from a thread other than the one executing {@link
 * #run()}; the {@code stopped} flag is therefore {@code volatile}.
 */
public abstract class EventProcessor implements Runnable {
  private final String name;
  private final EventQueue eventQueue;
  private final EventService eventService;
  private final ScheduledExecutorService consumerScheduler =
      DaemonExecutors.newScheduledThreadPool(1);
  private ScheduledFuture<?> processingTask = null;
  // The flag to indicate if the processor is stopped. Written by stop() and read by the consumer
  // thread in run(), so it must be volatile for the write to be visible.
  private volatile boolean stopped = false;

  /**
   * @param name the processor name, used when creating queue sessions
   * @param eventQueue the queue to consume
   * @param eventService the service that creates queue sessions
   */
  protected EventProcessor(
      String name, EventService.EventQueue eventQueue, EventService eventService) {
    this.name = name;
    this.eventQueue = eventQueue;
    this.eventService = eventService;
  }

  /**
   * Schedules the consumer task: first run after 1 second, then again 2 seconds after each run
   * completes.
   */
  public void start() {
    processingTask = consumerScheduler.scheduleWithFixedDelay(this, 1, 2, TimeUnit.SECONDS);
  }

  /**
   * Signals the consumer loop to stop, cancels the scheduled task (interrupting it if running),
   * and shuts the scheduler down.
   *
   * @throws AnchorException declared for callers; not thrown by this implementation
   */
  public void stop() throws AnchorException {
    stopped = true;
    if (processingTask != null) {
      processingTask.cancel(true);
    }
    consumerScheduler.shutdown();
  }

  /**
   * Consumes the queue until the thread is interrupted or the processor is stopped.
   *
   * <p>Each batch is acknowledged after its events have been handed to {@link
   * #handleEventWithRetry(AnchorEvent)}. Any exception inside the loop is logged and ends this
   * run; the session is closed exactly once, in the {@code finally} block.
   */
  @SneakyThrows
  @Override
  public void run() {
    infoF(
        "The EventProcessor listening task is starting for the {} time.",
        getConsumerRestartCount() + 1);
    // NOTE(review): createSession is outside the try block, so a failure here propagates out of
    // run(); per the ScheduledExecutorService contract that would suppress later executions.
    // Confirm whether that is intended.
    EventService.Session queueSession = eventService.createSession(name, eventQueue);
    try {
      while (!currentThread().isInterrupted() && !stopped) {
        EventService.ReadResponse readResponse = queueSession.read();
        List<AnchorEvent> events = readResponse.getEvents();
        Metrics.counter(EVENT_RECEIVED, QUEUE, toMetricTag(eventQueue.name()))
            .increment(events.size());
        debugF("Received {} events from queue", events.size());
        for (AnchorEvent event : events) {
          handleEventWithRetry(event);
          if (currentThread().isInterrupted()) break;
        }
        // NOTE(review): the batch is acknowledged even when the loop above exited early because
        // of an interrupt, so unhandled events in the batch may be acked too - confirm intended.
        queueSession.ack(readResponse);
      }
    } catch (Exception ex) {
      // This is unexpected, so we need to restart the consumer.
      Log.errorEx(ex);
    } finally {
      // Single close point for both the normal and the exceptional exit path.
      queueSession.close();
      infoF("Closing queue session [{}]", queueSession.getSessionName());
    }
  }

  /**
   * Processes one event, applying whatever retry policy the subclass implements.
   *
   * @param event the event to process
   */
  abstract void handleEventWithRetry(AnchorEvent event);

  /**
   * Maps a queue name to the tag value used on the event metrics.
   *
   * @param queueName the name of the queue being consumed
   * @return the metric tag value
   */
  abstract String toMetricTag(String queueName);

  /**
   * Returns the scheduler's completed-task count, which {@link #run()} uses as the number of
   * previous consumer starts.
   *
   * <p>NOTE(review): assumes {@code DaemonExecutors.newScheduledThreadPool} returns a {@link
   * ScheduledThreadPoolExecutor}; the cast fails otherwise - confirm.
   */
  long getConsumerRestartCount() {
    return ((ScheduledThreadPoolExecutor) consumerScheduler).getCompletedTaskCount();
  }

  /** Increments the processed-event counter tagged with this processor's metric tag. */
  void incrementProcessedCounter() {
    Metrics.counter(EVENT_PROCESSED, QUEUE, toMetricTag(eventQueue.name())).increment();
  }
}
Loading

0 comments on commit 76ae0b3

Please sign in to comment.