Skip to content

Commit

Permalink
Refactor OpenPaasContactsConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
HoussemNasri committed Oct 18, 2024
1 parent ef23bd5 commit 94a9b41
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.BiFunction;

import jakarta.inject.Inject;
import jakarta.inject.Named;
Expand All @@ -21,19 +22,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linagora.tmail.api.OpenPaasRestClient;
import com.linagora.tmail.james.jmap.EmailAddressContactInjectKeys;
import com.linagora.tmail.james.jmap.contact.ContactFields;
import com.linagora.tmail.james.jmap.contact.EmailAddressContact;
import com.linagora.tmail.james.jmap.contact.EmailAddressContactSearchEngine;
import com.rabbitmq.client.BuiltinExchangeType;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.core.publisher.SignalType;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
Expand Down Expand Up @@ -84,13 +85,14 @@ public void start() {
.block();

consumeContactsDisposable = doConsumeContactMessages();
System.out.println("Hello, World");
}

private Disposable doConsumeContactMessages() {
return delivery()
.subscribeOn(Schedulers.boundedElastic())
.subscribe(delivery ->
messageConsume(delivery, new String(delivery.getBody(), StandardCharsets.UTF_8)));
.flatMap(delivery -> messageConsume(delivery, new String(delivery.getBody(), StandardCharsets.UTF_8)))
.doOnError(e -> LOGGER.error("Failed to consume contact message", e))
.subscribe();
}

public Flux<AcknowledgableDelivery> delivery() {
Expand All @@ -99,58 +101,61 @@ public Flux<AcknowledgableDelivery> delivery() {
Receiver::close);
}

private void messageConsume(AcknowledgableDelivery ackDelivery, String messagePayload) {
Mono.just(messagePayload)
.<ContactAddedRabbitMqMessage>handle((message, sink) -> {
try {
sink.next(objectMapper.readValue(message, ContactAddedRabbitMqMessage.class));
} catch (JsonProcessingException e) {
sink.error(new RuntimeException(e));
private Mono<EmailAddressContact> messageConsume(AcknowledgableDelivery ackDelivery, String messagePayload) {
return Mono.just(messagePayload)
.map(this::parseContactAddedRabbitMqMessage)
.flatMap(this::handleMessage)
.doFinally(signal -> {
if (signal == SignalType.ON_COMPLETE) {
ackDelivery.ack();
} else if (signal == SignalType.ON_ERROR) {
ackDelivery.nack(false);
}
})
.handle((msg, sink) -> {
handleMessage(msg).block();
ackDelivery.ack();
})
.onErrorResume(e -> {
LOGGER.error("Failed to consume OpenPaaS added contact message", e);
return Mono.empty();
}).subscribe();
.onErrorResume(e -> Mono.error(new RuntimeException("Failed to consume OpenPaaS added contact message", e)));
}

private Mono<Void> handleMessage(ContactAddedRabbitMqMessage contactAddedMessage) {
private ContactAddedRabbitMqMessage parseContactAddedRabbitMqMessage(String message) {
try {
return objectMapper.readValue(message, ContactAddedRabbitMqMessage.class);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to parse ContactAddedRabbitMqMessage", e);
}
}

private Mono<EmailAddressContact> handleMessage(ContactAddedRabbitMqMessage contactAddedMessage) {
LOGGER.info("Consumed jCard object message: {}", contactAddedMessage);

String openPaasOwnerId = contactAddedMessage.userId();
return openPaasRestClient.retrieveMailAddress(openPaasOwnerId)
.mapNotNull(ownerMailAddress -> {
JCardObject jCardObject = contactAddedMessage.vcard();

Optional<String> contactFullnameOpt = jCardObject.fnOpt();
Optional<MailAddress> contactMailAddressOpt = jCardObject.emailOpt()
.flatMap(contactEmail -> {
try {
return Optional.of(new MailAddress(contactEmail));
} catch (AddressException e) {
return Optional.empty();
}
});

if (contactFullnameOpt.isEmpty() || contactMailAddressOpt.isEmpty()) {
return Mono.empty();
}
return openPaasRestClient.retrieveMailAddress(contactAddedMessage.userId())
.map(ownerMailAddress -> AccountId.fromUsername(Username.fromMailAddress(ownerMailAddress)))
.flatMap(ownerAccountId ->
Mono.justOrEmpty(toContactFields(contactAddedMessage.vcard()))
.flatMap(contactFields -> doAddContact(ownerAccountId, contactFields)));
}

private Mono<EmailAddressContact> doAddContact(AccountId ownerAccountId, ContactFields contactFields) {
return Mono.from(contactSearchEngine.index(ownerAccountId, contactFields));
}

ContactFields contactFields = new ContactFields(
contactMailAddressOpt.get(),
contactFullnameOpt.get(),
contactFullnameOpt.get()
);
private Optional<ContactFields> toContactFields(JCardObject jCardObject) {
Optional<String> contactFullnameOpt = jCardObject.fnOpt();
Optional<MailAddress> contactMailAddressOpt = jCardObject.emailOpt()
.flatMap(contactEmail -> {
try {
return Optional.of(new MailAddress(contactEmail));
} catch (AddressException e) {
LOGGER.warn("Invalid contact email address: {}", contactEmail, e);
return Optional.empty();
}
});

AccountId ownerAccountId =
AccountId.fromUsername(Username.fromMailAddress(ownerMailAddress));
return combineOptionals(contactFullnameOpt, contactMailAddressOpt,
(contactFullname, contactMailAddress) ->
new ContactFields(contactMailAddress, contactFullname, contactFullname));
}

return Mono.from(contactSearchEngine.index(ownerAccountId, contactFields)).block();
}).then();
private static <T,K,V> Optional<V> combineOptionals(Optional<T> opt1, Optional<K> opt2, BiFunction<T, K, V> f) {
return opt1.flatMap(t1 -> opt2.map(t2 -> f.apply(t1, t2)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@

import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.jmap.api.model.AccountId;
import org.bouncycastle.oer.its.ieee1609dot2.basetypes.UINT16;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import reactor.core.publisher.Flux;
Expand All @@ -38,18 +40,17 @@ class OpenPaasContactsConsumerTest {
.isolationPolicy(WEAK);

private EmailAddressContactSearchEngine searchEngine;
private OpenPaasRestClient restClient;
private OpenPaasContactsConsumer consumer;

@BeforeEach
void setup() throws URISyntaxException {
searchEngine = new InMemoryEmailAddressContactSearchEngine();
restClient = new OpenPaasRestClient(
OpenPaasRestClient restClient = new OpenPaasRestClient(
new OpenPaasConfiguration(
openPaasServerExtension.getBaseUrl(),
"admin",
"admin")
);
searchEngine = new InMemoryEmailAddressContactSearchEngine();
consumer = new OpenPaasContactsConsumer(rabbitMQExtension.getReceiverProvider(),
rabbitMQExtension.getSender(),
rabbitMQExtension.getRabbitMQ().withQuorumQueueConfiguration(),
Expand Down

0 comments on commit 94a9b41

Please sign in to comment.