From 8edfc702a8927f343efbd06eb76f688d752ddcab Mon Sep 17 00:00:00 2001
From: freakeinstein
Date: Mon, 17 Jan 2022 14:25:51 +0530
Subject: [PATCH] Add disk persistence for database schemas

---
 src/index.py   |  6 +++++
 src/manager.py | 69 +++++++++++++++++++++++++++++++++++++++++---------
 2 files changed, 63 insertions(+), 12 deletions(-)

diff --git a/src/index.py b/src/index.py
index de4b6e6..d542c3a 100644
--- a/src/index.py
+++ b/src/index.py
@@ -137,6 +137,12 @@ async def compress_data ():
 @app.before_serving
 async def init_variables():
     app.manager = man_.Manager()
+    # prepare HUB from backup
+    try:
+        app.manager.prepare_hub()
+    except Exception as e:
+        logging.error("Backup restore failed")
+        logging.error(e)
     # initialize background task controller
     app.manager.bg_task_active = True
 
diff --git a/src/manager.py b/src/manager.py
index 547abd3..c225441 100644
--- a/src/manager.py
+++ b/src/manager.py
@@ -3,6 +3,9 @@
 logging.getLogger().setLevel(logging.DEBUG)
 
 import asyncio
+import os
+import json
+import copy
 
 from utils import CID, schema
 
@@ -39,19 +42,67 @@ def get_encoder_name (schema_in):
     return encoder_name
 
 class Manager ():
-    def __init__(self):
+    def __init__ (self):
         # to track all database - encoder mappings
         self.db_to_encoders_map = {}
         self.encoders_to_obj_map = {}
 
-    def __del__(self):
+    def __del__ (self):
         logging.debug("Killed manager")
 
-    def preload_model (self, json_schema):
+    def persist_db (self, json_schema, database_name, persist=True):
+        # nothing to do when persistence is disabled
+        if not persist:
+            logging.debug("Skipping schema persist to disk")
+            return database_name
+
+        # ensure the backup directory exists
+        location = os.environ["DATA_STORE_LOCATION"]+"hub_backup/"
+        try:
+            if not os.path.exists(location):
+                # create backup directory
+                os.mkdir(location)
+        except Exception as e:
+            logging.error("Backup directory creation failed")
+            logging.error(e)
+            return None
+
+        # store schema, replacing the old one
+        try:
+            with open(location+database_name, "w") as f_:
+                json.dump(json_schema, f_)
+        except Exception as e:
+            logging.error("Schema JSON writing failed")
+            logging.error(e)
+            return None
+
+        return database_name
+
+    # initial preparation of Aquila HUB from backup
+    def prepare_hub (self):
+        # load schema files
+        location = os.environ["DATA_STORE_LOCATION"]+"hub_backup/"
+        for database_name in os.listdir(location):
+            with open(location+database_name, "r") as schema__:
+                schema_ = json.load(schema__)
+                database_name_ = self.preload_model(schema_, persist=False)
+                if database_name_ is None:
+                    logging.debug("Model preloading failed for database: "+database_name)
+                elif database_name_ == database_name:
+                    logging.debug("Model preloaded for database: "+database_name)
+                else:
+                    logging.debug("Model mismatch for database "+database_name+": the schema backup was modified externally")
+
+    def preload_model (self, json_schema, persist=True):
         """ Download a model and load it into memory """
 
+        # create a schema copy
+        schema_copy = copy.deepcopy(json_schema)
         # parse schema
+        # NOTE: get_database_name modifies the "json_schema" argument in place,
+        # so beware of its subsequent use. If you need the original schema,
+        # use the "schema_copy" variable instead.
         database_name = get_database_name(json_schema)
         encoder_name = get_encoder_name(json_schema)
 
@@ -65,16 +116,16 @@ def preload_model (self, json_schema):
                     self.encoders_to_obj_map[encoder_name] = enc_.Encoder(encoder_name, asyncio.Queue())
                     # encoder already loaded?
                     if self.encoders_to_obj_map[encoder_name].preload_model(json_schema, database_name):
-                        return database_name
+                        return self.persist_db(schema_copy, database_name, persist)
                     else:
                         # reset all, don't create DB & encoder
                         self.encoders_to_obj_map[encoder_name] = None
                         self.db_to_encoders_map[database_name] = None
                         return None
                 else:
-                    return database_name
+                    return self.persist_db(schema_copy, database_name, persist)
             else:
-                return database_name
+                return self.persist_db(schema_copy, database_name, persist)
         else:
             return None
 
@@ -104,10 +155,6 @@ async def compress_data (self, database_name, texts):
         else:
             return None
 
-    # write to disk
-    def write_to_disk (self):
-        pass
-
     # define background task to process request queue for each database object
     async def background_task(self):
        logging.debug("===== Background task INIT =====")
@@ -119,7 +166,5 @@ async def background_task(self):
                 continue
             # process request
             await self.encoders_to_obj_map[key_].process_queue()
-            # Write to Disk
             # TODO: write to disk all metadata for each database object serially
-            self.write_to_disk()
             await asyncio.sleep(SLEEP_PROTECTION)
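
Review note: prepare_hub() assumes the backup directory already exists. On a
fresh install, os.listdir() raises FileNotFoundError, which init_variables()
only catches and logs as "Backup restore failed". A minimal sketch of a
guarded variant follows; the early-return check is a suggestion, not part of
the patch above, and everything else mirrors the patched code:

    # initial preparation of Aquila HUB from backup
    def prepare_hub (self):
        location = os.environ["DATA_STORE_LOCATION"]+"hub_backup/"
        # a fresh install has no backup directory yet, so there is
        # nothing to restore and nothing to report as an error
        if not os.path.isdir(location):
            logging.debug("No schema backup found, skipping restore")
            return
        # load schema files and preload one model per backed-up database
        for database_name in os.listdir(location):
            with open(location+database_name, "r") as schema__:
                schema_ = json.load(schema__)
            database_name_ = self.preload_model(schema_, persist=False)
            if database_name_ is None:
                logging.debug("Model preloading failed for database: "+database_name)
            elif database_name_ == database_name:
                logging.debug("Model preloaded for database: "+database_name)
            else:
                logging.debug("Model mismatch for database "+database_name)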
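
Review note: persist_db() overwrites the schema file in place, so a crash in
the middle of json.dump() can leave a truncated file that prepare_hub() will
fail to parse on the next start. A hedged sketch of an atomic replacement for
the write block, under the same variable names as the patch (the ".tmp"
suffix is illustrative):

    # store schema, replacing the old one atomically
    tmp_path = location+database_name+".tmp"
    try:
        # write the full document to a temporary file first
        with open(tmp_path, "w") as f_:
            json.dump(json_schema, f_)
        # os.replace() swaps the file in a single atomic step on POSIX,
        # so a reader never observes a partially written schema
        os.replace(tmp_path, location+database_name)
    except Exception as e:
        logging.error("Schema JSON writing failed")
        logging.error(e)
        return None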