Skip to content

Commit

Permalink
add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lijamie98 committed Aug 8, 2023
1 parent 17245de commit ac3f2ca
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.stellar.anchor.platform.event;

import static java.lang.Thread.currentThread;
import static org.stellar.anchor.event.EventService.*;
import static org.stellar.anchor.util.Log.*;
import static org.stellar.anchor.util.MetricConstants.*;
Expand Down Expand Up @@ -76,7 +77,8 @@ public void start() {
new EventProcessor(
CALLBACK_API_EVENT_PROCESSOR_NAME,
EventQueue.TRANSACTION,
new CallbackApiEventHandler(callbackApiConfig)));
new CallbackApiEventHandler(callbackApiConfig),
eventService));
}
// Create a processor of the client status callback handler for each client defined in the
// clientsConfig
Expand Down Expand Up @@ -112,7 +114,8 @@ public void start() {
clientConfig,
sep24TransactionStore,
assetService,
moreInfoUrlConstructor)));
moreInfoUrlConstructor),
eventService));
}
}

Expand All @@ -132,7 +135,7 @@ public void stop() {
}
}

class EventProcessor implements Runnable {
static class EventProcessor implements Runnable {
// The initial backoff time for connection error.
private final long NETWORK_INITIAL_BACKOFF_TIME_SECONDS = 1;
// The maximum backoff time for connection error.
Expand All @@ -148,6 +151,7 @@ class EventProcessor implements Runnable {
private final String name;
private final EventQueue eventQueue;
private final EventHandler eventHandler;
private final EventService eventService;

private final ScheduledExecutorService consumerScheduler =
DaemonExecutors.newScheduledThreadPool(1);
Expand All @@ -162,10 +166,12 @@ class EventProcessor implements Runnable {
// The flag to indicate if the processor is stopped.
private boolean stopped = false;

public EventProcessor(String name, EventQueue eventQueue, EventHandler eventHandler) {
public EventProcessor(
String name, EventQueue eventQueue, EventHandler eventHandler, EventService eventService) {
this.name = name;
this.eventQueue = eventQueue;
this.eventHandler = eventHandler;
this.eventService = eventService;
}

public void start() {
Expand All @@ -190,35 +196,16 @@ public void run() {
getConsumerRestartCount() + 1);
Session queueSession = eventService.createSession(name, eventQueue);
try {
while (!Thread.currentThread().isInterrupted() && !stopped) {
while (!currentThread().isInterrupted() && !stopped) {
ReadResponse readResponse = queueSession.read();
List<AnchorEvent> events = readResponse.getEvents();
Metrics.counter(EVENT_RECEIVED, QUEUE, toMetricTag(eventQueue.name()))
.increment(events.size());
debugF("Received {} events from queue", events.size());
events.forEach(
// The event delivery should retry in one of the 3 ways:
//
// Error #1: If connection error (IOException) happens, the events will be retried
// with
// exponential
// backoff timer. All subsequent events will NOT be delivered until the connection
// becomes
// successful.
//
// Error #2: If the business server returns HTTP status code other than 200s or 300s,
// we
// will retry [3] times
// with backoff timer. After 3 retries, we will skip the event and proceed to next
// sub-sequent events.
//
// Error #3: If the event processor encounters an un-expected error caused by the
// un-seen
// bugs, the
// event will be delivered to a no-op DLQ class which has Log.error implementation.
this::handleEvent);
// Do not continue if the thread is interrupted.
if (Thread.currentThread().isInterrupted()) break;
for (AnchorEvent event : events) {
handleEvent(event);
if (currentThread().isInterrupted()) break;
}
queueSession.ack(readResponse);
}

Expand All @@ -232,69 +219,83 @@ public void run() {
}
}

private void handleEvent(AnchorEvent event) {
void handleEvent(AnchorEvent event) {
boolean isProcessed = false;
int httpStatusNotOkAttempts = 0;
// For every event, reset the timer.
httpErrorBackoffTimer.reset();
networkBackoffTimer.reset();
getHttpErrorBackoffTimer().reset();
getNetworkBackoffTimer().reset();
// Retry until the event is processed or the thread is interrupted.
while (!isProcessed || !Thread.currentThread().isInterrupted()) {
while (!isProcessed && !currentThread().isInterrupted()) {
try {
if (eventHandler.handleEvent(event)) {
// ***** The event is processed successfully.
isProcessed = true;
Metrics.counter(EVENT_PROCESSED, QUEUE, toMetricTag(eventQueue.name())).increment();
incrementProcessedCounter();
} else {
// ***** Error #2. HTTP status code other than 200s or 300s
networkBackoffTimer.reset();
httpStatusNotOkAttempts++;
if (httpStatusNotOkAttempts < HTTP_STATUS_MAX_RETRIES) {
// retry.
httpStatusNotOkAttempts++;
httpErrorBackoffTimer.backoff();
getNetworkBackoffTimer().reset();
getHttpErrorBackoffTimer().backoff();
} else {
// retry > 3 times, skip the event.
// retry >= 3 times, skip the event.
isProcessed = true;
incrementProcessedCounter();
}
}
} catch (IOException ioex) {
// ***** Error #1: connection error
httpErrorBackoffTimer.reset();
getHttpErrorBackoffTimer().reset();
httpStatusNotOkAttempts = 0;
try {
networkBackoffTimer.backoff();
getNetworkBackoffTimer().backoff();
} catch (InterruptedException e) {
// The thread is interrupted, so we need to stop the processor. This will
// break the while loop.
isProcessed = false;
currentThread().interrupt();
}
} catch (Exception e) {
// ***** Error #3. uncaught exception
networkBackoffTimer.reset();
httpErrorBackoffTimer.reset();
getNetworkBackoffTimer().reset();
getHttpErrorBackoffTimer().reset();
sendToDLQ(event, e);
isProcessed = true;
}
}
}

private void sendToDLQ(AnchorEvent event, Exception e) {
void incrementProcessedCounter() {
Metrics.counter(EVENT_PROCESSED, QUEUE, toMetricTag(eventQueue.name())).increment();
}

void sendToDLQ(AnchorEvent event, Exception e) {
Log.errorF("Failed to process event: {}", json(event));
Log.errorEx(e);
}

long getConsumerRestartCount() {
return ((ScheduledThreadPoolExecutor) consumerScheduler).getCompletedTaskCount();
}
}

private String toMetricTag(String name) {
switch (name) {
case CALLBACK_API_EVENT_PROCESSOR_NAME:
return TV_BUSINESS_SERVER_CALLBACK;
case CLIENT_STATUS_CALLBACK_EVENT_PROCESSOR_NAME_PREFIX:
return TV_STATUS_CALLBACK;
default:
return TV_UNKNOWN;
ExponentialBackoffTimer getHttpErrorBackoffTimer() {
return httpErrorBackoffTimer;
}

ExponentialBackoffTimer getNetworkBackoffTimer() {
return networkBackoffTimer;
}

private String toMetricTag(String name) {
switch (name) {
case CALLBACK_API_EVENT_PROCESSOR_NAME:
return TV_BUSINESS_SERVER_CALLBACK;
case CLIENT_STATUS_CALLBACK_EVENT_PROCESSOR_NAME_PREFIX:
return TV_STATUS_CALLBACK;
default:
return TV_UNKNOWN;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package org.stellar.anchor.platform.event

import io.mockk.*
import io.mockk.impl.annotations.MockK
import java.io.FileNotFoundException
import java.io.IOException
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.stellar.anchor.api.event.AnchorEvent
import org.stellar.anchor.api.exception.SepException
import org.stellar.anchor.event.EventService
import org.stellar.anchor.event.EventService.EventQueue
import org.stellar.anchor.platform.event.EventProcessorManager.EventProcessor
import org.stellar.anchor.util.ExponentialBackoffTimer

class EventProcessorTest {
@MockK(relaxed = true) lateinit var event: AnchorEvent
@MockK(relaxed = true) lateinit var eventHandler: EventHandler
@MockK(relaxed = true) lateinit var eventService: EventService
@MockK(relaxed = true) lateinit var httpErrorBackoffTimer: ExponentialBackoffTimer
@MockK(relaxed = true) lateinit var networkErrorBackoffTimer: ExponentialBackoffTimer
private lateinit var eventProcessor: EventProcessor

@BeforeEach
fun setup() {
MockKAnnotations.init(this)
eventProcessor =
spyk(EventProcessor("TEST PROCESSOR", EventQueue.TRANSACTION, eventHandler, eventService))
}

@AfterEach
fun tearDown() {
unmockkAll()
}

@Test
fun `test that the event is not retried if the event handler returns true`() {
every { eventHandler.handleEvent(event) } returns true
eventProcessor.handleEvent(event)
// Check if handleEvent is called only once
verify(exactly = 1) { eventHandler.handleEvent(any()) }
// Check if incrementProcessCount is called
verify(exactly = 1) { eventProcessor.incrementProcessedCounter() }
}

@Test
fun `test that when event handler returns false httpErrorBackoffTimer backoff() is called 3 times`() {
every { eventHandler.handleEvent(event) } returns false
every { eventProcessor.httpErrorBackoffTimer } returns httpErrorBackoffTimer
every { eventProcessor.networkBackoffTimer } returns networkErrorBackoffTimer

eventProcessor.handleEvent(event)

// Check if handleEvent is called 3 times
verify(exactly = 3) { eventHandler.handleEvent(any()) }
// Check if the timer is called 2 times when the handleEvent is called 3 times.
verify(exactly = 2) { httpErrorBackoffTimer.backoff() }
// Eventually, we mark it successful
verify(exactly = 1) { eventProcessor.incrementProcessedCounter() }
}

@ParameterizedTest
@ValueSource(ints = [1, 10, 100])
fun `test that when event handler throws IOException, networkBackoffTimer is called many times`(
attempts: Int
) {
var counter = attempts
every { eventHandler.handleEvent(event) } answers { throw IOException("Mock exception") }
every { eventProcessor.httpErrorBackoffTimer } returns httpErrorBackoffTimer
every { eventProcessor.networkBackoffTimer } returns networkErrorBackoffTimer
every { networkErrorBackoffTimer.backoff() } answers
{
counter--
if (counter == 0) {
Thread.currentThread().interrupt()
}
}

eventProcessor.handleEvent(event)

// Check if handleEvent is called `attempts` times
verify(exactly = attempts) { eventHandler.handleEvent(any()) }
// Check if the timer is called `attempts` times when the handleEvent is called 3 times.
verify(exactly = attempts) { networkErrorBackoffTimer.backoff() }
// Make sure the metric does not show it passes
verify(exactly = 0) { eventProcessor.incrementProcessedCounter() }
}

@ParameterizedTest
@ValueSource(
classes = [RuntimeException::class, SepException::class, FileNotFoundException::class]
)
fun `test that when event handler throws uncaught exception, sendToDLQ is called once`(
clz: Class<Throwable>
) {
// Mock handleEvent to throw uncaught exception
every { eventHandler.handleEvent(event) } answers
{
throw clz.getConstructor(String::class.java).newInstance()
}

every { eventProcessor.httpErrorBackoffTimer } returns httpErrorBackoffTimer
every { eventProcessor.networkBackoffTimer } returns networkErrorBackoffTimer

eventProcessor.handleEvent(event)

// Check if handleEvent is called 1 times
verify(exactly = 1) { eventHandler.handleEvent(any()) }
// Check if the timer is called 1 times when the handleEvent is called 3 times.
verify(exactly = 1) { eventProcessor.sendToDLQ(any(), any()) }
// Check that no backoff is called
verify(exactly = 0) { networkErrorBackoffTimer.backoff() }
verify(exactly = 0) { httpErrorBackoffTimer.backoff() }
// Make sure the metric does not show it passes
verify(exactly = 0) { eventProcessor.incrementProcessedCounter() }
}
}

0 comments on commit ac3f2ca

Please sign in to comment.