Skip to content

Commit

Permalink
Add locking to katalogus service (1.13) (#2144)
Browse files Browse the repository at this point in the history
Co-authored-by: JP Bruins Slot <[email protected]>
  • Loading branch information
dekkers and jpbruinsslot authored Dec 15, 2023
1 parent b22b455 commit 2e507f7
Showing 1 changed file with 61 additions and 48 deletions.
109 changes: 61 additions & 48 deletions mula/scheduler/connectors/services/katalogus.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
from typing import Dict, List

from scheduler.connectors.errors import exception_handler
Expand All @@ -15,6 +16,8 @@ class Katalogus(HTTPService):
def __init__(self, host: str, source: str, timeout: int = 5, cache_ttl: int = 30):
super().__init__(host, source, timeout)

self.lock = threading.Lock()

# For every organisation we cache its plugins, it references the
# plugin-id as key and the plugin as value.
self.organisations_plugin_cache: dict_utils.ExpiringDict = dict_utils.ExpiringDict(lifetime=cache_ttl)
Expand Down Expand Up @@ -44,76 +47,82 @@ def flush_caches(self) -> None:
def flush_organisations_plugin_cache(self) -> None:
    """Rebuild the per-organisation plugin cache.

    The cache is emptied and then refilled: for every organisation
    returned by ``get_organisations`` it stores a mapping of plugin id to
    plugin, containing only the enabled plugins. The rebuild runs while
    holding ``self.lock`` so that readers taking the same lock do not see
    a half-filled cache.

    Expiration of the cache is switched off for the duration of the
    rebuild and is always switched back on afterwards, also when fetching
    organisations or plugins raises. Any such exception propagates to the
    caller.
    """
    self.logger.debug("flushing plugin cache [cache=%s]", self.organisations_plugin_cache.cache)

    with self.lock:
        # First, we reset the cache, to make sure we won't get any ExpiredError
        self.organisations_plugin_cache.expiration_enabled = False
        try:
            self.organisations_plugin_cache.reset()

            orgs = self.get_organisations()
            for org in orgs:
                if org.id not in self.organisations_plugin_cache:
                    self.organisations_plugin_cache[org.id] = {}
                    self.organisations_new_boefjes_cache[org.id] = {}

                plugins = self.get_plugins_by_organisation(org.id)
                self.organisations_plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}
        finally:
            # Without this a failed rebuild would leave expiration disabled,
            # and the cache would never be refreshed again.
            self.organisations_plugin_cache.expiration_enabled = True

    self.logger.debug("flushed plugins cache [cache=%s]", self.organisations_plugin_cache.cache)

def flush_organisations_boefje_type_cache(self) -> None:
    """Rebuild the boefje cache: boefje.consumes -> plugin type boefje.

    For every organisation the cache is refilled with a mapping from each
    consumed type to the list of enabled plugins of type ``"boefje"`` that
    consume it. The rebuild runs while holding ``self.lock``.

    Expiration of the cache is switched off for the duration of the
    rebuild and is always switched back on afterwards, also when fetching
    organisations or plugins raises. Any such exception propagates to the
    caller.
    """
    self.logger.debug("flushing boefje cache [cache=%s]", self.organisations_boefje_type_cache.cache)

    with self.lock:
        # First, we reset the cache, to make sure we won't get any ExpiredError
        self.organisations_boefje_type_cache.expiration_enabled = False
        try:
            self.organisations_boefje_type_cache.reset()

            orgs = self.get_organisations()
            for org in orgs:
                self.organisations_boefje_type_cache[org.id] = {}

                for plugin in self.get_plugins_by_organisation(org.id):
                    if plugin.type != "boefje":
                        continue

                    if plugin.enabled is False:
                        continue

                    # NOTE: backwards compatibility, when it is a boefje the
                    # consumes field is a string field.
                    consumed_types = [plugin.consumes] if isinstance(plugin.consumes, str) else plugin.consumes

                    for type_ in consumed_types:
                        self.organisations_boefje_type_cache[org.id].setdefault(type_, []).append(plugin)
        finally:
            # Without this a failed rebuild would leave expiration disabled,
            # and the cache would never be refreshed again.
            self.organisations_boefje_type_cache.expiration_enabled = True

    self.logger.debug("flushed boefje cache [cache=%s]", self.organisations_boefje_type_cache.cache)

def flush_organisations_normalizer_type_cache(self) -> None:
    """Rebuild the normalizer cache: normalizer.consumes -> plugin type normalizer.

    For every organisation the cache is refilled with a mapping from each
    consumed type to the list of enabled plugins of type ``"normalizer"``
    that consume it. The rebuild runs while holding ``self.lock``.

    Expiration of the cache is switched off for the duration of the
    rebuild and is always switched back on afterwards, also when fetching
    organisations or plugins raises. Any such exception propagates to the
    caller.
    """
    self.logger.debug("flushing normalizer cache [cache=%s]", self.organisations_normalizer_type_cache.cache)

    with self.lock:
        # First, we reset the cache, to make sure we won't get any ExpiredError
        self.organisations_normalizer_type_cache.expiration_enabled = False
        try:
            self.organisations_normalizer_type_cache.reset()

            orgs = self.get_organisations()
            for org in orgs:
                self.organisations_normalizer_type_cache[org.id] = {}

                for plugin in self.get_plugins_by_organisation(org.id):
                    if plugin.type != "normalizer":
                        continue

                    if plugin.enabled is False:
                        continue

                    for type_ in plugin.consumes:
                        self.organisations_normalizer_type_cache[org.id].setdefault(type_, []).append(plugin)
        finally:
            # Without this a failed rebuild would leave expiration disabled,
            # and the cache would never be refreshed again.
            self.organisations_normalizer_type_cache.expiration_enabled = True

    self.logger.debug("flushed normalizer cache [cache=%s]", self.organisations_normalizer_type_cache.cache)

@exception_handler
Expand Down Expand Up @@ -147,28 +156,32 @@ def get_plugins_by_organisation(self, organisation_id: str) -> List[Plugin]:

def get_plugins_by_org_id(self, organisation_id: str) -> List[Plugin]:
    """Look up the cached plugins of an organisation.

    The lookup is done while holding ``self.lock``. When the cache reports
    ``ExpiredError`` the plugin cache is flushed and the lookup is done
    once more, again under the lock, so it cannot observe a cache that
    another thread is rebuilding at that moment.

    NOTE(review): the flush stores a mapping of plugin id -> plugin per
    organisation, so the value returned here is presumably that mapping
    rather than a list -- confirm against ``dict_utils.deep_get``.
    """

    def _read_cache():
        with self.lock:
            return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id])

    try:
        return _read_cache()
    except dict_utils.ExpiredError:
        # The lock is released here: the flush acquires the same lock, and
        # threading.Lock is not reentrant.
        self.flush_organisations_plugin_cache()
        return _read_cache()

def get_plugin_by_id_and_org_id(self, plugin_id: str, organisation_id: str) -> Plugin:
    """Look up a single cached plugin by organisation id and plugin id.

    The lookup is done while holding ``self.lock``. When the cache reports
    ``ExpiredError`` the plugin cache is flushed and the lookup is done
    once more, again under the lock, so it cannot observe a cache that
    another thread is rebuilding at that moment.
    """

    def _read_cache():
        with self.lock:
            return dict_utils.deep_get(self.organisations_plugin_cache, [organisation_id, plugin_id])

    try:
        return _read_cache()
    except dict_utils.ExpiredError:
        # The lock is released here: the flush acquires the same lock, and
        # threading.Lock is not reentrant.
        self.flush_organisations_plugin_cache()
        return _read_cache()

def get_boefjes_by_type_and_org_id(self, boefje_type: str, organisation_id: str) -> List[Plugin]:
    """Look up the cached boefjes of an organisation that consume a given type.

    The lookup is done while holding ``self.lock``. When the cache reports
    ``ExpiredError`` the boefje cache is flushed and the lookup is done
    once more, again under the lock, so it cannot observe a cache that
    another thread is rebuilding at that moment.
    """

    def _read_cache():
        with self.lock:
            return dict_utils.deep_get(self.organisations_boefje_type_cache, [organisation_id, boefje_type])

    try:
        return _read_cache()
    except dict_utils.ExpiredError:
        # The lock is released here: the flush acquires the same lock, and
        # threading.Lock is not reentrant.
        self.flush_organisations_boefje_type_cache()
        return _read_cache()

def get_normalizers_by_org_id_and_type(self, organisation_id: str, normalizer_type: str) -> List[Plugin]:
    """Look up the cached normalizers of an organisation that consume a given type.

    The lookup is done while holding ``self.lock``. When the cache reports
    ``ExpiredError`` the normalizer cache is flushed and the lookup is
    done once more, again under the lock, so it cannot observe a cache
    that another thread is rebuilding at that moment.
    """

    def _read_cache():
        with self.lock:
            return dict_utils.deep_get(self.organisations_normalizer_type_cache, [organisation_id, normalizer_type])

    try:
        return _read_cache()
    except dict_utils.ExpiredError:
        # The lock is released here: the flush acquires the same lock, and
        # threading.Lock is not reentrant.
        self.flush_organisations_normalizer_type_cache()
        return _read_cache()
Expand Down

0 comments on commit 2e507f7

Please sign in to comment.