Skip to content

Commit

Permalink
Merge branch 'develop' into feature/anchor-378-retry-strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
reecexlm authored Aug 9, 2023
2 parents ac3f2ca + 4a5f8d4 commit ad9cac8
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import org.stellar.anchor.api.platform.HealthCheckStatus;
import org.stellar.anchor.healthcheck.HealthCheckable;
import org.stellar.anchor.reference.config.KafkaListenerSettings;
import org.stellar.anchor.util.GsonUtils;
import org.stellar.anchor.util.Log;

public class KafkaListener extends AbstractEventListener implements HealthCheckable {
private final KafkaListenerSettings kafkaListenerSettings;
private final AnchorEventProcessor processor;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private Consumer<String, AnchorEvent> consumer;
private Consumer<String, String> consumer;

public KafkaListener(
KafkaListenerSettings kafkaListenerSettings, AnchorEventProcessor processor) {
Expand All @@ -56,7 +57,7 @@ public void stop() {
executor.shutdown();
}

Consumer<String, AnchorEvent> createKafkaConsumer() {
Consumer<String, String> createKafkaConsumer() {
Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerSettings.getBootStrapServer());
Expand All @@ -66,7 +67,7 @@ Consumer<String, AnchorEvent> createKafkaConsumer() {
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
if (kafkaListenerSettings.isUseIAM()) {
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "AWS_MSK_IAM");
Expand All @@ -82,7 +83,7 @@ Consumer<String, AnchorEvent> createKafkaConsumer() {
public void listen() {
Log.info("Kafka event consumer server started ");

Consumer<String, AnchorEvent> consumer = createKafkaConsumer();
Consumer<String, String> consumer = createKafkaConsumer();

KafkaListenerSettings.Queues q = kafkaListenerSettings.getEventTypeToQueue();
consumer.subscribe(
Expand All @@ -96,12 +97,12 @@ public void listen() {

while (!Thread.interrupted()) {
try {
ConsumerRecords<String, AnchorEvent> consumerRecords =
consumer.poll(Duration.ofSeconds(10));
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(10));
Log.info(String.format("Messages received: %s", consumerRecords.count()));
consumerRecords.forEach(
record -> {
AnchorEvent event = record.value();
AnchorEvent event =
GsonUtils.getInstance().fromJson(record.value(), AnchorEvent.class);
processor.handleEvent(event);
});
} catch (Exception ex) {
Expand Down Expand Up @@ -149,7 +150,7 @@ public HealthCheckResult check() {
}

boolean validateKafka() {
try (Consumer<String, AnchorEvent> csm = createKafkaConsumer()) {
try (Consumer<String, String> csm = createKafkaConsumer()) {
csm.listTopics();
return true;
} catch (Throwable throwable) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.stellar.anchor.api.event;

import com.fasterxml.jackson.annotation.JsonValue;
import lombok.*;
import org.stellar.anchor.api.platform.GetQuoteResponse;
import org.stellar.anchor.api.platform.GetTransactionResponse;
Expand All @@ -27,11 +26,10 @@ public class AnchorEvent {
public enum Type {
TRANSACTION_CREATED("transaction_created"),
TRANSACTION_STATUS_CHANGED("transaction_status_changed"),
@SuppressWarnings("unused")
TRANSACTION_ERROR("transaction_error"),
QUOTE_CREATED("quote_created");

@JsonValue public final String type;
public final String type;

Type(String type) {
this.type = type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions
import org.junit.jupiter.api.Assertions.assertNotNull
import org.skyscreamer.jsonassert.JSONAssert
import org.springframework.web.util.UriComponentsBuilder
import org.stellar.anchor.api.event.AnchorEvent
import org.stellar.anchor.api.sep.sep24.Sep24GetTransactionResponse
import org.stellar.anchor.auth.JwtService
import org.stellar.anchor.auth.Sep24InteractiveUrlJwt
import org.stellar.anchor.platform.CLIENT_WALLET_SECRET
import org.stellar.anchor.platform.TestConfig
import org.stellar.anchor.util.GsonUtils
Expand All @@ -23,6 +26,7 @@ import org.stellar.anchor.util.StringHelper.json
import org.stellar.reference.client.AnchorReferenceServerClient
import org.stellar.reference.wallet.WalletServerClient
import org.stellar.walletsdk.ApplicationConfiguration
import org.stellar.walletsdk.InteractiveFlowResponse
import org.stellar.walletsdk.StellarConfiguration
import org.stellar.walletsdk.Wallet
import org.stellar.walletsdk.anchor.DepositTransaction
Expand Down Expand Up @@ -65,31 +69,46 @@ class Sep24End2EndTest(config: TestConfig, val jwt: String) {
private val anchorReferenceServerClient =
AnchorReferenceServerClient(Url(config.env["reference.server.url"]!!))
private val walletServerClient = WalletServerClient(Url(config.env["wallet.server.url"]!!))
private val jwtService: JwtService =
JwtService(
config.env["secret.sep10.jwt_secret"]!!,
config.env["secret.sep24.interactive_url.jwt_secret"]!!,
config.env["secret.sep24.more_info_url.jwt_secret"]!!,
config.env["secret.callback_api.auth_secret"]!!,
config.env["secret.platform_api.auth_secret"]!!
)

private fun `test typical deposit end-to-end flow`(asset: StellarAssetId, amount: String) =
runBlocking {
walletServerClient.clearCallbacks()
val token = anchor.auth().authenticate(keypair)
val txnId = makeDeposit(asset, amount, token)
val response = makeDeposit(asset, amount, token)

// Assert the interactive URL JWT is valid
val params = UriComponentsBuilder.fromUriString(response.url).build().queryParams
val cipher = params["token"]!![0]
val interactiveJwt = jwtService.decode(cipher, Sep24InteractiveUrlJwt::class.java)
assertEquals("referenceCustodial", interactiveJwt.claims[JwtService.CLIENT_NAME])

// Wait for the status to change to COMPLETED
waitForTxnStatus(txnId, COMPLETED, token)
waitForTxnStatus(response.id, COMPLETED, token)

// Check if the transaction can be listed by stellar transaction id
val fetchedTxn = anchor.getTransaction(txnId, token) as DepositTransaction
val fetchedTxn = anchor.getTransaction(response.id, token) as DepositTransaction
val transactionByStellarId =
anchor.getTransactionBy(token, stellarTransactionId = fetchedTxn.stellarTransactionId)
assertEquals(fetchedTxn.id, transactionByStellarId.id)

// Check the events sent to the reference server are recorded correctly
val actualEvents = waitForBusinessServerEvents(txnId, 4)
val actualEvents = waitForBusinessServerEvents(response.id, 4)
assertNotNull(actualEvents)
actualEvents?.let { assertEquals(4, it.size) }
val expectedEvents: List<AnchorEvent> =
gson.fromJson(expectedDepositEventsJson, object : TypeToken<List<AnchorEvent>>() {}.type)
compareAndAssertEvents(asset, expectedEvents, actualEvents!!)

// Check the callbacks sent to the wallet reference server are recorded correctly
val actualCallbacks = waitForWalletServerCallbacks(txnId, 4)
val actualCallbacks = waitForWalletServerCallbacks(response.id, 4)
actualCallbacks?.let {
assertEquals(4, it.size)
val expectedCallbacks: List<Sep24GetTransactionResponse> =
Expand All @@ -101,9 +120,14 @@ class Sep24End2EndTest(config: TestConfig, val jwt: String) {
}
}

private suspend fun makeDeposit(asset: StellarAssetId, amount: String, token: AuthToken): String {
private suspend fun makeDeposit(
asset: StellarAssetId,
amount: String,
token: AuthToken
): InteractiveFlowResponse {
// Start interactive deposit
val deposit = anchor.interactive().deposit(asset, token, mapOf("amount" to amount))

// Get transaction status and make sure it is INCOMPLETE
val transaction = anchor.getTransaction(deposit.id, token)
assertEquals(INCOMPLETE, transaction.status)
Expand All @@ -113,7 +137,7 @@ class Sep24End2EndTest(config: TestConfig, val jwt: String) {
info("accessing ${deposit.url}...")
assertEquals(200, resp.status.value)

return transaction.id
return deposit
}

private fun compareAndAssertEvents(
Expand Down Expand Up @@ -323,7 +347,7 @@ class Sep24End2EndTest(config: TestConfig, val jwt: String) {
val token = anchor.auth().authenticate(newAcc)
val deposits =
(0..1).map {
val txnId = makeDeposit(asset, amount, token)
val txnId = makeDeposit(asset, amount, token).id
waitForTxnStatus(txnId, COMPLETED, token)
txnId
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.stellar.anchor.platform.configurator;

import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.Data;
import org.stellar.anchor.api.exception.InvalidConfigException;
Expand Down Expand Up @@ -69,8 +71,19 @@ public Map<String, String> toStringMap() {
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getValue()));
}

// TODO: instead of merging, create a property source with higher precedence
public void merge(ConfigMap config) {
// Matches any string of the form: <listName>[<index>].<elementName>
// <listName> can be a dot separated hierarchy of names.
Pattern pattern =
Pattern.compile("^([a-zA-Z0-9_]+(?:\\.[a-zA-Z0-9_]+)*)\\[(\\d+)](?:\\.[a-zA-Z0-9_]+)*$");
for (String name : config.names()) {
Matcher matcher = pattern.matcher(name);
// If the name is a list element, remove potential conflicting list names.
if (matcher.find()) {
String listName = matcher.group(1);
data.remove(listName);
}
data.put(name, config.data.get(name));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.stellar.anchor.api.event.AnchorEvent;
import org.stellar.anchor.api.exception.AnchorException;
import org.stellar.anchor.api.exception.EventPublishException;
import org.stellar.anchor.event.EventService;
import org.stellar.anchor.event.EventService.EventQueue;
import org.stellar.anchor.platform.config.KafkaConfig;
import org.stellar.anchor.util.GsonUtils;
import org.stellar.anchor.util.Log;

public class KafkaSession implements EventService.Session {

final KafkaConfig kafkaConfig;
final String sessionName;
final String topic;
Producer<String, AnchorEvent> producer = null;
Consumer<String, AnchorEvent> consumer = null;
Producer<String, String> producer = null;
Consumer<String, String> consumer = null;

KafkaSession(KafkaConfig kafkaConfig, String sessionName, EventQueue queue) {
this.kafkaConfig = kafkaConfig;
Expand All @@ -46,7 +46,8 @@ public void publish(AnchorEvent event) throws AnchorException {
if (producer == null) {
producer = createProducer();
}
ProducerRecord<String, AnchorEvent> record = new ProducerRecord<>(topic, event);
String serialized = GsonUtils.getInstance().toJson(event);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, serialized);
record.headers().add(new RecordHeader("type", event.getType().type.getBytes()));
// If the queue is offline, throw an exception
try {
Expand Down Expand Up @@ -76,15 +77,17 @@ public EventService.ReadResponse read() throws AnchorException {
consumer.subscribe(java.util.Collections.singletonList(topic));
}

ConsumerRecords<String, AnchorEvent> consumerRecords =
ConsumerRecords<String, String> consumerRecords =
consumer.poll(Duration.ofSeconds(kafkaConfig.getPollTimeoutSeconds()));
ArrayList<AnchorEvent> events = new ArrayList<>(consumerRecords.count());
if (consumerRecords.isEmpty()) {
Log.debugF("Received {} Kafka records", consumerRecords.count());
} else {
Log.infoF("Received {} Kafka records", consumerRecords.count());
for (ConsumerRecord<String, AnchorEvent> record : consumerRecords) {
events.add(record.value());
for (ConsumerRecord<String, String> record : consumerRecords) {
AnchorEvent deserialized =
GsonUtils.getInstance().fromJson(record.value(), AnchorEvent.class);
events.add(deserialized);
}
// TOOD: emit metrics here.
}
Expand Down Expand Up @@ -123,13 +126,13 @@ public String getSessionName() {
return sessionName;
}

private Producer<String, AnchorEvent> createProducer() {
private Producer<String, String> createProducer() {
Log.debugF("kafkaConfig: {}", kafkaConfig);

Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServer());
props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
if (!isEmpty(kafkaConfig.getClientId())) {
props.put(CLIENT_ID_CONFIG, kafkaConfig.getClientId());
}
Expand All @@ -144,7 +147,7 @@ private Producer<String, AnchorEvent> createProducer() {
return new KafkaProducer<>(props);
}

Consumer<String, AnchorEvent> createConsumer() {
Consumer<String, String> createConsumer() {
Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServer());
Expand All @@ -156,7 +159,7 @@ Consumer<String, AnchorEvent> createConsumer() {
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

return new KafkaConsumer<>(props);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package org.stellar.anchor.platform.configurator

import javax.xml.bind.annotation.XmlType.DEFAULT
import kotlin.test.assertEquals
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.stellar.anchor.api.exception.InvalidConfigException
import org.stellar.anchor.platform.configurator.ConfigMap.ConfigEntry

class ConfigMapTest {
@Test
Expand Down Expand Up @@ -50,4 +53,29 @@ class ConfigMapTest {
cm.put(testKey, value, ConfigMap.ConfigSource.ENV)
Assertions.assertEquals(false, cm.getBoolean(testKey))
}

@Test
fun `test merge`() {
val cm = ConfigMap()
cm.put("clients", "", ConfigMap.ConfigSource.DEFAULT)
cm.put("my.property", "", ConfigMap.ConfigSource.DEFAULT)
cm.put("other", "", ConfigMap.ConfigSource.DEFAULT)
val override = ConfigMap()
override.put("clients[0]", "a", ConfigMap.ConfigSource.ENV)
override.put("clients[1]", "b", ConfigMap.ConfigSource.ENV)
override.put("my.property[0]", "c", ConfigMap.ConfigSource.ENV)

cm.merge(override)

assert(
cm.data.equals(
mapOf(
"clients[0]" to ConfigEntry("a", ConfigMap.ConfigSource.ENV),
"clients[1]" to ConfigEntry("b", ConfigMap.ConfigSource.ENV),
"my.property[0]" to ConfigEntry("c", ConfigMap.ConfigSource.ENV),
"other" to ConfigEntry("", ConfigMap.ConfigSource.DEFAULT)
)
)
)
}
}
3 changes: 2 additions & 1 deletion service-runner/src/main/resources/profiles/sep24/config.env
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ clients[0].domain=wallet-server:8092
clients[0].callback_url=http://wallet-server:8092/callbacks
clients[1].name=referenceCustodial
clients[1].type=custodial
clients[1].signing_key=GDJLBYYKMCXNVVNABOE66NYXQGIA5AC5D223Z2KF6ZEYK4UBCA7FKLTG
clients[1].signing_key=GDJLBYYKMCXNVVNABOE66NYXQGIA5AC5D223Z2KF6ZEYK4UBCA7FKLTG
clients[1].callback_url=http://wallet-server:8092/callbacks

0 comments on commit ad9cac8

Please sign in to comment.