Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-1163] Inject OpenPaaS user contacts into Tmail's auto-complete database #1215

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
14 changes: 14 additions & 0 deletions tmail-backend/tmail-third-party/openpaas/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@
<description>OpenPaaS integration for Twake Mail</description>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jmap-extensions</artifactId>
HoussemNasri marked this conversation as resolved.
Show resolved Hide resolved
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>apache-james-backends-rabbitmq</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>apache-james-backends-rabbitmq</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
package com.linagora.tmail.api;

import java.util.List;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public record OpenPaasUserResponse(@JsonProperty("id") String id,
@JsonProperty("firstname") String firstname,
@JsonProperty("lastname") String lastname,
@JsonProperty("preferredEmail") String preferredEmail,
@JsonProperty("emails") List<String> emails,
@JsonProperty("main_phone") String mainPhone,
@JsonProperty("displayName") String displayName) {
public record OpenPaasUserResponse(@JsonProperty("preferredEmail") String preferredEmail) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.linagora.tmail.contact;

public record ContactAddedRabbitMqMessage(String bookId, String bookName, String contactId,
String userId, JCardObject vcard) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.linagora.tmail.contact;

import java.util.Optional;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.base.Preconditions;

@JsonDeserialize(using = JCardObjectDeserializer.class)
public record JCardObject(Optional<String> fnOpt, Optional<String> emailOpt) {

public JCardObject {
Preconditions.checkNotNull(fnOpt);
Preconditions.checkNotNull(emailOpt);
}

/**
* Purpose: To specify the formatted text corresponding to the name of
* the object the vCard represents.
* <p>
* Example: Mr. John Q. Public\, Esq.
*/
@Override
public Optional<String> fnOpt() {
return fnOpt;
}

/**
* Purpose: To specify the electronic mail address for communication
* with the object the vCard represents.
* <p>
* Example: [email protected]
*/
@Override
public Optional<String> emailOpt() {
return emailOpt;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.linagora.tmail.contact;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.util.streams.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;


public class JCardObjectDeserializer extends StdDeserializer<JCardObject> {
private static final Logger LOGGER = LoggerFactory.getLogger(JCardObjectDeserializer.class);

private static final String FN = "fn";
private static final String EMAIL = "email";
private static final Set<String> SUPPORTED_PROPERTY_NAMES = Set.of(FN, EMAIL);
private static final int PROPERTY_NAME_INDEX = 0;
private static final int PROPERTIES_ARRAY_INDEX = 1;
private static final int TEXT_PROPERTY_VALUE_INDEX = 3;

public JCardObjectDeserializer() {
this(null);
}

protected JCardObjectDeserializer(Class<?> vc) {
super(vc);
}

@Override
public JCardObject deserialize(JsonParser p, DeserializationContext ctxt)
throws IOException {
JsonNode node = p.getCodec().readTree(p);

JsonNode jCardPropertiesArray = node.get(PROPERTIES_ARRAY_INDEX);
Map<String, String> jCardProperties =
collectJCardProperties(jCardPropertiesArray.iterator());

if (!jCardProperties.containsKey(FN)) {
String json = node.toString();
LOGGER.warn("""
Missing 'fn' property in the provided JCard object. 'fn' is required according to the specifications.
Received data: {}.
Ensure the 'fn' property is present and correctly formatted.""", json);
}

return new JCardObject(getOptionalFromMap(jCardProperties, FN), getOptionalFromMap(jCardProperties, EMAIL));
}

private static Map<String, String> collectJCardProperties(Iterator<JsonNode> propertiesIterator) {
return Iterators.toStream(propertiesIterator)
.map(JCardObjectDeserializer::getPropertyKeyValuePair)
.flatMap(Optional::stream)
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

private static Optional<ImmutablePair<String, String>> getPropertyKeyValuePair(JsonNode propertyNode) {
String propertyName = propertyNode.get(PROPERTY_NAME_INDEX).asText();
if (SUPPORTED_PROPERTY_NAMES.contains(propertyName)) {
String propertyValue = propertyNode.get(TEXT_PROPERTY_VALUE_INDEX).asText();
return Optional.of(ImmutablePair.of(propertyName, propertyValue));
} else {
return Optional.empty();
}
}

private Optional<String> getOptionalFromMap(Map<String, String> map, String key) {
return Optional.ofNullable(map.getOrDefault(key, null));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package com.linagora.tmail.contact;

import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.BiFunction;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.mail.internet.AddressException;

import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.core.MailAddress;
import org.apache.james.core.Username;
import org.apache.james.jmap.api.model.AccountId;
import org.apache.james.lifecycle.api.Startable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linagora.tmail.api.OpenPaasRestClient;
import com.linagora.tmail.james.jmap.EmailAddressContactInjectKeys;
import com.linagora.tmail.james.jmap.contact.ContactFields;
import com.linagora.tmail.james.jmap.contact.EmailAddressContact;
import com.linagora.tmail.james.jmap.contact.EmailAddressContactSearchEngine;
import com.rabbitmq.client.BuiltinExchangeType;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.Sender;


public class OpenPaasContactsConsumer implements Startable, Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(OpenPaasContactsConsumer.class);

private static final boolean REQUEUE_ON_NACK = false;
public static final String EXCHANGE_NAME = "contacts:contact:add";
public static final String QUEUE_NAME = "ConsumeOpenPaasContactsQueue";
public static final String DEAD_LETTER_EXCHANGE = "contacts:contact:add:dead:letter";
public static final String DEAD_LETTER_QUEUE = "ConsumeOpenPaasContactsQueue-dead-letter";

private Disposable consumeContactsDisposable;
private final ReceiverProvider receiverProvider;
private final Sender sender;
private final RabbitMQConfiguration commonRabbitMQConfiguration;
private final EmailAddressContactSearchEngine contactSearchEngine;
private final ObjectMapper objectMapper = new ObjectMapper();
private final OpenPaasRestClient openPaasRestClient;

@Inject
public OpenPaasContactsConsumer(@Named(EmailAddressContactInjectKeys.AUTOCOMPLETE) ReceiverProvider receiverProvider,
@Named(EmailAddressContactInjectKeys.AUTOCOMPLETE) Sender sender,
Comment on lines +63 to +64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inject a RabbitMQCHannelPool

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inject a RabbitMQCHannelPool

Not done

RabbitMQConfiguration commonRabbitMQConfiguration,
EmailAddressContactSearchEngine contactSearchEngine,
OpenPaasRestClient openPaasRestClient) {
this.receiverProvider = receiverProvider;
this.sender = sender;
this.commonRabbitMQConfiguration = commonRabbitMQConfiguration;
this.contactSearchEngine = contactSearchEngine;
this.openPaasRestClient = openPaasRestClient;
}

public void start() {
Flux.concat(
sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)
.durable(DURABLE).type(BuiltinExchangeType.FANOUT.getType())),
sender.declareExchange(ExchangeSpecification.exchange(DEAD_LETTER_EXCHANGE)
.durable(DURABLE)),
sender.declareQueue(QueueSpecification
.queue(DEAD_LETTER_QUEUE)
.durable(DURABLE)
.arguments(commonRabbitMQConfiguration.workQueueArgumentsBuilder().build())),
HoussemNasri marked this conversation as resolved.
Show resolved Hide resolved
sender.declareQueue(QueueSpecification
.queue(QUEUE_NAME)
.durable(DURABLE)
.arguments(commonRabbitMQConfiguration.workQueueArgumentsBuilder()
.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can put directly the dead letter queue here as an exchange and save a bit of boiler plate.

.put("x-dead-letter-routing-key", EMPTY_ROUTING_KEY)
.build())),
sender.bind(BindingSpecification.binding()
.exchange(EXCHANGE_NAME)
.queue(QUEUE_NAME)
.routingKey(EMPTY_ROUTING_KEY)),
sender.bind(BindingSpecification.binding()
.exchange(DEAD_LETTER_EXCHANGE)
.queue(DEAD_LETTER_QUEUE)
.routingKey(EMPTY_ROUTING_KEY)))
.then()
.block();

consumeContactsDisposable = doConsumeContactMessages();
}

private Disposable doConsumeContactMessages() {
return delivery()
.flatMap(delivery -> messageConsume(delivery, new String(delivery.getBody(), StandardCharsets.UTF_8)))
.subscribe();
}

public Flux<AcknowledgableDelivery> delivery() {
return Flux.using(receiverProvider::createReceiver,
receiver -> receiver.consumeManualAck(QUEUE_NAME),
Receiver::close);
}

private Mono<EmailAddressContact> messageConsume(AcknowledgableDelivery ackDelivery, String messagePayload) {
return Mono.just(messagePayload)
.map(this::parseContactAddedRabbitMqMessage)
.flatMap(this::handleMessage)
.doOnSuccess(result -> {
LOGGER.warn("Consumed contact successfully '{}'", result);
ackDelivery.ack();
})
.onErrorResume(error -> {
LOGGER.error("Error when consume message '{}'", messagePayload, error);
ackDelivery.nack(REQUEUE_ON_NACK);
return Mono.empty();
});
}

private ContactAddedRabbitMqMessage parseContactAddedRabbitMqMessage(String message) {
try {
return objectMapper.readValue(message, ContactAddedRabbitMqMessage.class);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to parse ContactAddedRabbitMqMessage", e);
}
}

private Mono<EmailAddressContact> handleMessage(ContactAddedRabbitMqMessage contactAddedMessage) {
LOGGER.info("Consumed jCard object message: {}", contactAddedMessage);
Optional<ContactFields> contactFieldsOpt = toContactFields(contactAddedMessage.vcard());
return openPaasRestClient.retrieveMailAddress(contactAddedMessage.userId())
.map(ownerMailAddress -> AccountId.fromUsername(Username.fromMailAddress(ownerMailAddress)))
.flatMap(ownerAccountId ->
Mono.justOrEmpty(contactFieldsOpt)
.flatMap(contactFields -> doAddContact(ownerAccountId, contactFields)));
}

private Mono<EmailAddressContact> doAddContact(AccountId ownerAccountId, ContactFields contactFields) {
return Mono.from(contactSearchEngine.index(ownerAccountId, contactFields));
}

private Optional<ContactFields> toContactFields(JCardObject jCardObject) {
Optional<String> contactFullnameOpt = jCardObject.fnOpt();
Optional<MailAddress> contactMailAddressOpt = jCardObject.emailOpt()
.flatMap(contactEmail -> {
try {
return Optional.of(new MailAddress(contactEmail));
} catch (AddressException e) {
LOGGER.warn("Invalid contact email address: {}", contactEmail, e);
return Optional.empty();
}
});

return combineOptionals(contactFullnameOpt, contactMailAddressOpt,
(contactFullname, contactMailAddress) ->
new ContactFields(contactMailAddress, contactFullname, contactFullname));
}

private static <T,K,V> Optional<V> combineOptionals(Optional<T> opt1, Optional<K> opt2, BiFunction<T, K, V> f) {
return opt1.flatMap(t1 -> opt2.map(t2 -> f.apply(t1, t2)));
}
Comment on lines +167 to +174
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the heck is that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We map two optionals into one and apply a .map on them. The result optional is present only if both input optionals are present. It is like .zip in scala.


@Override
public void close() throws IOException {
if (consumeContactsDisposable != null) {
consumeContactsDisposable.dispose();
}
}
}
Loading