emailOpt) {
+
+ public JCardObject {
+ Preconditions.checkNotNull(fnOpt);
+ Preconditions.checkNotNull(emailOpt);
+ }
+
+ /**
+ * Purpose: To specify the formatted text corresponding to the name of
+ * the object the vCard represents.
+ *
+ * Example: Mr. John Q. Public\, Esq.
+ */
+ @Override
+ public Optional fnOpt() {
+ return fnOpt;
+ }
+
+ /**
+ * Purpose: To specify the electronic mail address for communication
+ * with the object the vCard represents.
+ *
+ * Example: jane_doe@example.com
+ */
+ @Override
+ public Optional emailOpt() {
+ return emailOpt;
+ }
+}
diff --git a/tmail-backend/tmail-third-party/openpaas/src/main/java/com/linagora/tmail/contact/JCardObjectDeserializer.java b/tmail-backend/tmail-third-party/openpaas/src/main/java/com/linagora/tmail/contact/JCardObjectDeserializer.java
new file mode 100644
index 0000000000..b4611b408e
--- /dev/null
+++ b/tmail-backend/tmail-third-party/openpaas/src/main/java/com/linagora/tmail/contact/JCardObjectDeserializer.java
@@ -0,0 +1,80 @@
+package com.linagora.tmail.contact;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.util.streams.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+
+public class JCardObjectDeserializer extends StdDeserializer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JCardObjectDeserializer.class);
+
+ private static final String FN = "fn";
+ private static final String EMAIL = "email";
+ private static final Set SUPPORTED_PROPERTY_NAMES = Set.of(FN, EMAIL);
+ private static final int PROPERTY_NAME_INDEX = 0;
+ private static final int PROPERTIES_ARRAY_INDEX = 1;
+ private static final int TEXT_PROPERTY_VALUE_INDEX = 3;
+
+ public JCardObjectDeserializer() {
+ this(null);
+ }
+
+ protected JCardObjectDeserializer(Class> vc) {
+ super(vc);
+ }
+
+ @Override
+ public JCardObject deserialize(JsonParser p, DeserializationContext ctxt)
+ throws IOException {
+ JsonNode node = p.getCodec().readTree(p);
+
+ JsonNode jCardPropertiesArray = node.get(PROPERTIES_ARRAY_INDEX);
+ Map jCardProperties =
+ collectJCardProperties(jCardPropertiesArray.iterator());
+
+ if (!jCardProperties.containsKey(FN)) {
+ String json = node.toString();
+ LOGGER.warn("""
+ Missing 'fn' property in the provided JCard object. 'fn' is required according to the specifications.
+ Received data: {}.
+ Ensure the 'fn' property is present and correctly formatted.""", json);
+ }
+
+ return new JCardObject(getOptionalFromMap(jCardProperties, FN), getOptionalFromMap(jCardProperties, EMAIL));
+ }
+
+ private static Map collectJCardProperties(Iterator propertiesIterator) {
+ return Iterators.toStream(propertiesIterator)
+ .map(JCardObjectDeserializer::getPropertyKeyValuePair)
+ .flatMap(Optional::stream)
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ }
+
+ private static Optional> getPropertyKeyValuePair(JsonNode propertyNode) {
+ String propertyName = propertyNode.get(PROPERTY_NAME_INDEX).asText();
+ if (SUPPORTED_PROPERTY_NAMES.contains(propertyName)) {
+ String propertyValue = propertyNode.get(TEXT_PROPERTY_VALUE_INDEX).asText();
+ return Optional.of(ImmutablePair.of(propertyName, propertyValue));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ private Optional getOptionalFromMap(Map map, String key) {
+ return Optional.ofNullable(map.getOrDefault(key, null));
+ }
+}
diff --git a/tmail-backend/tmail-third-party/openpaas/src/main/java/com/linagora/tmail/contact/OpenPaasContactsConsumer.java b/tmail-backend/tmail-third-party/openpaas/src/main/java/com/linagora/tmail/contact/OpenPaasContactsConsumer.java
new file mode 100644
index 0000000000..bb037a55df
--- /dev/null
+++ b/tmail-backend/tmail-third-party/openpaas/src/main/java/com/linagora/tmail/contact/OpenPaasContactsConsumer.java
@@ -0,0 +1,182 @@
+package com.linagora.tmail.contact;
+
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+import jakarta.inject.Inject;
+import jakarta.inject.Named;
+import jakarta.mail.internet.AddressException;
+
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
+import org.apache.james.core.MailAddress;
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.AccountId;
+import org.apache.james.lifecycle.api.Startable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.linagora.tmail.api.OpenPaasRestClient;
+import com.linagora.tmail.james.jmap.EmailAddressContactInjectKeys;
+import com.linagora.tmail.james.jmap.contact.ContactFields;
+import com.linagora.tmail.james.jmap.contact.EmailAddressContact;
+import com.linagora.tmail.james.jmap.contact.EmailAddressContactSearchEngine;
+import com.rabbitmq.client.BuiltinExchangeType;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.AcknowledgableDelivery;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.Sender;
+
+
+public class OpenPaasContactsConsumer implements Startable, Closeable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OpenPaasContactsConsumer.class);
+
+ private static final boolean REQUEUE_ON_NACK = false;
+ public static final String EXCHANGE_NAME = "contacts:contact:add";
+ public static final String QUEUE_NAME = "ConsumeOpenPaasContactsQueue";
+ public static final String DEAD_LETTER_EXCHANGE = "contacts:contact:add:dead:letter";
+ public static final String DEAD_LETTER_QUEUE = "ConsumeOpenPaasContactsQueue-dead-letter";
+
+ private Disposable consumeContactsDisposable;
+ private final ReceiverProvider receiverProvider;
+ private final Sender sender;
+ private final RabbitMQConfiguration commonRabbitMQConfiguration;
+ private final EmailAddressContactSearchEngine contactSearchEngine;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ private final OpenPaasRestClient openPaasRestClient;
+
+ @Inject
+ public OpenPaasContactsConsumer(@Named(EmailAddressContactInjectKeys.AUTOCOMPLETE) ReceiverProvider receiverProvider,
+ @Named(EmailAddressContactInjectKeys.AUTOCOMPLETE) Sender sender,
+ RabbitMQConfiguration commonRabbitMQConfiguration,
+ EmailAddressContactSearchEngine contactSearchEngine,
+ OpenPaasRestClient openPaasRestClient) {
+ this.receiverProvider = receiverProvider;
+ this.sender = sender;
+ this.commonRabbitMQConfiguration = commonRabbitMQConfiguration;
+ this.contactSearchEngine = contactSearchEngine;
+ this.openPaasRestClient = openPaasRestClient;
+ }
+
+ public void start() {
+ Flux.concat(
+ sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)
+ .durable(DURABLE).type(BuiltinExchangeType.FANOUT.getType())),
+ sender.declareExchange(ExchangeSpecification.exchange(DEAD_LETTER_EXCHANGE)
+ .durable(DURABLE)),
+ sender.declareQueue(QueueSpecification
+ .queue(DEAD_LETTER_QUEUE)
+ .durable(DURABLE)
+ .arguments(commonRabbitMQConfiguration.workQueueArgumentsBuilder().build())),
+ sender.declareQueue(QueueSpecification
+ .queue(QUEUE_NAME)
+ .durable(DURABLE)
+ .arguments(commonRabbitMQConfiguration.workQueueArgumentsBuilder()
+ .put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
+ .put("x-dead-letter-routing-key", EMPTY_ROUTING_KEY)
+ .build())),
+ sender.bind(BindingSpecification.binding()
+ .exchange(EXCHANGE_NAME)
+ .queue(QUEUE_NAME)
+ .routingKey(EMPTY_ROUTING_KEY)),
+ sender.bind(BindingSpecification.binding()
+ .exchange(DEAD_LETTER_EXCHANGE)
+ .queue(DEAD_LETTER_QUEUE)
+ .routingKey(EMPTY_ROUTING_KEY)))
+ .then()
+ .block();
+
+ consumeContactsDisposable = doConsumeContactMessages();
+ }
+
+ private Disposable doConsumeContactMessages() {
+ return delivery()
+ .flatMap(delivery -> messageConsume(delivery, new String(delivery.getBody(), StandardCharsets.UTF_8)))
+ .subscribe();
+ }
+
+ public Flux delivery() {
+ return Flux.using(receiverProvider::createReceiver,
+ receiver -> receiver.consumeManualAck(QUEUE_NAME),
+ Receiver::close);
+ }
+
+ private Mono messageConsume(AcknowledgableDelivery ackDelivery, String messagePayload) {
+ return Mono.just(messagePayload)
+ .map(this::parseContactAddedRabbitMqMessage)
+ .flatMap(this::handleMessage)
+ .doOnSuccess(result -> {
+ LOGGER.warn("Consumed contact successfully '{}'", result);
+ ackDelivery.ack();
+ })
+ .onErrorResume(error -> {
+ LOGGER.error("Error when consume message '{}'", messagePayload, error);
+ ackDelivery.nack(REQUEUE_ON_NACK);
+ return Mono.empty();
+ });
+ }
+
+ private ContactAddedRabbitMqMessage parseContactAddedRabbitMqMessage(String message) {
+ try {
+ return objectMapper.readValue(message, ContactAddedRabbitMqMessage.class);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to parse ContactAddedRabbitMqMessage", e);
+ }
+ }
+
+ private Mono handleMessage(ContactAddedRabbitMqMessage contactAddedMessage) {
+ LOGGER.info("Consumed jCard object message: {}", contactAddedMessage);
+ Optional contactFieldsOpt = toContactFields(contactAddedMessage.vcard());
+ return openPaasRestClient.retrieveMailAddress(contactAddedMessage.userId())
+ .map(ownerMailAddress -> AccountId.fromUsername(Username.fromMailAddress(ownerMailAddress)))
+ .flatMap(ownerAccountId ->
+ Mono.justOrEmpty(contactFieldsOpt)
+ .flatMap(contactFields -> doAddContact(ownerAccountId, contactFields)));
+ }
+
+ private Mono doAddContact(AccountId ownerAccountId, ContactFields contactFields) {
+ return Mono.from(contactSearchEngine.index(ownerAccountId, contactFields));
+ }
+
+ private Optional toContactFields(JCardObject jCardObject) {
+ Optional contactFullnameOpt = jCardObject.fnOpt();
+ Optional contactMailAddressOpt = jCardObject.emailOpt()
+ .flatMap(contactEmail -> {
+ try {
+ return Optional.of(new MailAddress(contactEmail));
+ } catch (AddressException e) {
+ LOGGER.warn("Invalid contact email address: {}", contactEmail, e);
+ return Optional.empty();
+ }
+ });
+
+ return combineOptionals(contactFullnameOpt, contactMailAddressOpt,
+ (contactFullname, contactMailAddress) ->
+ new ContactFields(contactMailAddress, contactFullname, contactFullname));
+ }
+
+ private static Optional combineOptionals(Optional opt1, Optional opt2, BiFunction f) {
+ return opt1.flatMap(t1 -> opt2.map(t2 -> f.apply(t1, t2)));
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (consumeContactsDisposable != null) {
+ consumeContactsDisposable.dispose();
+ }
+ }
+}
diff --git a/tmail-backend/tmail-third-party/openpaas/src/test/java/com/linagora/tmail/contact/OpenPaasContactsConsumerTest.java b/tmail-backend/tmail-third-party/openpaas/src/test/java/com/linagora/tmail/contact/OpenPaasContactsConsumerTest.java
new file mode 100644
index 0000000000..6950268537
--- /dev/null
+++ b/tmail-backend/tmail-third-party/openpaas/src/test/java/com/linagora/tmail/contact/OpenPaasContactsConsumerTest.java
@@ -0,0 +1,154 @@
+package com.linagora.tmail.contact;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backends.rabbitmq.RabbitMQExtension.IsolationPolicy.WEAK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Durations.TEN_SECONDS;
+
+import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.apache.james.jmap.api.model.AccountId;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.OutboundMessage;
+
+import com.linagora.tmail.OpenPaasConfiguration;
+import com.linagora.tmail.api.OpenPaasRestClient;
+import com.linagora.tmail.api.OpenPaasServerExtension;
+import com.linagora.tmail.james.jmap.contact.EmailAddressContactSearchEngine;
+import com.linagora.tmail.james.jmap.contact.InMemoryEmailAddressContactSearchEngine;
+
+class OpenPaasContactsConsumerTest {
+
+ @RegisterExtension
+ static OpenPaasServerExtension openPaasServerExtension = new OpenPaasServerExtension();
+
+ @RegisterExtension
+ static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ()
+ .isolationPolicy(WEAK);
+
+ private EmailAddressContactSearchEngine searchEngine;
+ private OpenPaasContactsConsumer consumer;
+
+ @BeforeEach
+ void setup() throws URISyntaxException {
+ OpenPaasRestClient restClient = new OpenPaasRestClient(
+ new OpenPaasConfiguration(
+ openPaasServerExtension.getBaseUrl(),
+ "admin",
+ "admin")
+ );
+ searchEngine = new InMemoryEmailAddressContactSearchEngine();
+ consumer = new OpenPaasContactsConsumer(rabbitMQExtension.getReceiverProvider(),
+ rabbitMQExtension.getSender(),
+ rabbitMQExtension.getRabbitMQ().withQuorumQueueConfiguration(),
+ searchEngine, restClient);
+
+ consumer.start();
+ }
+
+ @AfterEach
+ void afterEach() throws IOException {
+ consumer.close();
+ }
+
+ @Test
+ void consumeMessageShouldNotCrashOnInvalidMessages() throws InterruptedException {
+ IntStream.range(0, 10).forEach(i -> sendMessage("BAD_PAYLOAD" + i));
+
+ TimeUnit.MILLISECONDS.sleep(100);
+
+ sendMessage("""
+ {
+ "bookId": "abc0a663bdaffe0026290xyz",
+ "bookName": "contacts",
+ "contactId": "fd9b3c98-fc77-4187-92ac-d9f58d400968",
+ "userId": "abc0a663bdaffe0026290xyz",
+ "vcard": [
+ "vcard",
+ [
+ [ "version", {}, "text", "4.0" ],
+ [ "kind", {}, "text", "individual" ],
+ [ "fn", {}, "text", "Jane Doe" ],
+ [ "email", {}, "text", "jhon@doe.com" ],
+ [ "org", {}, "text", [ "ABC, Inc.", "North American Division", "Marketing" ] ]
+ ]
+ ]
+ }
+ """);
+
+ await().timeout(TEN_SECONDS).untilAsserted(() ->
+ assertThat(
+ Flux.from(searchEngine.autoComplete(AccountId.fromString(OpenPaasServerExtension.ALICE_EMAIL()), "jhon", 10))
+ .collectList().block())
+ .hasSize(1));
+ }
+
+ @Test
+ void contactShouldBeIndexedWhenContactUserAddedMessage() {
+ sendMessage("""
+ {
+ "bookId": "abc0a663bdaffe0026290xyz",
+ "bookName": "contacts",
+ "contactId": "fd9b3c98-fc77-4187-92ac-d9f58d400968",
+ "userId": "abc0a663bdaffe0026290xyz",
+ "vcard": [
+ "vcard",
+ [
+ [ "version", {}, "text", "4.0" ],
+ [ "kind", {}, "text", "individual" ],
+ [ "fn", {}, "text", "Jane Doe" ],
+ [ "email", {}, "text", "jhon@doe.com" ],
+ [ "org", {}, "text", [ "ABC, Inc.", "North American Division", "Marketing" ] ]
+ ]
+ ]
+ }
+ """);
+
+ await().timeout(TEN_SECONDS).untilAsserted(() ->
+ assertThat(
+ Flux.from(searchEngine.autoComplete(AccountId.fromString(OpenPaasServerExtension.ALICE_EMAIL()), "jhon", 10))
+ .collectList().block())
+ .hasSize(1));
+ }
+
+ @Test
+ void contactDisplayNameShouldBeSetFromTheReceivedOpenPaasContactObject() {
+
+ }
+
+ @Test
+ void givenDisplayNameFromOpenPaasNotEmptyThenStoredDisplayNameShouldBeOverridden() {
+
+ }
+
+ @Test
+ void givenDisplayNameFromOpenPaasIsEmptyThenStoredDisplayNameShouldPersist() {
+
+ }
+
+ @Test
+ void automaticContactIndexingShouldNotOverrideContactInfoFromOpenPaas() {
+ // The automatic contact indexing is triggered when you send or receive a message
+ // from someone, then their contact info be automatically indexed in the contacts' search engine.
+ }
+
+ private void sendMessage(String message) {
+ rabbitMQExtension.getSender()
+ .send(Mono.just(new OutboundMessage(
+ OpenPaasContactsConsumer.EXCHANGE_NAME,
+ EMPTY_ROUTING_KEY,
+ message.getBytes(UTF_8))))
+ .block();
+ }
+}
\ No newline at end of file