From 942300cc8607a52bd5c1d06ea97e07fcc96e111d Mon Sep 17 00:00:00 2001 From: Onur Atakan ULUSOY Date: Wed, 27 Dec 2023 17:14:58 +0300 Subject: [PATCH] Added thread based submission for active_module and added quiet option --- upsonic/remote/controller.py | 48 +++++++++++++++++++++++++++--------- upsonic/remote/interface.py | 35 +++++++++++++++----------- 2 files changed, 57 insertions(+), 26 deletions(-) diff --git a/upsonic/remote/controller.py b/upsonic/remote/controller.py index 357772b..a9c3b2d 100644 --- a/upsonic/remote/controller.py +++ b/upsonic/remote/controller.py @@ -6,7 +6,7 @@ from hashlib import sha256 -import time + import pickle import os @@ -14,11 +14,14 @@ import copy import inspect +import threading +import time class Upsonic_Remote: def _log(self, message): - self.console.log(message) + if not self.quiet: + self.console.log(message) def __enter__(self): return self # pragma: no cover @@ -26,10 +29,13 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): pass # pragma: no cover - def __init__(self, database_name, api_url, password=None, enable_hashing:bool=False, verify=True, locking=False, client_id=None, cache=False, cache_counter=None, version=False, client_version=False, key_encyption=False, meta_datas = True): + def __init__(self, database_name, api_url, password=None, enable_hashing:bool=False, verify=True, locking=False, client_id=None, cache=False, cache_counter=None, version=False, client_version=False, key_encyption=False, meta_datas = True, quiet=False, thread_number=1): import requests from requests.auth import HTTPBasicAuth + self.thread_number = thread_number + + self.quiet = quiet self.meta_datas = meta_datas @@ -109,7 +115,9 @@ def active_module(self, module, encryption_key="a", compress=None): classes = [obj for name, obj in inspect.getmembers(module) if inspect.isclass(obj)] - + + threads = [] + for element in functions + classes: name = element.__module__ +"." + element.__name__ first_element = name.split(".")[0] @@ -120,7 +128,18 @@ def active_module(self, module, encryption_key="a", compress=None): try: - self.set(name, element, encryption_key=encryption_key, compress=compress) + while len(threads) >= self.thread_number: + for each in threads: + if not each.is_alive(): + threads.remove(each) + time.sleep(0.1) + + print("Thread_submitted", len(threads)) + the_thread = threading.Thread(target=self.set, args=(name, element), kwargs={"encryption_key": encryption_key, "compress": compress}) + the_thread.start() + + thread = the_thread + threads.append(thread) except: import traceback traceback.print_exc() @@ -128,6 +147,10 @@ def active_module(self, module, encryption_key="a", compress=None): self.delete(name) + for each in threads: + each.join() + + def get_set_version_tag(self, client_id=None): the_key = "set_version_number" @@ -301,7 +324,7 @@ def lock_control(self, key): def lock_key(self, key): if self._lock_control(key, locking_operation=True): - self.console.log(f"[bold red] '{key}' is already locked") + self._log(f"[bold red] '{key}' is already locked") return False the_client_id = self.client_id @@ -309,7 +332,7 @@ def lock_key(self, key): the_client_id = "Unknown" if self.set(key+"_lock", the_client_id, locking_operation=True, encryption_key=None) == "Data set successfully": - self.console.log(f"[bold green] '{key}' is locked") + self._log(f"[bold green] '{key}' is locked") return True else: return False @@ -317,17 +340,17 @@ def lock_key(self, key): def unlock_key(self, key): result = self._lock_control(key, locking_operation=True) if not result: - self.console.log(f"[bold red] '{key}' is already unlocked") + self._log(f"[bold red] '{key}' is already unlocked") return False if self._lock_control(key): - self.console.log(f"[bold red] '{key}' is locked by another client") + self._log(f"[bold red] '{key}' is locked by another client") return False if self.delete(key+"_lock") == "Data deleted successfully": - self.console.log(f"[bold green] '{key}' is unlocked") + self._log(f"[bold green] '{key}' is unlocked") return True else: return False @@ -340,10 +363,11 @@ def _update_set(self, key, meta): + def set(self, key, value, encryption_key="a", compress=None, cache_policy=0, locking_operation=False, update_operation=False, version_tag=None, no_version=False): if not locking_operation: if self.lock_control(key): - self.console.log(f"[bold red] '{key}' is locked") + self._log(f"[bold red] '{key}' is locked") return None the_type = type(value).__name__ @@ -434,7 +458,7 @@ def get(self, key, encryption_key="a", no_cache=False, version_tag=None, no_vers if the_hash != self._cache_hash[key] and the_hash is not None: self._cache_hash[key] = the_hash self.cache_hash_save() - self.console.log("Cache is updated") + self._log("Cache is updated") try: self.cache_pop(key) except FileNotFoundError: diff --git a/upsonic/remote/interface.py b/upsonic/remote/interface.py index 876dba0..27d2026 100644 --- a/upsonic/remote/interface.py +++ b/upsonic/remote/interface.py @@ -47,7 +47,7 @@ def decrypt(key, message): -def Upsonic_Cloud_Free(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None): +def Upsonic_Cloud_Free(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None, quiet=False, thread_number=None): if database_name == None: database_name = os.environ.get("database_key") if access_key == None: @@ -84,14 +84,17 @@ def Upsonic_Cloud_Free(database_name=None, access_key=None, locking=None, client meta_datas = os.environ.get("meta_datas", "true").lower() == "true" + if thread_number == None: + thread_number = int(os.environ.get("thread_number", "1")) + from upsonic import Upsonic_Remote return Upsonic_Remote( - database_name, "https://cloud_1.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas + database_name, "https://cloud_1.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas, quiet=quiet, thread_number=thread_number ) # pragma: no cover -def Upsonic_Cloud_Pro(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None): +def Upsonic_Cloud_Pro(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None, quiet=False, thread_number=None): if database_name == None: database_name = os.environ.get("database_key") if access_key == None: @@ -124,15 +127,16 @@ def Upsonic_Cloud_Pro(database_name=None, access_key=None, locking=None, client_ if meta_datas == None: meta_datas = os.environ.get("meta_datas", "true").lower() == "true" - + if thread_number == None: + thread_number = int(os.environ.get("thread_number", "1")) from upsonic import Upsonic_Remote return Upsonic_Remote( - database_name, "https://cloud_2.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas + database_name, "https://cloud_2.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas, quiet=quiet, thread_number=thread_number ) # pragma: no cover -def Upsonic_Cloud_Premium(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None): +def Upsonic_Cloud_Premium(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None, quiet=False, thread_number=None): if database_name == None: database_name = os.environ.get("database_key") if access_key == None: @@ -165,14 +169,15 @@ def Upsonic_Cloud_Premium(database_name=None, access_key=None, locking=None, cli if meta_datas == None: meta_datas = os.environ.get("meta_datas", "true").lower() == "true" - + if thread_number == None: + thread_number = int(os.environ.get("thread_number", "1")) from upsonic import Upsonic_Remote return Upsonic_Remote( - database_name, "https://cloud_3.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas + database_name, "https://cloud_3.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas, quiet=quiet, thread_number=thread_number ) # pragma: no cover -def Upsonic_Cloud_Startup(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None): +def Upsonic_Cloud_Startup(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None, quiet=False, thread_number=None): if database_name == None: database_name = os.environ.get("database_key") if access_key == None: @@ -206,17 +211,18 @@ def Upsonic_Cloud_Startup(database_name=None, access_key=None, locking=None, cli if meta_datas == None: meta_datas = os.environ.get("meta_datas", "true").lower() == "true" - + if thread_number == None: + thread_number = int(os.environ.get("thread_number", "1")) from upsonic import Upsonic_Remote return Upsonic_Remote( - database_name, "https://cloud_4.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas + database_name, "https://cloud_4.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas, quiet=quiet, thread_number=thread_number ) # pragma: no cover -def Upsonic_Cloud_Readonly(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None): +def Upsonic_Cloud_Readonly(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None, quiet=False, thread_number=None): if database_name == None: database_name = os.environ.get("database_key") if access_key == None: @@ -250,11 +256,12 @@ def Upsonic_Cloud_Readonly(database_name=None, access_key=None, locking=None, cl if meta_datas == None: meta_datas = os.environ.get("meta_datas", "true").lower() == "true" - + if thread_number == None: + thread_number = int(os.environ.get("thread_number", "1")) from upsonic import Upsonic_Remote return Upsonic_Remote( - database_name, "https://cloud_0.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas + database_name, "https://cloud_0.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas, quiet=quiet, thread_number=thread_number ) # pragma: no cover