diff --git a/platform/src/main/java/org/stellar/anchor/platform/event/CallbackApiEventProcessor.java b/platform/src/main/java/org/stellar/anchor/platform/event/CallbackApiEventProcessor.java new file mode 100644 index 0000000000..adc5a66ee7 --- /dev/null +++ b/platform/src/main/java/org/stellar/anchor/platform/event/CallbackApiEventProcessor.java @@ -0,0 +1,62 @@ +package org.stellar.anchor.platform.event; + +import static java.lang.Thread.currentThread; +import static org.stellar.anchor.util.MetricConstants.TV_BUSINESS_SERVER_CALLBACK; + +import java.io.IOException; +import org.stellar.anchor.api.event.AnchorEvent; +import org.stellar.anchor.event.EventService; +import org.stellar.anchor.util.ExponentialBackoffTimer; +import org.stellar.anchor.util.Log; + +public class CallbackApiEventProcessor extends EventProcessor { + private final CallbackApiEventHandler eventHandler; + private final ExponentialBackoffTimer backoffTimer = new ExponentialBackoffTimer(); + + public CallbackApiEventProcessor( + String name, + EventService.EventQueue eventQueue, + EventService eventService, + CallbackApiEventHandler eventHandler) { + super(name, eventQueue, eventService); + this.eventHandler = eventHandler; + } + + @Override + void handleEventWithRetry(AnchorEvent event) { + boolean isProcessed = false; + // For every event, reset the timer. + getBackoffTimer().reset(); + // Infinite retry until the event is processed or the thread is interrupted. + while (!isProcessed && !currentThread().isInterrupted()) { + try { + if (eventHandler.handleEvent(event)) { + isProcessed = true; + incrementProcessedCounter(); + } else { + try { + getBackoffTimer().backoff(); + } catch (InterruptedException e) { + currentThread().interrupt(); + } + } + } catch (IOException ioex) { + Log.errorEx(ioex); + try { + getBackoffTimer().backoff(); + } catch (InterruptedException e) { + currentThread().interrupt(); + } + } + } + } + + @Override + String toMetricTag(String queueName) { + return TV_BUSINESS_SERVER_CALLBACK; + } + + ExponentialBackoffTimer getBackoffTimer() { + return backoffTimer; + } +} diff --git a/platform/src/main/java/org/stellar/anchor/platform/event/ClientStatusCallbackProcessor.java b/platform/src/main/java/org/stellar/anchor/platform/event/ClientStatusCallbackProcessor.java new file mode 100644 index 0000000000..558089086a --- /dev/null +++ b/platform/src/main/java/org/stellar/anchor/platform/event/ClientStatusCallbackProcessor.java @@ -0,0 +1,111 @@ +package org.stellar.anchor.platform.event; + +import static java.lang.Thread.currentThread; +import static org.stellar.anchor.util.MetricConstants.*; +import static org.stellar.anchor.util.StringHelper.json; + +import java.io.IOException; +import org.stellar.anchor.api.event.AnchorEvent; +import org.stellar.anchor.event.EventService; +import org.stellar.anchor.util.ExponentialBackoffTimer; +import org.stellar.anchor.util.Log; + +public class ClientStatusCallbackProcessor extends EventProcessor { + private final ClientStatusCallbackHandler eventHandler; + // The initial backoff time for connection error. + private final long NETWORK_INITIAL_BACKOFF_TIME_SECONDS = 1; + // The maximum backoff time for connection error. + private final long NETWORK_MAX_BACKOFF_TIME_SECONDS = 30; + // The initial backoff time for HTTP status code other than 200s or 300s. + private final long HTTP_STATUS_INITIAL_BACKOFF_TIME_SECONDS = 1; + // The maximum backoff time for HTTP status code other than 200s or 300s. + private final long HTTP_STATUS_MAX_BACKOFF_TIME_SECONDS = 5; + // The maximum number of retries for HTTP status code other than 200s or 300s. + private final long MAX_RETRIES = 3; + private final ExponentialBackoffTimer networkBackoffTimer = + new ExponentialBackoffTimer( + NETWORK_INITIAL_BACKOFF_TIME_SECONDS, NETWORK_MAX_BACKOFF_TIME_SECONDS); + private final ExponentialBackoffTimer httpErrorBackoffTimer = + new ExponentialBackoffTimer( + HTTP_STATUS_INITIAL_BACKOFF_TIME_SECONDS, HTTP_STATUS_MAX_BACKOFF_TIME_SECONDS); + + protected ClientStatusCallbackProcessor( + String name, + EventService.EventQueue eventQueue, + EventService eventService, + ClientStatusCallbackHandler eventHandler) { + super(name, eventQueue, eventService); + this.eventHandler = eventHandler; + } + + @Override + void handleEventWithRetry(AnchorEvent event) { + boolean isProcessed = false; + int retryAttempts = 0; + // For every event, reset the timer. + getHttpErrorBackoffTimer().reset(); + getNetworkBackoffTimer().reset(); + // Retry until the event is processed or the thread is interrupted. + while (!isProcessed && !currentThread().isInterrupted()) { + try { + if (eventHandler.handleEvent(event)) { + // ***** The event is processed successfully. + isProcessed = true; + incrementProcessedCounter(); + } else { + // ***** Error #2. HTTP status code other than 200s or 300s + if (++retryAttempts < MAX_RETRIES) { + // retry. + try { + getHttpErrorBackoffTimer().backoff(); + } catch (InterruptedException e) { + // The thread is interrupted, so we need to stop the processor. This will + // break the while loop. + currentThread().interrupt(); + } + } else { + // retry >= 3 times, skip the event. + isProcessed = true; + incrementProcessedCounter(); + } + } + } catch (IOException ioex) { + // Retry for connection error + if (++retryAttempts < MAX_RETRIES) { + try { + getNetworkBackoffTimer().backoff(); + } catch (InterruptedException e) { + // The thread is interrupted, so we need to stop the processor. This will + // break the while loop. + currentThread().interrupt(); + } + } else { + isProcessed = true; + incrementProcessedCounter(); + } + } catch (Exception e) { + // ***** Error #3. uncaught exception + sendToDLQ(event, e); + isProcessed = true; + } + } + } + + @Override + String toMetricTag(String queueName) { + return TV_STATUS_CALLBACK; + } + + ExponentialBackoffTimer getHttpErrorBackoffTimer() { + return httpErrorBackoffTimer; + } + + ExponentialBackoffTimer getNetworkBackoffTimer() { + return networkBackoffTimer; + } + + void sendToDLQ(AnchorEvent event, Exception e) { + Log.errorF("Failed to process event: {}", json(event)); + Log.errorEx(e); + } +} diff --git a/platform/src/main/java/org/stellar/anchor/platform/event/EventProcessor.java b/platform/src/main/java/org/stellar/anchor/platform/event/EventProcessor.java new file mode 100644 index 0000000000..e937ee4c5f --- /dev/null +++ b/platform/src/main/java/org/stellar/anchor/platform/event/EventProcessor.java @@ -0,0 +1,93 @@ +package org.stellar.anchor.platform.event; + +import static java.lang.Thread.currentThread; +import static org.stellar.anchor.util.Log.debugF; +import static org.stellar.anchor.util.Log.infoF; +import static org.stellar.anchor.util.MetricConstants.*; + +import io.micrometer.core.instrument.Metrics; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import lombok.SneakyThrows; +import org.stellar.anchor.api.event.AnchorEvent; +import org.stellar.anchor.api.exception.AnchorException; +import org.stellar.anchor.event.EventService; +import org.stellar.anchor.event.EventService.EventQueue; +import org.stellar.anchor.platform.utils.DaemonExecutors; +import org.stellar.anchor.util.Log; + +public abstract class EventProcessor implements Runnable { + private final String name; + private final EventQueue eventQueue; + private final EventService eventService; + private final ScheduledExecutorService consumerScheduler = + DaemonExecutors.newScheduledThreadPool(1); + private ScheduledFuture processingTask = null; + // The flag to indicate if the processor is stopped. + private boolean stopped = false; + + protected EventProcessor( + String name, EventService.EventQueue eventQueue, EventService eventService) { + this.name = name; + this.eventQueue = eventQueue; + this.eventService = eventService; + } + + public void start() { + processingTask = consumerScheduler.scheduleWithFixedDelay(this, 1, 2, TimeUnit.SECONDS); + } + + public void stop() throws AnchorException { + stopped = true; + if (processingTask != null) { + processingTask.cancel(true); + } + consumerScheduler.shutdown(); + } + + @SneakyThrows + @Override + public void run() { + infoF( + "The EventProcessor listening task is starting for the {} time.", + getConsumerRestartCount() + 1); + EventService.Session queueSession = eventService.createSession(name, eventQueue); + try { + while (!currentThread().isInterrupted() && !stopped) { + EventService.ReadResponse readResponse = queueSession.read(); + List events = readResponse.getEvents(); + Metrics.counter(EVENT_RECEIVED, QUEUE, toMetricTag(eventQueue.name())) + .increment(events.size()); + debugF("Received {} events from queue", events.size()); + for (AnchorEvent event : events) { + handleEventWithRetry(event); + if (currentThread().isInterrupted()) break; + } + queueSession.ack(readResponse); + } + + queueSession.close(); + } catch (Exception ex) { + // This is unexpected, so we need to restart the consumer. + Log.errorEx(ex); + } finally { + queueSession.close(); + infoF("Closing queue session [{}]", queueSession.getSessionName()); + } + } + + abstract void handleEventWithRetry(AnchorEvent event); + + abstract String toMetricTag(String queueName); + + long getConsumerRestartCount() { + return ((ScheduledThreadPoolExecutor) consumerScheduler).getCompletedTaskCount(); + } + + void incrementProcessedCounter() { + Metrics.counter(EVENT_PROCESSED, QUEUE, toMetricTag(eventQueue.name())).increment(); + } +} diff --git a/platform/src/main/java/org/stellar/anchor/platform/event/EventProcessorManager.java b/platform/src/main/java/org/stellar/anchor/platform/event/EventProcessorManager.java index 3f3e4b88a3..c03c26ea74 100644 --- a/platform/src/main/java/org/stellar/anchor/platform/event/EventProcessorManager.java +++ b/platform/src/main/java/org/stellar/anchor/platform/event/EventProcessorManager.java @@ -1,26 +1,17 @@ package org.stellar.anchor.platform.event; -import static java.lang.Thread.currentThread; import static org.stellar.anchor.event.EventService.*; import static org.stellar.anchor.util.Log.*; import static org.stellar.anchor.util.MetricConstants.*; -import static org.stellar.anchor.util.MetricConstants.EVENT_RECEIVED; import static org.stellar.anchor.util.StringHelper.json; -import io.micrometer.core.instrument.Metrics; -import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.stellar.anchor.MoreInfoUrlConstructor; -import org.stellar.anchor.api.event.AnchorEvent; import org.stellar.anchor.api.exception.AnchorException; import org.stellar.anchor.api.exception.InternalServerErrorException; import org.stellar.anchor.asset.AssetService; @@ -30,11 +21,9 @@ import org.stellar.anchor.platform.config.CallbackApiConfig; import org.stellar.anchor.platform.config.EventProcessorConfig; import org.stellar.anchor.platform.config.PropertyClientsConfig; -import org.stellar.anchor.platform.utils.DaemonExecutors; import org.stellar.anchor.sep24.Sep24TransactionStore; import org.stellar.anchor.sep31.Sep31TransactionStore; import org.stellar.anchor.sep6.Sep6TransactionStore; -import org.stellar.anchor.util.ExponentialBackoffTimer; import org.stellar.anchor.util.Log; public class EventProcessorManager { @@ -85,11 +74,11 @@ public void start() { if (eventProcessorConfig.getCallbackApiRequest().isEnabled()) { // Create a processor for the callback API handler processors.add( - new EventProcessor( + new CallbackApiEventProcessor( CALLBACK_API_EVENT_PROCESSOR_NAME, EventQueue.TRANSACTION, - new CallbackApiEventHandler(callbackApiConfig), - eventService)); + eventService, + new CallbackApiEventHandler(callbackApiConfig))); } // Create a processor of the client status callback handler for each client defined in the // clientsConfig @@ -116,17 +105,17 @@ public void start() { "Unknown client type: " + clientConfig.getType()); } processors.add( - new EventProcessor( + new ClientStatusCallbackProcessor( processorName, EventQueue.TRANSACTION, + eventService, new ClientStatusCallbackHandler( secretConfig, clientConfig, sep6TransactionStore, assetService, sep6MoreInfoUrlConstructor, - sep24MoreInfoUrlConstructor), - eventService)); + sep24MoreInfoUrlConstructor))); } } @@ -145,172 +134,4 @@ public void stop() { } } } - - static class EventProcessor implements Runnable { - // The initial backoff time for connection error. - private final long NETWORK_INITIAL_BACKOFF_TIME_SECONDS = 1; - // The maximum backoff time for connection error. - private final long NETWORK_MAX_BACKOFF_TIME_SECONDS = 30; - // The initial backoff time for HTTP status code other than 200s or 300s. - private final long HTTP_STATUS_INITIAL_BACKOFF_TIME_SECONDS = 1; - // The maximum backoff time for HTTP status code other than 200s or 300s. - private final long HTTP_STATUS_MAX_BACKOFF_TIME_SECONDS = 5; - // The maximum number of retries for HTTP status code other than 200s or 300s. - private final long MAX_RETRIES = 3; - - private final String name; - private final EventQueue eventQueue; - private final EventHandler eventHandler; - private final EventService eventService; - - private final ScheduledExecutorService consumerScheduler = - DaemonExecutors.newScheduledThreadPool(1); - private ScheduledFuture processingTask = null; - private final ExponentialBackoffTimer networkBackoffTimer = - new ExponentialBackoffTimer( - NETWORK_INITIAL_BACKOFF_TIME_SECONDS, NETWORK_MAX_BACKOFF_TIME_SECONDS); - private final ExponentialBackoffTimer httpErrorBackoffTimer = - new ExponentialBackoffTimer( - HTTP_STATUS_INITIAL_BACKOFF_TIME_SECONDS, HTTP_STATUS_MAX_BACKOFF_TIME_SECONDS); - - // The flag to indicate if the processor is stopped. - private boolean stopped = false; - - public EventProcessor( - String name, EventQueue eventQueue, EventHandler eventHandler, EventService eventService) { - this.name = name; - this.eventQueue = eventQueue; - this.eventHandler = eventHandler; - this.eventService = eventService; - } - - public void start() { - processingTask = consumerScheduler.scheduleWithFixedDelay(this, 1, 2, TimeUnit.SECONDS); - } - - public void stop() throws AnchorException { - stopped = true; - - if (processingTask != null) { - processingTask.cancel(true); - } - - consumerScheduler.shutdown(); - } - - @SneakyThrows - @Override - public void run() { - infoF( - "The EventProcessor listening task is starting for the {} time.", - getConsumerRestartCount() + 1); - Session queueSession = eventService.createSession(name, eventQueue); - try { - while (!currentThread().isInterrupted() && !stopped) { - ReadResponse readResponse = queueSession.read(); - List events = readResponse.getEvents(); - Metrics.counter(EVENT_RECEIVED, QUEUE, toMetricTag(eventQueue.name())) - .increment(events.size()); - debugF("Received {} events from queue", events.size()); - for (AnchorEvent event : events) { - handleEvent(event); - if (currentThread().isInterrupted()) break; - } - queueSession.ack(readResponse); - } - - queueSession.close(); - } catch (Exception ex) { - // This is unexpected, so we need to restart the consumer. - Log.errorEx(ex); - } finally { - queueSession.close(); - infoF("Closing queue session [{}]", queueSession.getSessionName()); - } - } - - void handleEvent(AnchorEvent event) { - boolean isProcessed = false; - int retryAttempts = 0; - // For every event, reset the timer. - getHttpErrorBackoffTimer().reset(); - getNetworkBackoffTimer().reset(); - // Retry until the event is processed or the thread is interrupted. - while (!isProcessed && !currentThread().isInterrupted()) { - try { - if (eventHandler.handleEvent(event)) { - // ***** The event is processed successfully. - isProcessed = true; - incrementProcessedCounter(); - } else { - // ***** Error #2. HTTP status code other than 200s or 300s - if (++retryAttempts < MAX_RETRIES) { - // retry. - try { - getHttpErrorBackoffTimer().backoff(); - } catch (InterruptedException e) { - // The thread is interrupted, so we need to stop the processor. This will - // break the while loop. - currentThread().interrupt(); - } - } else { - // retry >= 3 times, skip the event. - isProcessed = true; - incrementProcessedCounter(); - } - } - } catch (IOException ioex) { - // Retry for connection error - if (++retryAttempts < MAX_RETRIES) { - try { - getNetworkBackoffTimer().backoff(); - } catch (InterruptedException e) { - // The thread is interrupted, so we need to stop the processor. This will - // break the while loop. - currentThread().interrupt(); - } - } else { - isProcessed = true; - incrementProcessedCounter(); - } - } catch (Exception e) { - // ***** Error #3. uncaught exception - sendToDLQ(event, e); - isProcessed = true; - } - } - } - - void incrementProcessedCounter() { - Metrics.counter(EVENT_PROCESSED, QUEUE, toMetricTag(eventQueue.name())).increment(); - } - - void sendToDLQ(AnchorEvent event, Exception e) { - Log.errorF("Failed to process event: {}", json(event)); - Log.errorEx(e); - } - - long getConsumerRestartCount() { - return ((ScheduledThreadPoolExecutor) consumerScheduler).getCompletedTaskCount(); - } - - ExponentialBackoffTimer getHttpErrorBackoffTimer() { - return httpErrorBackoffTimer; - } - - ExponentialBackoffTimer getNetworkBackoffTimer() { - return networkBackoffTimer; - } - - private String toMetricTag(String name) { - switch (name) { - case CALLBACK_API_EVENT_PROCESSOR_NAME: - return TV_BUSINESS_SERVER_CALLBACK; - case CLIENT_STATUS_CALLBACK_EVENT_PROCESSOR_NAME_PREFIX: - return TV_STATUS_CALLBACK; - default: - return TV_UNKNOWN; - } - } - } } diff --git a/platform/src/test/kotlin/org/stellar/anchor/platform/event/CallbackApiEventProcessorTest.kt b/platform/src/test/kotlin/org/stellar/anchor/platform/event/CallbackApiEventProcessorTest.kt new file mode 100644 index 0000000000..4191b5e0aa --- /dev/null +++ b/platform/src/test/kotlin/org/stellar/anchor/platform/event/CallbackApiEventProcessorTest.kt @@ -0,0 +1,62 @@ +package org.stellar.anchor.platform.event + +import io.mockk.* +import io.mockk.impl.annotations.MockK +import java.io.IOException +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import org.stellar.anchor.api.event.AnchorEvent +import org.stellar.anchor.event.EventService +import org.stellar.anchor.util.ExponentialBackoffTimer + +class CallbackApiEventProcessorTest { + @MockK(relaxed = true) lateinit var event: AnchorEvent + @MockK(relaxed = true) lateinit var eventHandler: CallbackApiEventHandler + @MockK(relaxed = true) lateinit var eventService: EventService + @MockK(relaxed = true) lateinit var backoffTimer: ExponentialBackoffTimer + private lateinit var eventProcessor: CallbackApiEventProcessor + + @BeforeEach + fun setup() { + MockKAnnotations.init(this) + eventProcessor = + spyk( + CallbackApiEventProcessor( + "TEST PROCESSOR", + EventService.EventQueue.TRANSACTION, + eventService, + eventHandler + ) + ) + } + + @Test + fun `test event handled successfully without retry`() { + every { eventHandler.handleEvent(event) } returns true + eventProcessor.handleEventWithRetry(event) + verify(exactly = 1) { eventHandler.handleEvent(any()) } + verify(exactly = 1) { eventProcessor.incrementProcessedCounter() } + } + + @ParameterizedTest + @ValueSource(ints = [1, 3, 5, 10, 100]) + fun `test event retry on IOException and stops on InterruptedException`(attempts: Int) { + var counter = attempts + every { eventHandler.handleEvent(event) } answers { throw IOException("Mock exception") } + every { eventProcessor.backoffTimer } returns backoffTimer + every { backoffTimer.backoff() } answers + { + counter-- + if (counter == 0) { + Thread.currentThread().interrupt() + } + } + + eventProcessor.handleEventWithRetry(event) + verify(exactly = 0) { eventProcessor.incrementProcessedCounter() } + verify(exactly = attempts) { eventHandler.handleEvent(event) } + verify(exactly = attempts) { backoffTimer.backoff() } + } +} diff --git a/platform/src/test/kotlin/org/stellar/anchor/platform/event/EventProcessorTest.kt b/platform/src/test/kotlin/org/stellar/anchor/platform/event/ClientStatusCallbackProcessorTest.kt similarity index 88% rename from platform/src/test/kotlin/org/stellar/anchor/platform/event/EventProcessorTest.kt rename to platform/src/test/kotlin/org/stellar/anchor/platform/event/ClientStatusCallbackProcessorTest.kt index a18bdd94c4..2a05ac7231 100644 --- a/platform/src/test/kotlin/org/stellar/anchor/platform/event/EventProcessorTest.kt +++ b/platform/src/test/kotlin/org/stellar/anchor/platform/event/ClientStatusCallbackProcessorTest.kt @@ -12,28 +12,34 @@ import org.stellar.anchor.api.event.AnchorEvent import org.stellar.anchor.api.exception.SepException import org.stellar.anchor.event.EventService import org.stellar.anchor.event.EventService.EventQueue -import org.stellar.anchor.platform.event.EventProcessorManager.EventProcessor import org.stellar.anchor.util.ExponentialBackoffTimer -class EventProcessorTest { +class ClientStatusCallbackProcessorTest { @MockK(relaxed = true) lateinit var event: AnchorEvent - @MockK(relaxed = true) lateinit var eventHandler: EventHandler + @MockK(relaxed = true) lateinit var eventHandler: ClientStatusCallbackHandler @MockK(relaxed = true) lateinit var eventService: EventService @MockK(relaxed = true) lateinit var httpErrorBackoffTimer: ExponentialBackoffTimer @MockK(relaxed = true) lateinit var networkErrorBackoffTimer: ExponentialBackoffTimer - private lateinit var eventProcessor: EventProcessor + private lateinit var eventProcessor: ClientStatusCallbackProcessor @BeforeEach fun setup() { MockKAnnotations.init(this) eventProcessor = - spyk(EventProcessor("TEST PROCESSOR", EventQueue.TRANSACTION, eventHandler, eventService)) + spyk( + ClientStatusCallbackProcessor( + "TEST PROCESSOR", + EventQueue.TRANSACTION, + eventService, + eventHandler + ) + ) } @Test fun `test that the event is not retried if the event handler returns true`() { every { eventHandler.handleEvent(event) } returns true - eventProcessor.handleEvent(event) + eventProcessor.handleEventWithRetry(event) // Check if handleEvent is called only once verify(exactly = 1) { eventHandler.handleEvent(any()) } // Check if incrementProcessCount is called @@ -45,8 +51,7 @@ class EventProcessorTest { every { eventHandler.handleEvent(event) } returns false every { eventProcessor.httpErrorBackoffTimer } returns httpErrorBackoffTimer every { eventProcessor.networkBackoffTimer } returns networkErrorBackoffTimer - - eventProcessor.handleEvent(event) + eventProcessor.handleEventWithRetry(event) // Check if handleEvent is called 3 times verify(exactly = 3) { eventHandler.handleEvent(any()) } @@ -73,7 +78,7 @@ class EventProcessorTest { } } - eventProcessor.handleEvent(event) + eventProcessor.handleEventWithRetry(event) // Check if handleEvent is called `attempts` times verify(atLeast = 1, atMost = 3) { eventHandler.handleEvent(any()) } @@ -104,7 +109,7 @@ class EventProcessorTest { every { eventProcessor.httpErrorBackoffTimer } returns httpErrorBackoffTimer every { eventProcessor.networkBackoffTimer } returns networkErrorBackoffTimer - eventProcessor.handleEvent(event) + eventProcessor.handleEventWithRetry(event) // Check if handleEvent is called 1 times verify(exactly = 1) { eventHandler.handleEvent(any()) }