Skip to content

Commit

Permalink
Added thread based submission for active_module and added quiet option
Browse files Browse the repository at this point in the history
  • Loading branch information
onuratakan committed Dec 27, 2023
1 parent c6a28d3 commit 942300c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 26 deletions.
48 changes: 36 additions & 12 deletions upsonic/remote/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,36 @@

from hashlib import sha256

import time


import pickle
import os

import copy

import inspect
import threading
import time


class Upsonic_Remote:
def _log(self, message):
self.console.log(message)
if not self.quiet:
self.console.log(message)

def __enter__(self):
return self # pragma: no cover

def __exit__(self, exc_type, exc_val, exc_tb):
pass # pragma: no cover

def __init__(self, database_name, api_url, password=None, enable_hashing:bool=False, verify=True, locking=False, client_id=None, cache=False, cache_counter=None, version=False, client_version=False, key_encyption=False, meta_datas = True):
def __init__(self, database_name, api_url, password=None, enable_hashing:bool=False, verify=True, locking=False, client_id=None, cache=False, cache_counter=None, version=False, client_version=False, key_encyption=False, meta_datas = True, quiet=False, thread_number=1):
import requests
from requests.auth import HTTPBasicAuth

self.thread_number = thread_number

self.quiet = quiet

self.meta_datas = meta_datas

Expand Down Expand Up @@ -109,7 +115,9 @@ def active_module(self, module, encryption_key="a", compress=None):

classes = [obj for name, obj in inspect.getmembers(module)
if inspect.isclass(obj)]


threads = []

for element in functions + classes:
name = element.__module__ +"." + element.__name__
first_element = name.split(".")[0]
Expand All @@ -120,14 +128,29 @@ def active_module(self, module, encryption_key="a", compress=None):


try:
self.set(name, element, encryption_key=encryption_key, compress=compress)
while len(threads) >= self.thread_number:
for each in threads:
if not each.is_alive():
threads.remove(each)
time.sleep(0.1)

print("Thread_submitted", len(threads))
the_thread = threading.Thread(target=self.set, args=(name, element), kwargs={"encryption_key": encryption_key, "compress": compress})
the_thread.start()

thread = the_thread
threads.append(thread)
except:
import traceback
traceback.print_exc()
self._log(f"[bold red]Error on '{name}'")
self.delete(name)


for each in threads:
each.join()



def get_set_version_tag(self, client_id=None):
the_key = "set_version_number"
Expand Down Expand Up @@ -301,33 +324,33 @@ def lock_control(self, key):

def lock_key(self, key):
if self._lock_control(key, locking_operation=True):
self.console.log(f"[bold red] '{key}' is already locked")
self._log(f"[bold red] '{key}' is already locked")
return False

the_client_id = self.client_id
if the_client_id is None:
the_client_id = "Unknown"

if self.set(key+"_lock", the_client_id, locking_operation=True, encryption_key=None) == "Data set successfully":
self.console.log(f"[bold green] '{key}' is locked")
self._log(f"[bold green] '{key}' is locked")
return True
else:
return False

def unlock_key(self, key):
result = self._lock_control(key, locking_operation=True)
if not result:
self.console.log(f"[bold red] '{key}' is already unlocked")
self._log(f"[bold red] '{key}' is already unlocked")
return False

if self._lock_control(key):
self.console.log(f"[bold red] '{key}' is locked by another client")
self._log(f"[bold red] '{key}' is locked by another client")
return False



if self.delete(key+"_lock") == "Data deleted successfully":
self.console.log(f"[bold green] '{key}' is unlocked")
self._log(f"[bold green] '{key}' is unlocked")
return True
else:
return False
Expand All @@ -340,10 +363,11 @@ def _update_set(self, key, meta):




def set(self, key, value, encryption_key="a", compress=None, cache_policy=0, locking_operation=False, update_operation=False, version_tag=None, no_version=False):
if not locking_operation:
if self.lock_control(key):
self.console.log(f"[bold red] '{key}' is locked")
self._log(f"[bold red] '{key}' is locked")
return None

the_type = type(value).__name__
Expand Down Expand Up @@ -434,7 +458,7 @@ def get(self, key, encryption_key="a", no_cache=False, version_tag=None, no_vers
if the_hash != self._cache_hash[key] and the_hash is not None:
self._cache_hash[key] = the_hash
self.cache_hash_save()
self.console.log("Cache is updated")
self._log("Cache is updated")
try:
self.cache_pop(key)
except FileNotFoundError:
Expand Down
35 changes: 21 additions & 14 deletions upsonic/remote/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def decrypt(key, message):



def Upsonic_Cloud_Free(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None):
def Upsonic_Cloud_Free(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None, quiet=False, thread_number=None):
if database_name == None:
database_name = os.environ.get("database_key")
if access_key == None:
Expand Down Expand Up @@ -84,14 +84,17 @@ def Upsonic_Cloud_Free(database_name=None, access_key=None, locking=None, client
meta_datas = os.environ.get("meta_datas", "true").lower() == "true"


if thread_number == None:
thread_number = int(os.environ.get("thread_number", "1"))


from upsonic import Upsonic_Remote
return Upsonic_Remote(
database_name, "https://cloud_1.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas
database_name, "https://cloud_1.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas, quiet=quiet, thread_number=thread_number
) # pragma: no cover


def Upsonic_Cloud_Pro(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None):
def Upsonic_Cloud_Pro(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None, quiet=False, thread_number=None):
if database_name == None:
database_name = os.environ.get("database_key")
if access_key == None:
Expand Down Expand Up @@ -124,15 +127,16 @@ def Upsonic_Cloud_Pro(database_name=None, access_key=None, locking=None, client_
if meta_datas == None:
meta_datas = os.environ.get("meta_datas", "true").lower() == "true"


if thread_number == None:
thread_number = int(os.environ.get("thread_number", "1"))

from upsonic import Upsonic_Remote
return Upsonic_Remote(
database_name, "https://cloud_2.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas
database_name, "https://cloud_2.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas, quiet=quiet, thread_number=thread_number
) # pragma: no cover


def Upsonic_Cloud_Premium(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None):
def Upsonic_Cloud_Premium(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None, quiet=False, thread_number=None):
if database_name == None:
database_name = os.environ.get("database_key")
if access_key == None:
Expand Down Expand Up @@ -165,14 +169,15 @@ def Upsonic_Cloud_Premium(database_name=None, access_key=None, locking=None, cli
if meta_datas == None:
meta_datas = os.environ.get("meta_datas", "true").lower() == "true"


if thread_number == None:
thread_number = int(os.environ.get("thread_number", "1"))

from upsonic import Upsonic_Remote
return Upsonic_Remote(
database_name, "https://cloud_3.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas
database_name, "https://cloud_3.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas, quiet=quiet, thread_number=thread_number
) # pragma: no cover

def Upsonic_Cloud_Startup(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None):
def Upsonic_Cloud_Startup(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None, quiet=False, thread_number=None):
if database_name == None:
database_name = os.environ.get("database_key")
if access_key == None:
Expand Down Expand Up @@ -206,17 +211,18 @@ def Upsonic_Cloud_Startup(database_name=None, access_key=None, locking=None, cli
if meta_datas == None:
meta_datas = os.environ.get("meta_datas", "true").lower() == "true"


if thread_number == None:
thread_number = int(os.environ.get("thread_number", "1"))

from upsonic import Upsonic_Remote
return Upsonic_Remote(
database_name, "https://cloud_4.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas
database_name, "https://cloud_4.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas, quiet=quiet, thread_number=thread_number
) # pragma: no cover




def Upsonic_Cloud_Readonly(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None):
def Upsonic_Cloud_Readonly(database_name=None, access_key=None, locking=None, client_id=None, cache=None, cache_counter=5, version=None, client_version=None, key_encyption=None, meta_datas=None, quiet=False, thread_number=None):
if database_name == None:
database_name = os.environ.get("database_key")
if access_key == None:
Expand Down Expand Up @@ -250,11 +256,12 @@ def Upsonic_Cloud_Readonly(database_name=None, access_key=None, locking=None, cl
if meta_datas == None:
meta_datas = os.environ.get("meta_datas", "true").lower() == "true"


if thread_number == None:
thread_number = int(os.environ.get("thread_number", "1"))

from upsonic import Upsonic_Remote
return Upsonic_Remote(
database_name, "https://cloud_0.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas
database_name, "https://cloud_0.upsonic.co", access_key, verify=True, locking=locking, client_id=client_id, cache=cache, cache_counter=cache_counter, version=version, client_version=client_version, key_encyption=key_encyption, meta_datas=meta_datas, quiet=quiet, thread_number=thread_number
) # pragma: no cover


Expand Down

0 comments on commit 942300c

Please sign in to comment.