Skip to content

Commit

Permalink
refactored code to parse URL correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
caronc committed Dec 1, 2024
1 parent bb5218a commit 5a32481
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 58 deletions.
133 changes: 92 additions & 41 deletions apprise/plugins/office365.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,24 @@
from ..locale import gettext_lazy as _


class Office365WebhookMode:
"""
Office 365 Webhook Mode
"""
# Send message as ourselves using the /me/ endpoint
SELF = 'self'

# Send message as ourselves using the /users/ endpoint
AS_USER = 'user'


# Define the modes in a list for validation purposes
OFFICE365_WEBHOOK_MODES = (
Office365WebhookMode.SELF,
Office365WebhookMode.AS_USER,
)


class NotifyOffice365(NotifyBase):
"""
A wrapper for Office 365 Notifications
Expand All @@ -62,7 +80,7 @@ class NotifyOffice365(NotifyBase):
service_url = 'https://office.com/'

# The default protocol
secure_protocol = 'o365'
secure_protocol = ('azure', 'o365')

# Allow 300 requests per minute.
# 60/300 = 0.2
Expand Down Expand Up @@ -103,7 +121,6 @@ class NotifyOffice365(NotifyBase):
'{schema}://{email}/{tenant}/{client_id}/{secret}',
'{schema}://{email}/{tenant}/{client_id}/{secret}/{targets}',
# Send from 'me'
'{schema}://{tenant}/{client_id}/{secret}',
'{schema}://{tenant}/{client_id}/{secret}/{targets}',
)

Expand Down Expand Up @@ -164,15 +181,33 @@ class NotifyOffice365(NotifyBase):
'oauth_secret': {
'alias_of': 'secret',
},
'mode': {
'name': _('Webhook Mode'),
'type': 'choice:string',
'values': OFFICE365_WEBHOOK_MODES,
'default': Office365WebhookMode.SELF,
},
})

def __init__(self, tenant, email, client_id, secret,
targets=None, cc=None, bcc=None, **kwargs):
def __init__(self, tenant, client_id, secret, email=None,
mode=None, targets=None, cc=None, bcc=None, **kwargs):
"""
Initialize Office 365 Object
"""
super().__init__(**kwargs)

# Prepare our Mode
self.mode = self.template_args['mode']['default'] \
if not mode else next(
(f for f in OFFICE365_WEBHOOK_MODES
if f.startswith(
mode.lower())), None)
if mode and not self.mode:
msg = \
'The specified Webhook mode ({}) was not found '.format(mode)
self.logger.warning(msg)
raise TypeError(msg)

# Tenant identifier
self.tenant = validate_regex(
tenant, *self.template_tokens['tenant']['regex'])
Expand All @@ -182,16 +217,24 @@ def __init__(self, tenant, email, client_id, secret,
self.logger.warning(msg)
raise TypeError(msg)

result = is_email(email)
if not result:
msg = 'An invalid Office 365 Email Account ID' \
'({}) was specified.'.format(email)
self.email = None
if email is not None:
result = is_email(email)
if not result:
msg = 'An invalid Office 365 Email Account ID' \
'({}) was specified.'.format(email)
self.logger.warning(msg)
raise TypeError(msg)

# Otherwise store our the email address
self.email = result['full_email']

elif self.mode != Office365WebhookMode.SELF:
msg = 'An expected Office 365 Email was not specified ' \
'(mode={})'.format(self.mode)
self.logger.warning(msg)
raise TypeError(msg)

# Otherwise store our the email address
self.email = result['full_email']

# Client Key (associated with generated OAuth2 Login)
self.client_id = validate_regex(
client_id, *self.template_tokens['client_id']['regex'])
Expand Down Expand Up @@ -318,7 +361,7 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
# Define our URL to post to
url = '{graph_url}/v1.0/me/sendMail'.format(
graph_url=self.graph_url,
) if not self.self.email \
) if not self.email \
else '{graph_url}/v1.0/users/{userid}/sendMail'.format(
userid=self.email,
graph_url=self.graph_url,
Expand Down Expand Up @@ -616,7 +659,7 @@ def url_identifier(self):
here.
"""
return (
self.secure_protocol, self.email, self.tenant, self.client_id,
self.secure_protocol[0], self.email, self.tenant, self.client_id,
self.secret,
)

Expand All @@ -625,8 +668,13 @@ def url(self, privacy=False, *args, **kwargs):
Returns the URL built dynamically based on specified arguments.
"""

# Our URL parameters
params = self.url_parameters(privacy=privacy, *args, **kwargs)
# Define any URL parameters
params = {
'mode': self.mode,
}

# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))

if self.cc:
# Handle our Carbon Copy Addresses
Expand All @@ -642,21 +690,21 @@ def url(self, privacy=False, *args, **kwargs):
'' if not self.names.get(e)
else '{}:'.format(self.names[e]), e) for e in self.bcc])

return '{schema}://{tenant}:{email}/{client_id}/{secret}' \
return '{schema}://{email}{tenant}/{client_id}/{secret}' \
'/{targets}/?{params}'.format(
schema=self.secure_protocol,
schema=self.secure_protocol[0],
tenant=self.pprint(self.tenant, privacy, safe=''),
# email does not need to be escaped because it should
# already be a valid host and username at this point
email=self.email,
email=self.email + '/' if self.email else '',
client_id=self.pprint(self.client_id, privacy, safe=''),
secret=self.pprint(
self.secret, privacy, mode=PrivacyMode.Secret,
safe=''),
targets='/'.join(
[NotifyOffice365.quote('{}{}'.format(
'' if not e[0] else '{}:'.format(e[0]), e[1]),
safe='') for e in self.targets]),
safe='@') for e in self.targets]),
params=NotifyOffice365.urlencode(params))

def __len__(self):
Expand Down Expand Up @@ -687,6 +735,7 @@ def parse_url(url):

# Initialize our tenant
results['tenant'] = None

# Initialize our email
results['email'] = None

Expand All @@ -697,28 +746,36 @@ def parse_url(url):
results['email'] = \
NotifyOffice365.unquote(results['qsd']['from'])

# If tenant is occupied, then the user defined makes up our email
elif results['user']:
results['email'] = '{}@{}'.format(
NotifyOffice365.unquote(results['user']),
NotifyOffice365.unquote(results['host']),
)

else:
# Hostname is no longer part of `from` and possibly instead
# is the tenant id
entries.insert(0, NotifyOffice365.unquote(results['host']))

# Tenant
if 'tenant' in results['qsd'] and \
len(results['qsd']['tenant']):
if 'tenant' in results['qsd'] and len(results['qsd']['tenant']):
# Extract the Tenant from the argument
results['tenant'] = \
NotifyOffice365.unquote(results['qsd']['tenant'])

# If tenant is occupied, then the user defined makes
# up our email
if not results['email'] and results['user']:
results['email'] = '{}@{}'.format(
NotifyOffice365.unquote(results['user']),
NotifyOffice365.unquote(results['host']),
)
elif entries:
results['tenant'] = NotifyOffice365.unquote(entries.pop(0))

# OAuth2 ID
if 'oauth_id' in results['qsd'] and len(results['qsd']['oauth_id']):
# Extract the API Key from an argument
results['client_id'] = \
NotifyOffice365.unquote(results['qsd']['oauth_id'])

elif not results['user']:
# Only tenant id specified (emails are sent 'from me')
results['tenant'] = NotifyOffice365.unquote(results['host'])
elif entries:
# Get our client_id is the first entry on the path
results['client_id'] = NotifyOffice365.unquote(entries.pop(0))

#
# Prepare our target listing
Expand All @@ -740,16 +797,6 @@ def parse_url(url):
# We're done
break

# OAuth2 ID
if 'oauth_id' in results['qsd'] and len(results['qsd']['oauth_id']):
# Extract the API Key from an argument
results['client_id'] = \
NotifyOffice365.unquote(results['qsd']['oauth_id'])

elif entries:
# Get our client_id is the first entry on the path
results['client_id'] = NotifyOffice365.unquote(entries.pop(0))

# OAuth2 Secret
if 'oauth_secret' in results['qsd'] and \
len(results['qsd']['oauth_secret']):
Expand Down Expand Up @@ -778,4 +825,8 @@ def parse_url(url):
if 'bcc' in results['qsd'] and len(results['qsd']['bcc']):
results['bcc'] = results['qsd']['bcc']

# Handle Mode
if 'mode' in results['qsd'] and len(results['qsd']['mode']):
results['mode'] = results['qsd']['mode']

return results
Loading

0 comments on commit 5a32481

Please sign in to comment.