Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid deadlock on IDENTITIES_TASKS_ON #65

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 82 additions & 81 deletions mordred/task_identities.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,89 +531,90 @@ def execute(self):
break
# ** END SYNC LOGIC **

cfg = self.config.get_conf()

uuids_refresh = []

if self.unify:
for algo in cfg['sortinghat']['matching']:
if not algo:
# cfg['sortinghat']['matching'] is an empty list
logger.debug('Unify not executed because empty algorithm')
continue
kwargs = {'matching': algo, 'fast_matching': True,
'strict_mapping': cfg['sortinghat']['strict_mapping']}
logger.info("[sortinghat] Unifying identities using algorithm %s",
kwargs['matching'])
uuids = self.do_unify(kwargs)
uuids_refresh += uuids
logger.debug("uuids to refresh from unify: %s", uuids)

if self.affiliate:
if not cfg['sortinghat']['affiliate']:
logger.debug("Not doing affiliation")
else:
# Global enrollments using domains
logger.info("[sortinghat] Executing affiliate")
uuids = self.do_affiliate()
uuids_refresh += uuids
logger.debug("uuids to refresh from affiliate: %s", uuids)

if self.autoprofile:
# autoprofile = [] -> cfg['sortinghat']['autoprofile'][0] = ['']
if ('autoprofile' not in cfg['sortinghat'] or
not cfg['sortinghat']['autoprofile'][0]):
logger.info("[sortinghat] Autoprofile not configured. Skipping.")
else:
logger.info("[sortinghat] Executing autoprofile for sources: %s",
cfg['sortinghat']['autoprofile'])
sources = cfg['sortinghat']['autoprofile']
self.do_autoprofile(sources)

# The uuids must be refreshed in all backends (data sources)
# Give 5s so the queue is filled and if not, continue without it
try:
autorefresh_backends_uuids = TasksManager.UPDATED_UUIDS_QUEUE.get(timeout=5)
for backend_section in autorefresh_backends_uuids:
autorefresh_backends_uuids[backend_section] += uuids_refresh
TasksManager.UPDATED_UUIDS_QUEUE.put(autorefresh_backends_uuids)
logger.debug("Autorefresh uuids queue after processing identities: %s", autorefresh_backends_uuids)
except Empty:
logger.warning("Autorefresh uuids not active because the queue for it is empty.")
cfg = self.config.get_conf()

uuids_refresh = []

if self.unify:
for algo in cfg['sortinghat']['matching']:
if not algo:
# cfg['sortinghat']['matching'] is an empty list
logger.debug('Unify not executed because empty algorithm')
continue
kwargs = {'matching': algo, 'fast_matching': True,
'strict_mapping': cfg['sortinghat']['strict_mapping']}
logger.info("[sortinghat] Unifying identities using algorithm %s",
kwargs['matching'])
uuids = self.do_unify(kwargs)
uuids_refresh += uuids
logger.debug("uuids to refresh from unify: %s", uuids)

if self.affiliate:
if not cfg['sortinghat']['affiliate']:
logger.debug("Not doing affiliation")
else:
# Global enrollments using domains
logger.info("[sortinghat] Executing affiliate")
uuids = self.do_affiliate()
uuids_refresh += uuids
logger.debug("uuids to refresh from affiliate: %s", uuids)

if self.autoprofile:
# autoprofile = [] -> cfg['sortinghat']['autoprofile'][0] = ['']
if ('autoprofile' not in cfg['sortinghat'] or
not cfg['sortinghat']['autoprofile'][0]):
logger.info("[sortinghat] Autoprofile not configured. Skipping.")
else:
logger.info("[sortinghat] Executing autoprofile for sources: %s",
cfg['sortinghat']['autoprofile'])
sources = cfg['sortinghat']['autoprofile']
self.do_autoprofile(sources)

if self.bots:
if 'bots_names' not in cfg['sortinghat']:
logger.info("[sortinghat] Bots name list not configured. Skipping.")
else:
logger.info("[sortinghat] Marking bots: %s",
cfg['sortinghat']['bots_names'])
for name in cfg['sortinghat']['bots_names']:
# First we need the uuids for the profile name
uuids = self.__get_uuids_from_profile_name(name)
# Then we can modify the profile setting bot flag
profile = {"is_bot": True}
for uuid in uuids:
api.edit_profile(self.db, uuid, **profile)
# For quitting the bot flag - debug feature
if 'no_bots_names' in cfg['sortinghat']:
logger.info("[sortinghat] Removing Marking bots: %s",
cfg['sortinghat']['no_bots_names'])
for name in cfg['sortinghat']['no_bots_names']:
# The uuids must be refreshed in all backends (data sources)
# Give 5s so the queue is filled and if not, continue without it
try:
autorefresh_backends_uuids = TasksManager.UPDATED_UUIDS_QUEUE.get(timeout=5)
for backend_section in autorefresh_backends_uuids:
autorefresh_backends_uuids[backend_section] += uuids_refresh
TasksManager.UPDATED_UUIDS_QUEUE.put(autorefresh_backends_uuids)
logger.debug("Autorefresh uuids queue after processing identities: %s", autorefresh_backends_uuids)
except Empty:
logger.warning("Autorefresh uuids not active because the queue for it is empty.")

if self.bots:
if 'bots_names' not in cfg['sortinghat']:
logger.info("[sortinghat] Bots name list not configured. Skipping.")
else:
logger.info("[sortinghat] Marking bots: %s",
cfg['sortinghat']['bots_names'])
for name in cfg['sortinghat']['bots_names']:
# First we need the uuids for the profile name
uuids = self.__get_uuids_from_profile_name(name)
profile = {"is_bot": False}
# Then we can modify the profile setting bot flag
profile = {"is_bot": True}
for uuid in uuids:
api.edit_profile(self.db, uuid, **profile)

# Autorefresh must be done once identities processing has finished
# Give 5s so the queue is filled and if not, continue without it
try:
autorefresh_backends = TasksManager.AUTOREFRESH_QUEUE.get(timeout=5)
for backend_section in autorefresh_backends:
autorefresh_backends[backend_section] = True
TasksManager.AUTOREFRESH_QUEUE.put(autorefresh_backends)
logger.debug("Autorefresh queue after processing identities: %s", autorefresh_backends)
except Empty:
logger.warning("Autorefresh not active because the queue for it is empty.")

with TasksManager.IDENTITIES_TASKS_ON_LOCK:
TasksManager.IDENTITIES_TASKS_ON = False
# For quitting the bot flag - debug feature
if 'no_bots_names' in cfg['sortinghat']:
logger.info("[sortinghat] Removing Marking bots: %s",
cfg['sortinghat']['no_bots_names'])
for name in cfg['sortinghat']['no_bots_names']:
uuids = self.__get_uuids_from_profile_name(name)
profile = {"is_bot": False}
for uuid in uuids:
api.edit_profile(self.db, uuid, **profile)

# Autorefresh must be done once identities processing has finished
# Give 5s so the queue is filled and if not, continue without it
try:
autorefresh_backends = TasksManager.AUTOREFRESH_QUEUE.get(timeout=5)
for backend_section in autorefresh_backends:
autorefresh_backends[backend_section] = True
TasksManager.AUTOREFRESH_QUEUE.put(autorefresh_backends)
logger.debug("Autorefresh queue after processing identities: %s", autorefresh_backends)
except Empty:
logger.warning("Autorefresh not active because the queue for it is empty.")
finally:
with TasksManager.IDENTITIES_TASKS_ON_LOCK:
TasksManager.IDENTITIES_TASKS_ON = False