Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Outlook] Feature: Office365 multi-user support #2844

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
897fed8
[Outlook] Feature: Office365 multi-user support
ilyasabdellaoui Sep 23, 2024
92af497
Merge branch 'main' into feature/office365-multi-user-support
ilyasabdellaoui Sep 23, 2024
c9f3fb1
Merge branch 'main' into feature/office365-multi-user-support
ilyasabdellaoui Sep 24, 2024
06cdee0
Merge branch 'main' into feature/office365-multi-user-support
ilyasabdellaoui Sep 29, 2024
8076809
[Outlook] Fix: Update order values and add tooltip for client_emails
ilyasabdellaoui Sep 29, 2024
104e299
[Outlook] Feat: Implement JSON batching for user fetching in Outlook …
ilyasabdellaoui Sep 29, 2024
71a3087
[Outlook] refactor: update client_emails to use list type in config
ilyasabdellaoui Sep 29, 2024
e86c9bc
Merge branch 'main' into feature/office365-multi-user-support
ilyasabdellaoui Sep 30, 2024
b92477e
Merge branch 'main' into feature/office365-multi-user-support
ilyasabdellaoui Oct 8, 2024
ef61bc9
Merge branch 'main' into feature/office365-multi-user-support
ilyasabdellaoui Oct 19, 2024
a99bbaf
Merge branch 'main' into feature/office365-multi-user-support
ilyasabdellaoui Oct 22, 2024
ef143ad
Merge branch 'main' into feature/office365-multi-user-support
ilyasabdellaoui Nov 10, 2024
72543a1
[Outlook] Feat: Add test for JSON batch user fetching with client emails
ilyasabdellaoui Nov 10, 2024
bc16fff
[Outlook] Fix: Update abstract method to yield for async iteration
ilyasabdellaoui Nov 11, 2024
85c3f5d
Merge branch 'main' into feature/office365-multi-user-support
ilyasabdellaoui Nov 11, 2024
e4923b5
[Outlook] Refactor: use GRAPH_API_BATCH_SIZE constant instead of hard…
ilyasabdellaoui Nov 12, 2024
920697a
[Outlook] Refactor: Set wildcard as default for client email list whe…
ilyasabdellaoui Nov 14, 2024
3b144fa
[Outlook] Test: Improve batch endpoint mock to reflect Graph API resp…
ilyasabdellaoui Nov 14, 2024
2d3a28b
Merge branch 'main' into feature/office365-multi-user-support
ilyasabdellaoui Nov 14, 2024
4912724
Merge branch 'main' into feature/office365-multi-user-support
artem-shelkovnikov Nov 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 157 additions & 21 deletions connectors/sources/outlook.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import asyncio
import os
from abc import ABC, abstractmethod
from copy import copy
from datetime import date
from functools import cached_property, partial
Expand Down Expand Up @@ -48,9 +49,12 @@

RETRIES = 3
RETRY_INTERVAL = 2
WILDCARD = "*"

QUEUE_MEM_SIZE = 5 * 1024 * 1024 # Size in Megabytes

GRAPH_API_BATCH_SIZE = 20 # Max batch size supported for fetching users from Graph API

OUTLOOK_SERVER = "outlook_server"
OUTLOOK_CLOUD = "outlook_cloud"
API_SCOPE = "https://graph.microsoft.com/.default"
Expand Down Expand Up @@ -212,6 +216,18 @@ class SSLFailed(Exception):
pass


class OutlookUserFetchFailed(Exception):
"""Exception class to notify that fetching a specific user from Outlook failed."""

pass


class BatchRequestFailed(Exception):
"""Exception class to notify that a batch request to fetch users failed."""

pass


class ManageCertificate:
async def store_certificate(self, certificate):
async with aiofiles.open(CERT_FILE, "w") as file:
Expand Down Expand Up @@ -348,13 +364,13 @@ async def get_user_accounts(self):
yield user_account


class Office365Users:
"""Fetch users from Office365 Active Directory"""
class BaseOffice365User(ABC):
"""Abstract base class for Office 365 user management"""

def __init__(self, client_id, client_secret, tenant_id):
self.tenant_id = tenant_id
self.client_id = client_id
self.client_secret = client_secret
self.tenant_id = tenant_id

@cached_property
def _get_session(self):
Expand Down Expand Up @@ -403,6 +419,21 @@ async def _fetch_token(self):
except Exception as exception:
self._check_errors(response=exception)

@abstractmethod
async def get_users(self):
yield

@abstractmethod
async def get_user_accounts(self):
yield


class Office365Users(BaseOffice365User):
"""Fetch users from Office365 Active Directory"""

def __init__(self, client_id, client_secret, tenant_id):
super().__init__(client_id, client_secret, tenant_id)

@retryable(
retries=RETRIES,
interval=RETRY_INTERVAL,
Expand Down Expand Up @@ -456,6 +487,78 @@ async def get_user_accounts(self):
yield user_account


class MultiOffice365Users(BaseOffice365User):
"""Fetch multiple Office365 users based on a list of email addresses."""

def __init__(self, client_id, client_secret, tenant_id, client_emails):
super().__init__(client_id, client_secret, tenant_id)
self.client_emails = client_emails

async def get_users(self):
access_token = await self._fetch_token()
errors = []
for i in range(0, len(self.client_emails), GRAPH_API_BATCH_SIZE):
batch_emails = self.client_emails[i : i + GRAPH_API_BATCH_SIZE]
requests = [
{"id": str(index + 1), "method": "GET", "url": f"/users/{email}"}
for index, email in enumerate(batch_emails)
]
batch_request_body = {"requests": requests}
try:
async with self._get_session.post(
url="https://graph.microsoft.com/v1.0/$batch",
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
},
json=batch_request_body,
) as response:
json_response = await response.json()
for res in json_response.get("responses", []):
user_id = res.get("id")
status = res.get("status")
if status == 200:
yield res.get("body")
else:
msg = f"Error for user {user_id}: {res.get('body')}"
errors.append(OutlookUserFetchFailed(msg))
except Exception as e:
msg = f"Batch request failed: {str(e)}"
errors.append(BatchRequestFailed(msg))

if errors:
msg = "Errors occurred while fetching users: " + "\n".join(
str(e) for e in errors
)
raise Exception(msg)

async def get_user_accounts(self):
async for user in self.get_users():
mail = user.get("mail")
if mail is None:
continue

credentials = OAuth2Credentials(
client_id=self.client_id,
tenant_id=self.tenant_id,
client_secret=self.client_secret,
identity=Identity(primary_smtp_address=mail),
)
configuration = Configuration(
credentials=credentials,
auth_type=OAUTH2,
service_endpoint=EWS_ENDPOINT,
retry_policy=FaultTolerance(max_wait=120),
)
user_account = Account(
primary_smtp_address=mail,
config=configuration,
autodiscover=False,
access_type=IMPERSONATION,
)
yield user_account


class OutlookDocFormatter:
"""Format Outlook object documents to Elasticsearch document"""

Expand Down Expand Up @@ -583,6 +686,28 @@ def attachment_doc_formatter(self, attachment, attachment_type, timezone):
}


class UserFactory:
"""Factory class for creating Office365 user instances"""

@staticmethod
def create_user(configuration: dict) -> BaseOffice365User:
client_emails = configuration.get("client_emails", WILDCARD)
if client_emails == WILDCARD or client_emails == [WILDCARD]:
return Office365Users(
client_id=configuration["client_id"],
client_secret=configuration["client_secret"],
tenant_id=configuration["tenant_id"],
)
else:
client_emails = [email.strip() for email in client_emails]
return MultiOffice365Users(
client_id=configuration["client_id"],
client_secret=configuration["client_secret"],
tenant_id=configuration["tenant_id"],
client_emails=client_emails,
)


class OutlookClient:
"""Outlook client to handle API calls made to Outlook"""

Expand All @@ -605,11 +730,7 @@ def set_logger(self, logger_):
@cached_property
def _get_user_instance(self):
if self.is_cloud:
return Office365Users(
client_id=self.configuration["client_id"],
client_secret=self.configuration["client_secret"],
tenant_id=self.configuration["tenant_id"],
)
return UserFactory.create_user(self.configuration)

return ExchangeUsers(
ad_server=self.configuration["active_directory_server"],
Expand All @@ -627,7 +748,8 @@ async def _fetch_all_users(self):
yield user

async def ping(self):
await anext(self._get_user_instance.get_users())
async for _user in self._get_user_instance.get_users():
return

async def get_mails(self, account):
for mail_type in MAIL_TYPES:
Expand Down Expand Up @@ -666,9 +788,12 @@ async def get_tasks(self, account):
yield task

async def get_contacts(self, account):
folder = account.root / "Top of Information Store" / "Contacts"
for contact in await asyncio.to_thread(folder.all().only, *CONTACT_FIELDS):
yield contact
try:
folder = account.root / "Top of Information Store" / "Contacts"
for contact in await asyncio.to_thread(folder.all().only, *CONTACT_FIELDS):
yield contact
except Exception:
raise


class OutlookDataSource(BaseDataSource):
Expand Down Expand Up @@ -735,45 +860,54 @@ def get_default_configuration(cls):
"sensitive": True,
"type": "str",
},
"client_emails": {
"depends_on": [{"field": "data_source", "value": OUTLOOK_CLOUD}],
"label": "Client Email Addresses (comma-separated)",
ilyasabdellaoui marked this conversation as resolved.
Show resolved Hide resolved
"order": 5,
ilyasabdellaoui marked this conversation as resolved.
Show resolved Hide resolved
"tooltip": "Specify the email addresses to limit data fetching to specific clients. If set to *, data will be fetched for all users.",
"required": False,
"type": "list",
"value": "*",
},
"exchange_server": {
"depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}],
"label": "Exchange Server",
"order": 5,
"order": 6,
"tooltip": "Exchange server's IP address. E.g. 127.0.0.1",
"type": "str",
},
"active_directory_server": {
"depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}],
"label": "Active Directory Server",
"order": 6,
"order": 7,
"tooltip": "Active Directory server's IP address. E.g. 127.0.0.1",
"type": "str",
},
"username": {
"depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}],
"label": "Exchange server username",
"order": 7,
"order": 8,
"type": "str",
},
"password": {
"depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}],
"label": "Exchange server password",
"order": 8,
"order": 9,
"sensitive": True,
"type": "str",
},
"domain": {
"depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}],
"label": "Exchange server domain name",
"order": 9,
"order": 10,
"tooltip": "Domain name such as gmail.com, outlook.com",
"type": "str",
},
"ssl_enabled": {
"depends_on": [{"field": "data_source", "value": OUTLOOK_SERVER}],
"display": "toggle",
"label": "Enable SSL",
"order": 10,
"order": 11,
"type": "bool",
"value": False,
},
Expand All @@ -783,13 +917,13 @@ def get_default_configuration(cls):
{"field": "ssl_enabled", "value": True},
],
"label": "SSL certificate",
"order": 11,
"order": 12,
"type": "str",
},
"use_text_extraction_service": {
"display": "toggle",
"label": "Use text extraction service",
"order": 12,
"order": 13,
"tooltip": "Requires a separate deployment of the Elastic Text Extraction Service. Requires that pipeline settings disable text extraction.",
"type": "bool",
"ui_restrictions": ["advanced"],
Expand All @@ -798,7 +932,7 @@ def get_default_configuration(cls):
"use_document_level_security": {
"display": "toggle",
"label": "Enable document level security",
"order": 13,
"order": 14,
"tooltip": "Document level security ensures identities and permissions set in Outlook are maintained in Elasticsearch. This enables you to restrict and personalize read-access users and groups have to documents in this index. Access control syncs ensure this metadata is kept up to date in your Elasticsearch documents.",
"type": "bool",
"value": False,
Expand Down Expand Up @@ -1072,9 +1206,11 @@ async def get_docs(self, filtering=None):
dictionary: dictionary containing meta-data of the files.
"""
async for account in self.client._get_user_instance.get_user_accounts():
self._logger.debug(f"Processing account: {account}")
timezone = account.default_timezone or DEFAULT_TIMEZONE

async for mail in self._fetch_mails(account=account, timezone=timezone):
self._logger.debug(f"Fetched mail: {mail}")
yield mail

async for contact in self._fetch_contacts(
Expand Down
Loading