diff --git a/qiita_db/meta_util.py b/qiita_db/meta_util.py index 7c48b8493..c0df0fe89 100644 --- a/qiita_db/meta_util.py +++ b/qiita_db/meta_util.py @@ -36,6 +36,8 @@ from hashlib import md5 from re import sub from json import loads, dump, dumps +import signal +import traceback from qiita_db.util import create_nested_path, retrieve_resource_data from qiita_db.util import resource_allocation_plot @@ -555,23 +557,8 @@ def generate_plugin_releases(): f(redis_key, v) -def get_software_commands(active): - software_list = [s for s in qdb.software.Software.iter(active=active)] - software_commands = defaultdict(lambda: defaultdict(list)) - - for software in software_list: - sname = software.name - sversion = software.version - commands = software.commands - - for command in commands: - software_commands[sname][sversion].append(command.name) - software_commands[sname] = dict(software_commands[sname]) - - return dict(software_commands) - - -def update_resource_allocation_redis(active=True): +def update_resource_allocation_redis(active=True, verbose=False, + time_limit=300): """Updates redis with plots and information about current software. Parameters @@ -579,72 +566,176 @@ def update_resource_allocation_redis(active=True): active: boolean, optional Defaults to True. Should only be False when testing. + verbose: boolean, optional + Defaults to False. Prints status on what function is running. + + time_limit: integer, optional + Defaults to 300, representing 5 minutes. This is the limit for how long + resource_allocation_plot function will run. + """ time = datetime.now().strftime('%m-%d-%y') - scommands = get_software_commands(active) - redis_key = 'resources:commands' - r_client.set(redis_key, str(scommands)) + # Retreave available col_name for commands + with qdb.sql_connection.TRN: + sql = 'SELECT col_name FROM qiita.resource_allocation_column_names;' + qdb.sql_connection.TRN.add(sql) + col_names = qdb.sql_connection.TRN.execute_fetchflatten() + + # Retreave available software + software_list = list(qdb.software.Software.iter(active=active)) + scommands = {} + for software in software_list: + sname = software.name + sversion = software.version + + if sname not in scommands: + scommands[sname] = {} + + if sversion not in scommands[sname]: + scommands[sname][sversion] = {} + + for command in software.commands: + cmd_name = command.name + scommands[sname][sversion][cmd_name] = col_names + + # software commands for which resource allocations were sucessfully + # calculated + scommands_allocation = {} for sname, versions in scommands.items(): for version, commands in versions.items(): - for cname in commands: - col_name = "samples * columns" + for cname, col_names in commands.items(): df = retrieve_resource_data(cname, sname, version, COLUMNS) + if verbose: + print(("\nRetrieving allocation resources for:\n" + + f" software: {sname}\n" + + f" version: {version}\n" + + f" command: {cname}")) if len(df) == 0: + if verbose: + print(("\nNo allocation resources available for" + + f" software: {sname}" + + f" version: {version}" + + f" command: {cname}\n")) continue + # column_name_str looks like col1*col2*col3, etc + for col_name in col_names: + new_column = None + col_name_split = col_name.split('*') + df_copy = df.dropna(subset=col_name_split) + + # Create a column with the desired columns + for curr_column in col_name_split: + if new_column is None: + new_column = df_copy[curr_column] + else: + new_column *= df_copy[curr_column] + if verbose: + print( + ("\nBuilding resource allocation plot for:\n" + + f" software: {sname}\n" + + f" version: {version}\n" + + f" command: {cname}\n" + + f" column name: {col_name}\n" + + f" {datetime.now().strftime('%b %d %H:%M:%S')}")) + + def timeout_handler(signum, frame): + raise TimeoutError(( + "\nresource_allocation_plot " + + "execution exceeded time limit." + + "For:\n" + f" software: {sname}\n" + + f" version: {version}\n" + + f" command: {cname}\n" + + f" column name: {col_name}\n" + + f" {datetime.now().strftime('%b %d %H:%M:%S')}")) + + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(time_limit) + try: + fig, axs = resource_allocation_plot(df_copy, + col_name, + new_column, + verbose=verbose) + signal.alarm(0) + except TimeoutError: + print("Timeout reached!") + traceback.print_exc() + continue + + titles = [0, 0] + images = [0, 0] + + # Splitting 1 image plot into 2 separate for better layout. + for i, ax in enumerate(axs): + titles[i] = ax.get_title() + ax.set_title("") + # new_fig, new_ax – copy with either only memory plot + # or only time + new_fig = plt.figure() + new_ax = new_fig.add_subplot(111) + line = ax.lines[0] + new_ax.plot(line.get_xdata(), line.get_ydata(), + linewidth=1, color='orange') + handles, labels = ax.get_legend_handles_labels() + for handle, label, scatter_data in zip( + handles, + labels, + ax.collections): + color = handle.get_facecolor() + new_ax.scatter(scatter_data.get_offsets()[:, 0], + scatter_data.get_offsets()[:, 1], + s=scatter_data.get_sizes(), + label=label, + color=color) + + new_ax.set_xscale('log') + new_ax.set_yscale('log') + new_ax.set_xlabel(ax.get_xlabel()) + new_ax.set_ylabel(ax.get_ylabel()) + new_ax.legend(loc='upper left') + + new_fig.tight_layout() + plot = BytesIO() + new_fig.savefig(plot, format='png') + plot.seek(0) + img = 'data:image/png;base64,' + quote( + b64encode(plot.getvalue()).decode('ascii')) + images[i] = img + plt.close(new_fig) + plt.close(fig) + + # SID, CID, col_name + values = [ + ("img_mem", images[0], r_client.set), + ("img_time", images[1], r_client.set), + ('time', time, r_client.set), + ("title_mem", titles[0], r_client.set), + ("title_time", titles[1], r_client.set) + ] + if verbose: + print( + ("Saving resource allocation image for\n" + + f" software: {sname}\n" + + f" version: {version}\n" + + f" command: {cname}\n" + + f" column name: {col_name}\n" + + f" {datetime.now().strftime('%b %d %H:%M:%S')}")) + + for k, v, f in values: + redis_key = 'resources$#%s$#%s$#%s$#%s:%s' % ( + cname, sname, version, col_name, k) + r_client.delete(redis_key) + f(redis_key, v) + + if sname not in scommands_allocation: + scommands_allocation[sname] = {} + if version not in scommands_allocation[sname]: + scommands_allocation[sname][version] = {} + if cname not in scommands_allocation[sname][version]: + scommands_allocation[sname][version][cname] = [] + scommands_allocation[sname][version][cname].append( + col_name) - fig, axs = resource_allocation_plot(df, col_name) - titles = [0, 0] - images = [0, 0] - - # Splitting 1 image plot into 2 separate for better layout. - for i, ax in enumerate(axs): - titles[i] = ax.get_title() - ax.set_title("") - # new_fig, new_ax – copy with either only memory plot or - # only time - new_fig = plt.figure() - new_ax = new_fig.add_subplot(111) - line = ax.lines[0] - new_ax.plot(line.get_xdata(), line.get_ydata(), - linewidth=1, color='orange') - handles, labels = ax.get_legend_handles_labels() - for handle, label, scatter_data in zip(handles, - labels, - ax.collections): - color = handle.get_facecolor() - new_ax.scatter(scatter_data.get_offsets()[:, 0], - scatter_data.get_offsets()[:, 1], - s=scatter_data.get_sizes(), label=label, - color=color) - - new_ax.set_xscale('log') - new_ax.set_yscale('log') - new_ax.set_xlabel(ax.get_xlabel()) - new_ax.set_ylabel(ax.get_ylabel()) - new_ax.legend(loc='upper left') - - new_fig.tight_layout() - plot = BytesIO() - new_fig.savefig(plot, format='png') - plot.seek(0) - img = 'data:image/png;base64,' + quote( - b64encode(plot.getvalue()).decode('ascii')) - images[i] = img - plt.close(new_fig) - plt.close(fig) - - # SID, CID, col_name - values = [ - ("img_mem", images[0], r_client.set), - ("img_time", images[1], r_client.set), - ('time', time, r_client.set), - ("title_mem", titles[0], r_client.set), - ("title_time", titles[1], r_client.set) - ] - - for k, v, f in values: - redis_key = 'resources$#%s$#%s$#%s$#%s:%s' % ( - cname, sname, version, col_name, k) - r_client.delete(redis_key) - f(redis_key, v) + redis_key = 'resources:commands' + r_client.set(redis_key, str(scommands_allocation)) diff --git a/qiita_db/support_files/patches/94.sql b/qiita_db/support_files/patches/94.sql index 3b565278b..1d063e835 100644 --- a/qiita_db/support_files/patches/94.sql +++ b/qiita_db/support_files/patches/94.sql @@ -1,7 +1,27 @@ -- Jan 13, 2025 -- Adding a table for formulas for resource allocations -CREATE TABLE qiita.allocation_equations ( - equation_id SERIAL PRIMARY KEY, - equation_name TEXT NOT NULL, - expression TEXT NOT NULL - ); \ No newline at end of file +CREATE TABLE qiita.resource_allocation_equations ( + equation_id SERIAL PRIMARY KEY, + equation_name TEXT NOT NULL, + expression TEXT NOT NULL + ); + +INSERT INTO qiita.resource_allocation_equations(equation_name, expression) VALUES + ('mem_model1', '(k * (np.log(x))) + (x * a) + b'), + ('mem_model2', '(k * (np.log(x))) + (b * ((np.log(x))**2)) + a'), + ('mem_model3', '(k * (np.log(x))) + (b * ((np.log(x))**2)) + (a * ((np.np.log(x))**3))'), + ('mem_model4', '(k * (np.log(x))) + (b * ((np.log(x))**2)) + (a * ((np.log(x))**2.5))'), + ('time_model1', 'a + b + ((np.log(x)) * k)'), + ('time_model2', 'a + (b * x) + ((np.log(x)) * k)'), + ('time_model3', 'a + (b * ((np.log(x))**2)) + ((np.log(x)) * k)'), + ('time_model4', '(a * ((np.log(x))**3)) + (b * ((np.log(x))**2)) + ((np.log(x)) * k)'); + +CREATE TABLE qiita.resource_allocation_column_names ( + col_name_id SERIAL PRIMARY KEY, + col_name TEXT NOT NULL + ); + +INSERT INTO qiita.resource_allocation_column_names(col_name) VALUES + ('samples'), ('columns'), ('input_size'), + ('samples*columns'), ('samples*input_size'), + ('columns*input_size'), ('samples*columns*input_size'); diff --git a/qiita_db/support_files/patches/test_db_sql/94.sql b/qiita_db/support_files/patches/test_db_sql/94.sql deleted file mode 100644 index 41ec7d8a7..000000000 --- a/qiita_db/support_files/patches/test_db_sql/94.sql +++ /dev/null @@ -1,10 +0,0 @@ -INSERT INTO qiita.allocation_equations(equation_name, expression) - VALUES - ('mem_model1', '(k * (np.log(x))) + (x * a) + b'), -('mem_model2', '(k * (np.log(x))) + (b * ((np.log(x))**2)) + a'), -('mem_model3', '(k * (np.log(x))) + (b * ((np.log(x))**2)) + (a * ((np.np.log(x))**3))'), -('mem_model4', '(k * (np.log(x))) + (b * ((np.log(x))**2)) + (a * ((np.log(x))**2.5))'), -('time_model1', 'a + b + ((np.log(x)) * k)'), -('time_model2', 'a + (b * x) + ((np.log(x)) * k)'), -('time_model3', 'a + (b * ((np.log(x))**2)) + ((np.log(x)) * k)'), -('time_model4', '(a * ((np.log(x))**3)) + (b * ((np.log(x))**2)) + ((np.log(x)) * k)'); diff --git a/qiita_db/test/test_meta_util.py b/qiita_db/test/test_meta_util.py index fdf55d101..7503d3c4b 100644 --- a/qiita_db/test/test_meta_util.py +++ b/qiita_db/test/test_meta_util.py @@ -522,7 +522,7 @@ def test_generate_plugin_releases(self): def test_update_resource_allocation_redis(self): cname = "Split libraries FASTQ" sname = "QIIMEq2" - col_name = "samples * columns" + col_name = "samples*columns" version = "1.9.1" qdb.meta_util.update_resource_allocation_redis(False) title_mem_str = 'resources$#%s$#%s$#%s$#%s:%s' % ( diff --git a/qiita_db/test/test_user.py b/qiita_db/test/test_user.py index 0d14d2c8a..20a90f3e9 100644 --- a/qiita_db/test/test_user.py +++ b/qiita_db/test/test_user.py @@ -482,7 +482,7 @@ def test_mark_messages(self): user.mark_messages([1, 2]) obs = user.messages() exp = [True, True, False] - self.assertEqual([x[3] for x in obs], exp) + self.assertCountEqual([x[3] for x in obs], exp) user.mark_messages([1], read=False) obs = user.messages() diff --git a/qiita_db/test/test_util.py b/qiita_db/test/test_util.py index e59be819f..0f330c567 100644 --- a/qiita_db/test/test_util.py +++ b/qiita_db/test/test_util.py @@ -1316,7 +1316,7 @@ def setUp(self): self.cname = "Split libraries FASTQ" self.sname = "QIIMEq2" self.version = "1.9.1" - self.col_name = 'samples * columns' + self.col_name = 'samples*columns' self.columns = [ "sName", "sVersion", "cID", "cName", "processing_job_id", "parameters", "samples", "columns", "input_size", "extra_info", @@ -1327,9 +1327,13 @@ def setUp(self): self.df = qdb.util.retrieve_resource_data( self.cname, self.sname, self.version, self.columns) + self.df.dropna(subset=['samples', 'columns'], inplace=True) + self.df[self.col_name] = self.df.samples * self.df['columns'] + def test_plot_return(self): # check the plot returns correct objects - fig1, axs1 = qdb.util.resource_allocation_plot(self.df, self.col_name) + fig1, axs1 = qdb.util.resource_allocation_plot(self.df, self.col_name, + self.df[self.col_name]) self.assertIsInstance( fig1, Figure, "Returned object fig1 is not a Matplotlib Figure") @@ -1339,13 +1343,10 @@ def test_plot_return(self): "Returned object axs1 is not a single Matplotlib Axes object") def test_minimize_const(self): - self.df = self.df[ - (self.df.cName == self.cname) & (self.df.sName == self.sname)] - self.df.dropna(subset=['samples', 'columns'], inplace=True) - self.df[self.col_name] = self.df.samples * self.df['columns'] + fig, axs = plt.subplots(ncols=2, figsize=(10, 4), sharey=False) - mem_models, time_models = qdb.util.retrieve_equations() + mem_models, time_models = qdb.util._retrieve_equations() bm_name, bm, options = qdb.util._resource_allocation_plot_helper( self.df, axs[0], 'MaxRSSRaw', mem_models, self.col_name) # check that the algorithm chooses correct model for MaxRSSRaw and @@ -1420,7 +1421,6 @@ def test_db_update(self): '8a7a8461-e8a1-4b4e-a428-1bc2f4d3ebd0' ] } - qdb.util.update_resource_allocation_table(test=test_data) for curr_cname, ids in types.items(): diff --git a/qiita_db/util.py b/qiita_db/util.py index 33fd63da3..616388277 100644 --- a/qiita_db/util.py +++ b/qiita_db/util.py @@ -2326,46 +2326,51 @@ def send_email(to, subject, body): smtp.close() -def resource_allocation_plot(df, col_name): +def resource_allocation_plot(df, col_name_str, curr_column, verbose=False): """Builds resource allocation plot for given filename and jobs Parameters ---------- - file : str, required - Builds plot for the specified file name. Usually provided as tsv.gz - col_name: str, required - Specifies x axis for the graph - + df : pd.Dataframe, required + Builds plot for the specified dataframe. + col_name_str: str, required + Column name for the x axis that will be used to build the plots. + curr_column: pd.Series, requirew + Pandas Series representing a column with col_name_str. Returns ---------- matplotlib.pyplot object Returns a matplotlib object with a plot """ - df.dropna(subset=['samples', 'columns'], inplace=True) - df[col_name] = df.samples * df['columns'] - df[col_name] = df[col_name].astype(int) - fig, axs = plt.subplots(ncols=2, figsize=(10, 4), sharey=False) - ax = axs[0] - mem_models, time_models = retrieve_equations() + + mem_models, time_models = _retrieve_equations() + df[col_name_str] = curr_column # models for memory + if verbose: + print("\tCalculating best model for memory") _resource_allocation_plot_helper( - df, ax, "MaxRSSRaw", mem_models, col_name) + df, ax, "MaxRSSRaw", mem_models, col_name_str, verbose=verbose) ax = axs[1] + # models for time + if verbose: + print("\tCalculating best model for time") _resource_allocation_plot_helper( - df, ax, "ElapsedRaw", time_models, col_name) - + df, ax, "ElapsedRaw", time_models, col_name_str, verbose=verbose) return fig, axs -def retrieve_equations(): +def _retrieve_equations(): ''' Helper function for resource_allocation_plot. Retrieves equations from db. Creates dictionary for memory and time models. + This function is needed because it utilizes np as a part of eval() below. + In test_util.py we need to retrieve equations without importing np to + comply with PEP8 styling standard. Returns ------- @@ -2379,19 +2384,20 @@ def retrieve_equations(): time_models = {} res = [] with qdb.sql_connection.TRN: - sql = ''' SELECT * FROM qiita.allocation_equations; ''' + sql = '''SELECT equation_name, expression + FROM qiita.resource_allocation_equations;''' qdb.sql_connection.TRN.add(sql) res = qdb.sql_connection.TRN.execute_fetchindex() for models in res: - if 'mem' in models[1]: - memory_models[models[1]] = { - "equation_name": models[2], - "equation": lambda x, k, a, b: eval(models[2]) + if 'mem' in models[0]: + memory_models[models[0]] = { + "equation_name": models[1], + "equation": lambda x, k, a, b: eval(models[1]) } else: - time_models[models[1]] = { - "equation_name": models[2], - "equation": lambda x, k, a, b: eval(models[2]) + time_models[models[0]] = { + "equation_name": models[1], + "equation": lambda x, k, a, b: eval(models[1]) } return (memory_models, time_models) @@ -2452,7 +2458,7 @@ def retrieve_resource_data(cname, sname, version, columns): def _resource_allocation_plot_helper( - df, ax, curr, models, col_name): + df, ax, curr, models, col_name, verbose=False): """Helper function for resource allocation plot. Builds plot for MaxRSSRaw and ElapsedRaw @@ -2517,9 +2523,14 @@ def _resource_allocation_plot_helper( ax.set_ylabel(curr) ax.set_xlabel(col_name) + if verbose: + print(f"\t\tFitting best model for {curr}; column {col_name}") # 50 - number of maximum iterations, 3 - number of failures we tolerate best_model_name, best_model, options = _resource_allocation_calculate( - df, x_data, y_data, models, curr, col_name, 50, 3) + df, x_data, y_data, models, curr, col_name, 50, 3, verbose) + if verbose: + print( + f"\t\tSuccessfully chose best model for {curr}; column {col_name}") k, a, b = options.x x_plot = np.array(sorted(df[col_name].unique())) y_plot = best_model(x_plot, k, a, b) @@ -2542,11 +2553,14 @@ def _resource_allocation_plot_helper( success_df, failures_df = _resource_allocation_success_failures( df, k, a, b, best_model, col_name, curr) failures = failures_df.shape[0] - ax.scatter(failures_df[col_name], failures_df[curr], color='red', s=3, - label="failures") - success_df['node_name'] = success_df['node_name'].fillna('unknown') + if failures != 0: + ax.scatter(failures_df[col_name], failures_df[curr], color='red', s=3, + label="failures") + + success_df['node_name'].fillna('unknown', inplace=True) + slurm_hosts = set(success_df['node_name'].tolist()) - cmap = colormaps.get_cmap('Accent') + cmap = colormaps['Accent'] if len(slurm_hosts) > len(cmap.colors): raise ValueError(f"""'Accent' colormap only has {len(cmap.colors)} colors, but {len(slurm_hosts)} hosts are provided.""") @@ -2567,7 +2581,7 @@ def _resource_allocation_plot_helper( def _resource_allocation_calculate( - df, x, y, models, type_, col_name, depth, tolerance): + df, x, y, models, type_, col_name, depth, tolerance, verbose): """Helper function for resource allocation plot. Calculates best_model and best_result given the models list and x,y data. @@ -2608,6 +2622,11 @@ def _resource_allocation_calculate( best_failures = np.inf best_max = np.inf for model_name, model in models.items(): + if verbose: + print( + f"\t\t\tCalculating {model_name} for {type_}; " + f"{col_name} {datetime.now().strftime('%b %d %H:%M:%S')}" + ) model_equation = model['equation'] # start values for binary search, where sl is left, sr is right # penalty weight must be positive & non-zero, hence, sl >= 1. @@ -2628,7 +2647,7 @@ def _resource_allocation_calculate( options = minimize(_resource_allocation_custom_loss, init, args=(x, y, model_equation, middle)) k, a, b = options.x - # important: here we take the 2nd (last) value of tuple since + # IMPORTANT: here we take the 2nd (last) value of tuple since # the helper function returns success, then failures. failures_df = _resource_allocation_success_failures( df, k, a, b, model_equation, col_name, type_)[-1] @@ -2682,6 +2701,8 @@ def _resource_allocation_calculate( best_model_name = model_name best_model = model_equation best_result = res + elif best_result is None: + best_result = res return best_model_name, best_model, best_result @@ -2748,9 +2769,9 @@ def _resource_allocation_success_failures(df, k, a, b, model, col_name, type_): """ x_plot = np.array(df[col_name]) - df[f'c{type_}'] = model(x_plot, k, a, b) - success_df = df[df[type_] <= df[f'c{type_}']] - failures_df = df[df[type_] > df[f'c{type_}']] + y_plot = model(x_plot, k, a, b) + success_df = df[df[type_] <= y_plot].copy() + failures_df = df[df[type_] > y_plot].copy() return (success_df, failures_df) diff --git a/qiita_pet/handlers/resources.py b/qiita_pet/handlers/resources.py index 0b5873997..62d9e338f 100644 --- a/qiita_pet/handlers/resources.py +++ b/qiita_pet/handlers/resources.py @@ -6,8 +6,7 @@ from qiita_core.qiita_settings import r_client from qiita_core.util import execute_as_transaction -commands = 'resources:commands' -default_col_name = "samples * columns" +COMMANDS = 'resources:commands' class ResourcesHandler(BaseHandler): @@ -34,7 +33,7 @@ def _get_resources(self, cname, sname, version, col_name, callback): @execute_as_transaction def _get_commands(self, callback): - res = r_client.get(commands) + res = r_client.get(COMMANDS) callback(res) @authenticated @@ -47,7 +46,6 @@ def get(self): commands_str = commands.decode('utf-8') commands_dict = ast.literal_eval(commands_str) commands_json = json.dumps(commands_dict) - self.render('resources.html', img_mem=None, img_time=None, time=None, @@ -69,9 +67,10 @@ def post(self): software = data.get('software') version = data.get('version') command = data.get('command') + col_name = data.get('col_name') resources = yield Task(self._get_resources, command, software, - version, default_col_name) + version, col_name) mcof, mmodel, mreal, mcalc, mfail = list( map(lambda x: x.split(b": ")[1].strip().decode('utf-8'), diff --git a/qiita_pet/templates/resources.html b/qiita_pet/templates/resources.html index 988880e8c..85b2e0ac9 100644 --- a/qiita_pet/templates/resources.html +++ b/qiita_pet/templates/resources.html @@ -35,6 +35,14 @@