Skip to content

Commit

Permalink
Fix new boefjes issue for scheduler (1.16) (#3329)
Browse files Browse the repository at this point in the history
Co-authored-by: JP Bruins Slot <[email protected]>
Co-authored-by: Jan Klopper <[email protected]>
Co-authored-by: ammar92 <[email protected]>
  • Loading branch information
4 people authored Aug 7, 2024
1 parent 2caf60d commit f76eee6
Show file tree
Hide file tree
Showing 3 changed files with 463 additions and 127 deletions.
170 changes: 90 additions & 80 deletions mula/scheduler/connectors/services/katalogus.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,67 +22,67 @@ def __init__(
):
super().__init__(host, source, timeout, pool_connections)

self.lock = threading.Lock()

# For every organisation we cache its plugins, it references the
# plugin-id as key and the plugin as value.
self.organisations_plugin_cache: dict_utils.ExpiringDict = dict_utils.ExpiringDict(lifetime=cache_ttl)
self.plugin_cache_lock = threading.Lock()
self.plugin_cache = dict_utils.ExpiringDict(lifetime=cache_ttl)

# For every organisation we cache on which type of object (consumes)
# the boefjes consume, it references the object type (consumes)
# as the key and a dict of boefjes as value.
self.organisations_boefje_type_cache: dict_utils.ExpiringDict = dict_utils.ExpiringDict(lifetime=cache_ttl)
self.boefje_cache_lock = threading.Lock()
self.boefje_cache = dict_utils.ExpiringDict(lifetime=cache_ttl)

# For every organisation we cache on which type of object (consumes)
# the normalizers consume, it references the object type (consumes)
# as the key and a dict of normalizers as value.
self.organisations_normalizer_type_cache: dict_utils.ExpiringDict = dict_utils.ExpiringDict(lifetime=cache_ttl)
self.normalizer_cache_lock = threading.Lock()
self.normalizer_cache = dict_utils.ExpiringDict(lifetime=cache_ttl)

# For every organisation we cache which new boefjes for an organisation
# have been enabled.
self.organisations_new_boefjes_cache: dict = {}
self.new_boefjes_cache_lock = threading.Lock()
self.new_boefjes_cache: dict = {}

# Initialise the cache.
self.flush_caches()

def flush_caches(self) -> None:
    """Flush the plugin, normalizer and boefje caches, in that order.

    Each cache is rebuilt by its own ``flush_*`` method. The new-boefjes
    cache is not touched here.
    """
    self.flush_plugin_cache()
    self.flush_normalizer_cache()
    self.flush_boefje_cache()

def flush_plugin_cache(self) -> None:
    """Rebuild the per-organisation plugin cache from the katalogus.

    After the flush ``self.plugin_cache`` maps every organisation id to a
    dict of ``plugin.id -> plugin`` containing only the enabled plugins.
    The whole rebuild runs while holding ``self.plugin_cache_lock``.

    Any exception raised by ``get_organisations`` or
    ``get_plugins_by_organisation`` propagates to the caller; expiration is
    re-enabled on the cache in that case as well.
    """
    self.logger.debug("Flushing the katalogus plugin cache for organisations")

    with self.plugin_cache_lock:
        # First, we reset the cache, to make sure we won't get any ExpiredError
        self.plugin_cache.expiration_enabled = False
        try:
            self.plugin_cache.reset()

            for org in self.get_organisations():
                # Make sure the organisation has an entry even when fetching
                # its plugins fails below.
                self.plugin_cache.setdefault(org.id, {})

                plugins = self.get_plugins_by_organisation(org.id)
                self.plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}
        finally:
            # Always switch expiration back on; otherwise a failed flush
            # would leave the flag disabled for every later lookup.
            self.plugin_cache.expiration_enabled = True

    self.logger.debug("Flushed the katalogus plugin cache for organisations")

def flush_organisations_boefje_type_cache(self) -> None:
def flush_boefje_cache(self) -> None:
"""boefje.consumes -> plugin type boefje"""
self.logger.debug("Flushing the katalogus boefje type cache for organisations")

with self.lock:
with self.boefje_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.organisations_boefje_type_cache.expiration_enabled = False
self.organisations_boefje_type_cache.reset()
self.boefje_cache.expiration_enabled = False
self.boefje_cache.reset()

orgs = self.get_organisations()
for org in orgs:
self.organisations_boefje_type_cache[org.id] = {}
self.boefje_cache[org.id] = {}

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "boefje":
Expand All @@ -97,28 +97,28 @@ def flush_organisations_boefje_type_cache(self) -> None:
# NOTE: backwards compatibility, when it is a boefje the
# consumes field is a string field.
if isinstance(plugin.consumes, str):
self.organisations_boefje_type_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
self.boefje_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
continue

for type_ in plugin.consumes:
self.organisations_boefje_type_cache[org.id].setdefault(type_, []).append(plugin)
self.boefje_cache[org.id].setdefault(type_, []).append(plugin)

self.organisations_boefje_type_cache.expiration_enabled = True
self.boefje_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus boefje type cache for organisations")

def flush_organisations_normalizer_type_cache(self) -> None:
def flush_normalizer_cache(self) -> None:
"""normalizer.consumes -> plugin type normalizer"""
self.logger.debug("Flushing the katalogus normalizer type cache for organisations")

with self.lock:
with self.normalizer_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.organisations_normalizer_type_cache.expiration_enabled = False
self.organisations_normalizer_type_cache.reset()
self.normalizer_cache.expiration_enabled = False
self.normalizer_cache.reset()

orgs = self.get_organisations()
for org in orgs:
self.organisations_normalizer_type_cache[org.id] = {}
self.normalizer_cache[org.id] = {}

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "normalizer":
Expand All @@ -131,9 +131,9 @@ def flush_organisations_normalizer_type_cache(self) -> None:
continue

for type_ in plugin.consumes:
self.organisations_normalizer_type_cache[org.id].setdefault(type_, []).append(plugin)
self.normalizer_cache[org.id].setdefault(type_, []).append(plugin)

self.organisations_normalizer_type_cache.expiration_enabled = True
self.normalizer_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus normalizer type cache for organisations")

Expand Down Expand Up @@ -161,74 +161,84 @@ def get_organisations(self) -> list[Organisation]:
response = self.get(url)
return [Organisation(**organisation) for organisation in response.json().values()]

@exception_handler
def get_plugins_by_organisation(self, organisation_id: str) -> list[Plugin]:
    """Fetch the plugins of one organisation straight from the katalogus.

    This performs an HTTP GET on every call; it does not consult any of the
    caches. No filtering is applied here, so callers that only want enabled
    plugins filter on ``plugin.enabled`` themselves.

    Args:
        organisation_id: Identifier interpolated into the request path.

    Returns:
        One ``Plugin`` built from each element of the JSON response.
    """
    url = f"{self.host}/v1/organisations/{organisation_id}/plugins"
    response = self.get(url)
    return [Plugin(**plugin) for plugin in response.json()]

def get_plugins_by_org_id(self, organisation_id: str) -> list[Plugin]:
    """Return the cached plugins of an organisation.

    The lookup is done under ``self.plugin_cache_lock``. When the cache
    reports ``ExpiredError`` the plugin cache is flushed and the lookup is
    retried once; an error on the retry propagates to the caller.

    NOTE(review): ``flush_plugin_cache`` stores a dict of
    ``plugin.id -> plugin`` per organisation, so the value returned here is
    presumably that dict rather than a list - confirm against ``deep_get``
    and the callers before relying on the annotation.

    Args:
        organisation_id: Key of the organisation in the plugin cache.
    """

    def _get_from_cache() -> list[Plugin]:
        with self.plugin_cache_lock:
            return dict_utils.deep_get(self.plugin_cache, [organisation_id])

    try:
        return _get_from_cache()
    except dict_utils.ExpiredError:
        # The lock is released at this point; the flush takes the same
        # lock itself, so it must not be called while still holding it.
        self.flush_plugin_cache()
        return _get_from_cache()

def get_plugin_by_id_and_org_id(self, plugin_id: str, organisation_id: str) -> Plugin:
    """Return one cached plugin of an organisation.

    The lookup walks ``self.plugin_cache`` by organisation id and then by
    plugin id, under ``self.plugin_cache_lock``. When the cache reports
    ``ExpiredError`` the plugin cache is flushed and the lookup is retried
    once; an error on the retry propagates to the caller.

    Args:
        plugin_id: Id of the plugin within the organisation's entry.
        organisation_id: Key of the organisation in the plugin cache.
    """

    def _get_from_cache() -> Plugin:
        with self.plugin_cache_lock:
            return dict_utils.deep_get(self.plugin_cache, [organisation_id, plugin_id])

    try:
        return _get_from_cache()
    except dict_utils.ExpiredError:
        # The lock is released at this point; the flush takes the same
        # lock itself, so it must not be called while still holding it.
        self.flush_plugin_cache()
        return _get_from_cache()

def get_boefjes_by_type_and_org_id(self, boefje_type: str, organisation_id: str) -> list[Plugin]:
    """Return the cached boefjes of an organisation that consume a type.

    The lookup walks ``self.boefje_cache`` by organisation id and then by
    consumed object type, under ``self.boefje_cache_lock``. When the cache
    reports ``ExpiredError`` the boefje cache is flushed and the lookup is
    retried once; an error on the retry propagates to the caller.

    Args:
        boefje_type: Object type (a ``consumes`` value) to look up.
        organisation_id: Key of the organisation in the boefje cache.
    """

    def _get_from_cache() -> list[Plugin]:
        with self.boefje_cache_lock:
            return dict_utils.deep_get(self.boefje_cache, [organisation_id, boefje_type])

    try:
        return _get_from_cache()
    except dict_utils.ExpiredError:
        # The lock is released at this point; the flush takes the same
        # lock itself, so it must not be called while still holding it.
        self.flush_boefje_cache()
        return _get_from_cache()

def get_normalizers_by_org_id_and_type(self, organisation_id: str, normalizer_type: str) -> list[Plugin]:
    """Return the cached normalizers of an organisation that consume a type.

    The lookup walks ``self.normalizer_cache`` by organisation id and then
    by consumed type, under ``self.normalizer_cache_lock``. When the cache
    reports ``ExpiredError`` the normalizer cache is flushed and the lookup
    is retried once; an error on the retry propagates to the caller.

    Args:
        organisation_id: Key of the organisation in the normalizer cache.
        normalizer_type: Consumed type (a ``consumes`` value) to look up.
    """

    def _get_from_cache() -> list[Plugin]:
        with self.normalizer_cache_lock:
            return dict_utils.deep_get(
                self.normalizer_cache,
                [organisation_id, normalizer_type],
            )

    try:
        return _get_from_cache()
    except dict_utils.ExpiredError:
        # The lock is released at this point; the flush takes the same
        # lock itself, so it must not be called while still holding it.
        self.flush_normalizer_cache()
        return _get_from_cache()

def get_new_boefjes_by_org_id(self, organisation_id: str) -> list[Plugin]:
    """Return the boefjes enabled for an organisation since the last call.

    The organisation's plugins are fetched from the katalogus and reduced
    to those that are enabled, of type ``"boefje"`` and have a non-empty
    ``consumes``. Every such boefje whose id is not in the snapshot stored
    by the previous call is reported as new, after which the snapshot is
    replaced with the current set. On the first call for an organisation
    there is no snapshot, so all of its enabled boefjes are returned.

    Fetching, comparing and updating all happen under
    ``self.new_boefjes_cache_lock`` so that concurrent calls cannot
    interleave their fetch and snapshot update.

    Args:
        organisation_id: Organisation whose boefjes are checked.

    Returns:
        The newly enabled boefjes, in the order the katalogus returned them.
    """
    with self.new_boefjes_cache_lock:
        # Get the enabled boefjes for the organisation from katalogus
        plugins = self.get_plugins_by_organisation(organisation_id)
        enabled_boefjes = {
            plugin.id: plugin
            for plugin in plugins
            if plugin.enabled is True and plugin.type == "boefje" and plugin.consumes
        }

        # Check if there are new boefjes: anything not in the previous
        # snapshot for this organisation.
        known_boefjes = self.new_boefjes_cache.get(organisation_id, {})
        new_boefjes = [boefje for boefje_id, boefje in enabled_boefjes.items() if boefje_id not in known_boefjes]

        # Update the cache
        self.new_boefjes_cache[organisation_id] = enabled_boefjes

        self.logger.debug(
            "%d new boefjes found for organisation %s",
            len(new_boefjes),
            organisation_id,
            organisation_id=organisation_id,
            boefjes=[boefje.name for boefje in new_boefjes],
        )

        return new_boefjes
4 changes: 4 additions & 0 deletions mula/scheduler/utils/dict_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,7 @@ def __len__(self) -> int:
def __iter__(self) -> Iterator[str]:
    """Iterate over the keys that are in the cache at call time.

    The keys are copied while holding ``self.lock`` and the iterator runs
    over that copy. Returning ``iter(self.cache)`` directly would hand out
    a live dict iterator that is consumed only after the lock has been
    released, so a concurrent insert or removal could make it raise
    ``RuntimeError``.
    """
    with self.lock:
        return iter(list(self.cache))

def setdefault(self, key: str, default: Any) -> Any:
    """Return the value stored under ``key``, inserting ``default`` if absent.

    Delegates to ``dict.setdefault`` on the underlying ``self.cache`` while
    holding ``self.lock``. This method itself performs no expiry check.

    Args:
        key: Cache key to look up or create.
        default: Value stored and returned when ``key`` is not present.

    Returns:
        The existing value for ``key``, or ``default`` when it was inserted.
    """
    with self.lock:
        return self.cache.setdefault(key, default)
Loading

0 comments on commit f76eee6

Please sign in to comment.