Commit

added disk persist
freakeinstein committed Jan 17, 2022
1 parent ed9563d commit 8edfc70
Showing 2 changed files with 63 additions and 12 deletions.
6 changes: 6 additions & 0 deletions src/index.py
@@ -137,6 +137,12 @@ async def compress_data ():
@app.before_serving
async def init_variables():
    app.manager = man_.Manager()
    # prepare HUB from backup
    try:
        app.manager.prepare_hub()
    except Exception as e:
        logging.error("Backup restore failed")
        logging.error(e)
    # initialize background task controller
    app.manager.bg_task_active = True

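The try/except above makes the restore fail-soft: a corrupt or unreadable backup is logged but never blocks startup. A minimal sketch of the same pattern in isolation (the helper name is hypothetical; `manager` stands for any object whose prepare_hub() may raise):

    import logging

    def restore_from_backup(manager):
        # replay persisted schemas if possible; tolerate failure so the
        # service still boots, just with an empty hub
        try:
            manager.prepare_hub()
        except Exception as e:
            logging.error("Backup restore failed")
            logging.error(e)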
69 changes: 57 additions & 12 deletions src/manager.py
@@ -3,6 +3,9 @@
logging.getLogger().setLevel(logging.DEBUG)

import asyncio
import os
import json
import copy

from utils import CID, schema

@@ -39,19 +42,67 @@ def get_encoder_name (schema_in):
    return encoder_name

class Manager ():
    def __init__ (self):
        # to track all database - encoder mappings
        self.db_to_encoders_map = {}
        self.encoders_to_obj_map = {}

    def __del__ (self):
        logging.debug("Killed manager")

    def persist_db (self, json_schema, database_name, persist=True):
        # persistence disabled: nothing to do
        if not persist:
            logging.debug("Skipping schema persist to disk")
            return database_name

        # backup directory lives under the data store location
        location = os.environ["DATA_STORE_LOCATION"]+"hub_backup/"
        try:
            if not os.path.exists(location):
                # create backup directory
                os.mkdir(location)
        except Exception as e:
            logging.error("Backup directory creation failed")
            logging.error(e)
            return None

        # store schema, replacing the old one
        try:
            with open(location+database_name, "w") as f_:
                json.dump(json_schema, f_)
        except Exception as e:
            logging.error("Schema JSON writing failed")
            logging.error(e)
            return None

        return database_name
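    # on-disk contract of persist_db: one JSON file per database at
    # $DATA_STORE_LOCATION/hub_backup/<database_name>, overwritten on every
    # successful preload. The paths above are built by string concatenation,
    # so DATA_STORE_LOCATION is assumed to end with a path separator.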

    # initial preparation of Aquila HUB from backup
    def prepare_hub (self):
        # load schema files
        location = os.environ["DATA_STORE_LOCATION"]+"hub_backup/"
        # nothing to restore yet (e.g. first boot)
        if not os.path.exists(location):
            return
        for database_name in os.listdir(location):
            with open(location+database_name, "r") as schema__:
                schema_ = json.load(schema__)
            database_name_ = self.preload_model(schema_, persist=False)
            if database_name_ is None:
                logging.debug("Model preloading failed for database: "+database_name)
            elif database_name_ == database_name:
                logging.debug("Model preloaded for database: "+database_name)
            else:
                logging.debug("Model mismatch for database "+database_name+"; the schema backup was modified. Hope you know what you're doing.")

    def preload_model (self, json_schema, persist=True):
        """
        Download a model and load it into memory
        """
        # create a schema copy
        schema_copy = copy.deepcopy(json_schema)
        # parse schema
        # NOTE: get_database_name modifies the "json_schema" argument,
        # so beware of its subsequent use. If you need the original schema,
        # use the "schema_copy" variable instead.
        database_name = get_database_name(json_schema)
        encoder_name = get_encoder_name(json_schema)

@@ -65,16 +116,16 @@ def preload_model (self, json_schema):
                    self.encoders_to_obj_map[encoder_name] = enc_.Encoder(encoder_name, asyncio.Queue())
                    # encoder already loaded?
                    if self.encoders_to_obj_map[encoder_name].preload_model(json_schema, database_name):
                        return self.persist_db(schema_copy, database_name, persist)
                    else:
                        # reset all, don't create DB & encoder
                        self.encoders_to_obj_map[encoder_name] = None
                        self.db_to_encoders_map[database_name] = None
                        return None
                else:
                    return self.persist_db(schema_copy, database_name, persist)
            else:
                return self.persist_db(schema_copy, database_name, persist)
        else:
            return None
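    # all three success paths above now route through persist_db, so a schema
    # only reaches hub_backup/ after its model has successfully (pre)loaded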

@@ -104,10 +155,6 @@ async def compress_data (self, database_name, texts):
        else:
            return None

    # define background task to process request queue for each database object
    async def background_task(self):
        logging.debug("===== Background task INIT =====")
@@ -119,7 +166,5 @@ async def background_task(self):
                    continue
                # process request
                await self.encoders_to_obj_map[key_].process_queue()

            await asyncio.sleep(SLEEP_PROTECTION)

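Taken together, persist and restore form a round trip: preload_model writes the schema via persist_db, and prepare_hub replays those files on the next boot. A sketch of the write half in isolation (hedged: it assumes the repo's src/ directory is on the import path, and the schema dict is a placeholder rather than a real Aquila Hub schema):

    import json, os, tempfile

    # point the data store at a scratch directory (trailing separator matters,
    # since the code concatenates paths as strings)
    os.environ["DATA_STORE_LOCATION"] = tempfile.mkdtemp() + os.sep

    from manager import Manager  # src/manager.py

    m = Manager()
    schema = {"example": "placeholder schema"}
    assert m.persist_db(schema, "demo_db") == "demo_db"

    # one JSON file per database, named after the database
    with open(os.environ["DATA_STORE_LOCATION"] + "hub_backup/demo_db") as f_:
        assert json.load(f_) == schema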