Skip to content

Commit

Permalink
Add some unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lijamie98 committed Jul 10, 2023
1 parent af7f1b2 commit 7ce3ced
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 49 deletions.
7 changes: 7 additions & 0 deletions core/src/main/java/org/stellar/anchor/event/EventService.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ interface Session {

/** Closes the session. */
void close() throws AnchorException;

/**
* Returns the name of the session.
*
* @return the name of the session.
*/
String getSessionName();
}

interface ReadResponse {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,19 @@
package org.stellar.anchor.platform.test

import org.stellar.anchor.api.event.AnchorEvent
import org.stellar.anchor.config.event.QueueConfig.QueueType.KAFKA
import org.stellar.anchor.event.EventService.EventQueue.TRANSACTION
import org.stellar.anchor.platform.TestConfig
import org.stellar.anchor.platform.config.KafkaConfig
import org.stellar.anchor.platform.config.PropertyEventConfig
import org.stellar.anchor.platform.config.PropertyQueueConfig
import org.stellar.anchor.platform.event.DefaultEventService
import org.stellar.anchor.util.GsonUtils
import org.stellar.anchor.util.Sep1Helper

class EventProcessingServerTests(config: TestConfig, toml: Sep1Helper.TomlContent, jwt: String) {
companion object {
val eventConfig =
GsonUtils.getInstance().fromJson(eventConfigJson, PropertyEventConfig::class.java)!!
}
fun testOk() {
val eventConfig = PropertyEventConfig()
eventConfig.isEnabled = true
eventConfig.queue = PropertyQueueConfig()
eventConfig.queue.type = KAFKA
eventConfig.queue.kafka = KafkaConfig()
eventConfig.queue.kafka.clientId = "testOk"
eventConfig.queue.kafka.retries = 0
eventConfig.queue.kafka.batchSize = 10
eventConfig.queue.kafka.lingerMs = 1000
eventConfig.queue.kafka.pollTimeoutSeconds = 10
eventConfig.queue.kafka.bootstrapServer = "kafka:29092"
val eventService = DefaultEventService(eventConfig)
val session = eventService.createSession("testOk", TRANSACTION)
val quoteEvent = GsonUtils.getInstance().fromJson(testQuoteEvent, AnchorEvent::class.java)
Expand Down Expand Up @@ -65,3 +55,22 @@ val testQuoteEvent =
"""
.trimIndent()

val eventConfigJson =
"""
{
"enabled": true,
"queue": {
"type": "KAFKA",
"kafka": {
"bootstrapServer": "kafka:29092",
"clientId": "testOk",
"retries": 0,
"lingerMs": 1000,
"batchSize": 10,
"pollTimeoutSeconds": 10
}
}
}
"""
.trimIndent()
Original file line number Diff line number Diff line change
@@ -1,24 +0,0 @@
{
"type": "QUOTE_CREATED",
"id": "123",
"sep": "38",
"quote": {
"id": "QUOTE-ID-123",
"sell_amount": "103",
"sell_asset": "USDC",
"buy_amount": "100",
"buy_asset": "USD",
"expires_at": "2021-01-01T00:00:00Z",
"price": "1.02",
"total_price": "1.03",
"creator": {
"id": "CREATOR-ID-1234",
"account": "GDQOE23CFSUMSVQK4Y5JHPPYK73VYCNHZHA7ENKCV37P6SUEO6XQBKPP"
},
"created_at": "2021-01-01T00:00:00Z",
"fee": {
"total": "0.01",
"asset": "GDQOE23CFSUMSVQK4Y5JHPPYK73VYCNHZHA7ENKCV37P6SUEO6XQBKPP"
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.stellar.anchor.platform.event;

import static org.stellar.anchor.platform.config.ClientsConfig.*;
import static org.stellar.anchor.platform.config.ClientsConfig.ClientType.*;
import static org.stellar.anchor.util.Log.*;

import java.util.ArrayList;
Expand All @@ -23,7 +21,9 @@
import org.stellar.anchor.util.Log;

public class EventProcessorManager {
public static final String CLIENT_STATUS_CALLBACK = "client-status-callback-";
public static final String CLIENT_STATUS_CALLBACK_EVENT_PROCESSOR_NAME_PREFIX =
"client-status-callback-";
public static final String CALLBACK_API_EVENT_PROCESSOR_NAME = "callback-api";
private final EventProcessorConfig eventProcessorConfig;
private final CallbackApiConfig callbackApiConfig;
private final ClientsConfig clientsConfig;
Expand All @@ -48,7 +48,9 @@ public void start() {
// Create a processor for the callback API handler
processors.add(
new EventProcessor(
"callback-api", EventQueue.TRANSACTION, new CallbackApiHandler(callbackApiConfig)));
CALLBACK_API_EVENT_PROCESSOR_NAME,
EventQueue.TRANSACTION,
new CallbackApiHandler(callbackApiConfig)));
// Create a processor of the client status callback handler for each client defined in the
// clientsConfig
if (eventProcessorConfig.getClientStatusCallback().isEnabled()) {
Expand All @@ -59,10 +61,14 @@ public void start() {
String processorName = null;
switch (clientConfig.getType()) {
case CUSTODIAL:
processorName = CLIENT_STATUS_CALLBACK + clientConfig.getDomain();
processorName =
CLIENT_STATUS_CALLBACK_EVENT_PROCESSOR_NAME_PREFIX
+ clientConfig.getDomain();
break;
case NONCUSTODIAL:
processorName = CLIENT_STATUS_CALLBACK + clientConfig.getSigningKey();
processorName =
CLIENT_STATUS_CALLBACK_EVENT_PROCESSOR_NAME_PREFIX
+ clientConfig.getSigningKey();
break;
default:
errorF("Unknown client type: {}", clientConfig.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@

public class KafkaSession implements EventService.Session {

private final KafkaConfig kafkaConfig;
private final String sessionName;
private final String topic;
private Producer<String, AnchorEvent> producer = null;
private Consumer<String, AnchorEvent> consumer = null;
final KafkaConfig kafkaConfig;
final String sessionName;
final String topic;
Producer<String, AnchorEvent> producer = null;
Consumer<String, AnchorEvent> consumer = null;

KafkaSession(KafkaConfig kafkaConfig, String sessionName, EventQueue queue) {
this.kafkaConfig = kafkaConfig;
Expand Down Expand Up @@ -117,6 +117,11 @@ public void close() throws AnchorException {
}
}

@Override
public String getSessionName() {
return sessionName;
}

private Producer<String, AnchorEvent> createProducer() {
Log.debugF("kafkaConfig: {}", kafkaConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ public void ack(EventService.ReadResponse readResponse) throws AnchorException {
public void close() throws AnchorException {
debug("Closing NoOpSession class");
}

@Override
public String getSessionName() {
return "NoOpSession";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.stellar.anchor.platform.event

import org.apache.commons.lang3.NotImplementedException
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.stellar.anchor.config.event.QueueConfig.QueueType.*
import org.stellar.anchor.event.EventService.EventQueue.TRANSACTION
import org.stellar.anchor.platform.config.PropertyEventConfig
import org.stellar.anchor.util.GsonUtils

internal class DefaultEventServiceTest {
private lateinit var eventConfig: PropertyEventConfig

@BeforeEach
fun setUp() {
eventConfig =
GsonUtils.getInstance().fromJson(eventConfigJson, PropertyEventConfig::class.java)!!
}

@AfterEach fun tearDown() {}

@Test
fun `test if the create session returns correct Session class`() {
// Test create Kafka session
eventConfig.queue.type = KAFKA
var defaultEventService = DefaultEventService(eventConfig)
var session = defaultEventService.createSession("test", TRANSACTION)
assert(session is KafkaSession)
var kafkaSession: KafkaSession = session as KafkaSession
assertEquals(kafkaSession.topic, "TRANSACTION")

// Test create SQS session should throw not implemented exception
eventConfig.queue.type = SQS
assertThrows<NotImplementedException> { defaultEventService.createSession("test", TRANSACTION) }
// Test create MSK session should throw not implemented exception
eventConfig.queue.type = MSK
assertThrows<NotImplementedException> { defaultEventService.createSession("test", TRANSACTION) }
}

val eventConfigJson =
"""
{
"enabled": true,
"queue": {
"type": "KAFKA",
"kafka": {
"bootstrapServer": "kafka:29092",
"clientId": "testOk",
"retries": 0,
"lingerMs": 1000,
"batchSize": 10,
"pollTimeoutSeconds": 10
}
}
}
"""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.stellar.anchor.platform.event

import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test

internal class KafkaSessionTest {
@BeforeEach fun setUp() {}

@AfterEach fun tearDown() {}

@Test fun publish() {}

@Test fun read() {}

@Test fun ack() {}

@Test fun close() {}

@get:Test
val sessionName: Unit
get() {}

@Test fun createConsumer() {}
}

0 comments on commit 7ce3ced

Please sign in to comment.