Skip to content

Commit

Permalink
Celery task for updating user data working
Browse files Browse the repository at this point in the history
properly; Logging of task execution in separate celery
log
  • Loading branch information
monotasker committed Sep 1, 2023
1 parent 345989b commit 379542b
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,28 @@
The service can also be triggered by a webhook signal from the remote ID provider. A webhook signal should be sent to the endpoint https://example.org/api/webhooks/idp_data_update/ and the request must include a security token (provided by the Invenio admins) in the request header. This token is set in the REMOTE_USER_DATA_WEBHOOK_TOKEN configuration variable.
The webhook signal should be a POST request with a JSON body. The body should be a JSON object whose top-level keys are the types of data object that have been updated on the remote IDP. The value of each key is an array of objects representing the updated entities. Each of these objects should include the key "id", whose value is the entity's string identifier on the remote IDP. It should also include the key "event", whose value is the type of event that is being signalled (e.g., "updated", "created", "deleted", etc.).
The webhook signal should be a POST request with a JSON body. The body should be a JSON object whose top-level keys are
:idp: The name of the remote IDP that is sending the signal. This is a
string that must match one of the keys in the
REMOTE_USER_DATA_API_ENDPOINTS configuration variable.
:updates: A JSON object whose top-level keys are the types of data object that
have been updated on the remote IDP. The value of each key is an
array of objects representing the updated entities. Each of these
objects should include the "id" property, whose value is the entity's
string identifier on the remote IDP. It should also include the
"event" property, whose value is the type of event that is being
signalled (e.g., "updated", "created", "deleted", etc.).
E.g.,
{'users': [{'id': '1234', 'event': 'updated'},
{'id': '5678', 'event': 'created'}],
'groups': [{'id': '1234', 'event': 'deleted'}]
{"idp": "knowledgeCommons",
"updates": {
"users": [{"id": "1234", "event": "updated"},
{"id": "5678", "event": "created"}],
"groups": [{"id": "1234", "event": "deleted"}]
}
}
Logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@

from kombu import Exchange

REMOTE_USER_DATA_ENTITY_TYPES = {
"users": {'events': ['created', 'updated', 'deleted']},
"groups": {'events': ['created', 'updated', 'deleted']}
}

REMOTE_USER_DATA_API_ENDPOINTS = {
"knowledgeCommons": {
"groups": {
Expand Down
108 changes: 68 additions & 40 deletions site/knowledge_commons_repository/invenio_remote_user_data/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import datetime
from invenio_accounts.models import Role, User, UserIdentity
from invenio_accounts.utils import jwt_create_token
from invenio_queues.proxies import current_queues
from invenio_records_resources.services import Service
from invenio_utilities_tuw.utils import get_identity_for_user, get_user_by_identifier
from flask import after_this_request, request, session
Expand All @@ -22,6 +23,9 @@
from typing import Optional
from werkzeug.local import LocalProxy
from .components.groups import GroupsComponent
from .signals import remote_data_updated
from .utils import logger as update_logger
from .views import IDPUpdateWebhook

class RemoteUserDataService(Service):
"""Service for retrieving user data from a Remote server."""
Expand All @@ -39,45 +43,72 @@ def __init__(self, app, config={}, **kwargs):
self.user_data_stale = True
self.update_in_progress = False

# FIXME: Should we listen to other signals and update more often?
@identity_loaded.connect_via(app)
def on_identity_loaded(_, identity:Identity) -> None:
"""Update user data from remote server."""
self.user_data_stale = True
if 'user-data-updated' in session.keys():
last_update_dt = datetime.datetime.fromisoformat(session['user-data-updated'])
@remote_data_updated.connect_via(app)
def on_webhook_update_signal(_, events:list) -> None:
"""Update user data from remote server when webhook is triggered.
"""
self.logger.info('%%%%% webhook signal received')

for event in current_queues.queues['user-data-updates'].consume():
if event['entity_type'] == 'users' and event['event'] == 'updated':
try:
# confirm that user exists in Invenio
my_user_identity = UserIdentity.query.filter_by(id=event['id']).one_or_none()
assert my_user_identity is not None

timestamp = datetime.datetime.utcnow().isoformat()
session.setdefault('user-data-updated', {})[
my_user_identity.id_user] = timestamp
celery_result = do_user_data_update.delay(
my_user_identity.id_user,
event['idp'],
event['id'])
# self.logger.info(f'celery_result_id: {celery_result.id}')
except AssertionError:
update_logger.error(f'Cannot update: user {event["id"]} does not exist in Invenio.')
elif event['entity_type'] == 'groups' and event['event'] == 'updated':
# TODO: implement group updates and group/user creation
pass

@identity_changed.connect_via(app)
def on_identity_changed(_, identity:Identity) -> None:
"""Update user data from remote server when current user is changed."""
# FIXME: Do we need this check now that we're using webhook updates?
if self._data_is_stale(identity.id) and not self.update_in_progress:
my_user_identity = UserIdentity.query.filter_by(
id_user=identity.id).one_or_none()
# will have a UserIdentity if the user has logged in via an IDP
if my_user_identity is not None:
my_idp = my_user_identity.method
my_remote_id = my_user_identity.id

timestamp = datetime.datetime.utcnow().isoformat()
session.setdefault('user-data-updated', {})[
identity.id] = timestamp
celery_result = do_user_data_update.delay(identity.id,
my_idp,
my_remote_id)
# self.logger.debug(f'celery_result_id: {celery_result.id}')

def _data_is_stale(self, user_id) -> bool:
"""Check whether user data is stale."""
user_data_stale = True
if user_id and \
'user-data-updated' in session.keys() and \
type(session['user-data-updated']) != str and \
user_id in session['user-data-updated'].keys():
if session['user-data-updated'][user_id]:
last_update_dt = datetime.datetime.fromisoformat(session['user-data-updated'][user_id])
interval = datetime.datetime.utcnow() - last_update_dt
if interval <= self.update_interval:
self.user_data_stale = False

if self.user_data_stale and not self.update_in_progress:
self.update_in_progress = True
security_datastore = LocalProxy(lambda:
app.extensions["security"].datastore)
my_user = security_datastore.find_user(id=identity.id)
# self.updated_data is to store result for testing
self.updated_data = {}
if my_user is not None:
my_user_identity = UserIdentity.query.filter_by(
id_user=my_user.id).one_or_none()
# will have a UserIdentity if the user has logged in via an IDP
if my_user_identity is not None:
timestamp = datetime.datetime.utcnow().isoformat()
session['user-data-updated'] = timestamp
my_idp = my_user_identity.method
my_remote_id = my_user_identity.id
self.logger.debug('%%%%% launching celery task')
celery_result = do_user_data_update.delay(my_user.id,
my_idp,
my_remote_id)
self.logger.debug(f'celery_result_id: {celery_result.id}')
self.update_in_progress = False
user_data_stale = False
return user_data_stale

def update_data_from_remote(self, user_id:int,
idp:str, remote_id:str, **kwargs) -> dict:
"""Main method to update user data from remote server.
"""
self.logger.debug("Updating user data from remote server.")
update_logger.debug(f"Updating data from remote server -- user: {user_id}; idp: {idp}; remote_id: {remote_id}.")
changed_data = {}
updated_data = {}
user = get_user_by_identifier(user_id)
Expand Down Expand Up @@ -127,10 +158,11 @@ def fetch_from_remote_api(self, user:User,
except requests.exceptions.JSONDecodeError:
self.logger.debug(f'JSONDecodeError: User group data API response was not JSON:')
# self.logger.debug(f'{response.text}')
# time.sleep(60)
remote_data = {'groups': [{'name': 'awesome-mock2'},
time.sleep(30)
remote_data = {'groups': [{'name': 'awesome-mock5'},
{'name': 'admin'}]
}
self.logger.debug(f'USER GROUPS: {remote_data["groups"]}')

return remote_data

Expand Down Expand Up @@ -176,13 +208,13 @@ def update_invenio_group_memberships(self, user:User,
"""
grouper = GroupsComponent(self)
updated_local_groups = [r.name for r in user.roles]
self.logger.debug(f'ADDING GROUPS for user: {user}')
for group_name in changed_memberships["added_groups"]:
self.logger.debug(f'ADDING GROUP for user {user}: {group_name}')
group_role = grouper.find_or_create_group(group_name)
if group_role and grouper.add_user_to_group(group_role, user) is not None:
updated_local_groups.append(group_role.name)
self.logger.debug(f'DROPPING GROUPS for user: {user}')
for group_name in changed_memberships["dropped_groups"]:
self.logger.debug(f'DROPPING GROUP for user {user}: {group_name}')
group_role = grouper.find_group(group_name)
if group_role and grouper.remove_user_from_group(group_role, user) is not None:
updated_local_groups.remove(group_role.name)
Expand All @@ -191,8 +223,4 @@ def update_invenio_group_memberships(self, user:User,
grouper.delete_group(group_role.name)
assert updated_local_groups == user.roles

# my_identity = get_identity_for_user(user.email)
# self.logger.debug(f'USER NEEDS now: {my_identity.provides}')
# grouper.update_identity_needs(identity, updated_local_groups)

return updated_local_groups
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
#
# This file is part of the invenio-remote-user-data package.
# Copyright (C) 2023, MESH Research.
#
# invenio-remote-user-data is free software; you can redistribute it
# and/or modify it under the terms of the MIT License; see
# LICENSE file for more details.

"""Signals for invenio-remote-user-data.
"""

from blinker import Namespace
remote_update_signals = Namespace()

remote_data_updated = remote_update_signals.signal('remote-data-updated')
"""Remote data updated signal.
Sent when the idp_data_update webhook receives a signal about an update
from a remote IDP server and publishes an event to the "user-data-updates" event queue.
"""
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,21 @@
# from celery import current_app as current_celery_app
from celery import shared_task
from celery.utils.log import get_task_logger
from flask import current_app as app
import datetime
from flask import current_app as app, session
from knowledge_commons_repository.invenio_remote_user_data.proxies import current_remote_user_data_service

logger = get_task_logger(__name__)
task_logger = get_task_logger(__name__)

@shared_task(ignore_result=True)
@shared_task(ignore_result=False)
def do_user_data_update(user_id, idp, remote_id, **kwargs):
"""Send a notification that a user has logged in."""
service = current_remote_user_data_service
service.update_data_from_remote(user_id, idp, remote_id)
logger.debug('doing task&&&&&&&')
return True
"""Perform a user data update."""

with app.app_context():
task_logger.debug('doing task&&&&&&&')
task_logger.info(dir(task_logger))
task_logger.info(task_logger.handlers)
service = current_remote_user_data_service
service.update_data_from_remote(user_id, idp, remote_id)
return True

Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s:%(levelname)s : %(message)s')
file_handler = logging.RotatingFileHandler('logs/remote_data_updates.log',
maxBytes=1000000, backupCount=5)
file_handler = logging.handlers.RotatingFileHandler(
'logs/remote_data_updates.log', maxBytes=1000000, backupCount=5)
file_handler.setFormatter(formatter)
if (logger.hasHandlers()):
logger.handlers.clear()
Expand Down
81 changes: 68 additions & 13 deletions site/knowledge_commons_repository/invenio_remote_user_data/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,24 @@
Signal content
--------------
Notifications can be sent for multiple updates to multiple entities in a single request. The signal body must be a JSON object whose top-level keys are the types of data object that have been updated on the remote IDP. The value of each key is an array of objects representing the updated entities. Each of these objects should include the key "id", whose value is the entity's string identifier on the remote IDP. It should also include the key "event", whose value is the type of event that is being signalled (e.g., "updated", "created", "deleted", etc.).
Notifications can be sent for multiple updates to multiple entities in a single request. The signal body must be a JSON object whose top-level keys are
:idp: The name of the remote IDP that is sending the signal. This is a
string that must match one of the keys in the
REMOTE_USER_DATA_API_ENDPOINTS configuration variable.
:updates: A JSON object whose top-level keys are the types of data object that
have been updated on the remote IDP. The value of each key is an array of objects representing the updated entities. Each of these objects should include an "id" property whose value is the entity's string identifier on the remote IDP. It should also include the "event" property, whose value is the type of event that is being signalled (e.g., "updated", "created", "deleted", etc.).
For example:
{
users: [{id: "1234", event: "updated"},
{id: "5678", event: "created"}],
groups: [{id: "1234", event: "deleted"}]
"idp": "knowledgeCommons",
"updates": {
"users": [{"id": "1234", "event": "updated"},
{"id": "5678", "event": "created"}],
"groups": [{"id": "1234", "event": "deleted"}]
}
}
Logging
Expand All @@ -57,11 +67,15 @@
"""

# from flask import render_template
from flask import Blueprint, jsonify, make_response, request
from flask import Blueprint, jsonify, make_response, request, current_app as app
from flask.views import MethodView
from werkzeug.exceptions import Forbidden, MethodNotAllowed, NotFound
from invenio_queues.proxies import current_queues
from werkzeug.exceptions import (
BadRequest, Forbidden, MethodNotAllowed, NotFound, Unauthorized
)
import os
from .utils import logger
from .signals import remote_data_updated

"""
Expand All @@ -84,16 +98,57 @@ def post (self):
"""
Render the support template
"""
entity_types = app.config['REMOTE_USER_DATA_ENTITY_TYPES']
headers = request.headers
bearer = headers.get('Authorization')
token = bearer.split()[1]
# if token != self.webhook_token:
# return "Unauthorized", 401

if 'users' in request.json.keys():
logger.info(f"Received user update signal: {request.json['users']}")
if 'groups' in request.json.keys():
logger.info(f"Received user update signal: {request.json['users']}")
if token != self.webhook_token:
raise Unauthorized

try:
idp = request.json['idp']
events = []
bad_entity_types = []
bad_events = []

for e in request.json['updates'].keys():
if e in entity_types.keys():
logger.info(f'{idp} Received {e} update signal: '
f'{request.json["updates"][e]}')
for u in request.json['updates'][e]:
if u['event'] in entity_types[e]['events']:
events.append({'idp': idp,
'entity_type': e,
'event': u['event'],
'id': u['id']})
else:
bad_events.append(u)
logger.error(f'{idp} Received update signal for '
f'unknown event: {u}')
else:
bad_entity_types.append(e)
logger.error(f'{idp} Received update signal for unknown '
f'entity type: {e}')
logger.error(request.json)

if len(events) > 0:
current_queues.queues["user-data-updates"].publish(events)
remote_data_updated.send(app._get_current_object(), events=events)
logger.info(f'Published {len(events)} events to queue and emitted remote_data_updated signal')
logger.info(events)
else:
logger.info(f'{idp} No valid events received')
logger.info(request.json['updates'])
raise BadRequest

# return error message after handling signals that are
# properly formed
if len(bad_entity_types) > 0 or len(bad_events) > 0:
# FIXME: raise better error, since request isn't completely rejected
raise BadRequest
except KeyError: # request is missing 'idp' or 'updates' keys
logger.error(f"Received malformed signal: {request.json}")
raise BadRequest

return jsonify({"message": "Webhook notification accepted",
"status": 202}), 202
Expand Down

0 comments on commit 379542b

Please sign in to comment.