From 13385a54337f09c7a38ce8af1a903f00b9482150 Mon Sep 17 00:00:00 2001 From: nickotto Date: Thu, 21 Mar 2024 13:08:18 -0700 Subject: [PATCH] new column "Eval Error" and dtype adjustments --- tpot2/evolvers/base_evolver.py | 21 ++++++++++--------- tpot2/evolvers/steady_state_evolver.py | 29 +++++++++++++++++++------- tpot2/population.py | 16 +++++++++++--- tpot2/utils/eval_utils.py | 28 +++++++++++++++++++------ 4 files changed, 67 insertions(+), 27 deletions(-) diff --git a/tpot2/evolvers/base_evolver.py b/tpot2/evolvers/base_evolver.py index 9959f9ab..83623754 100644 --- a/tpot2/evolvers/base_evolver.py +++ b/tpot2/evolvers/base_evolver.py @@ -483,9 +483,10 @@ def optimize(self, generations=None): except KeyboardInterrupt: if self.verbose >= 3: print("KeyboardInterrupt") - self.population.remove_invalid_from_population(column_names=self.objective_names, invalid_value="INVALID") self.population.remove_invalid_from_population(column_names=self.objective_names, invalid_value="TIMEOUT") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="INVALID") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="TIMEOUT") @@ -623,8 +624,7 @@ def evaluate_population_full(self, budget=None): parallel_timeout = 10 #scores = tpot2.utils.eval_utils.parallel_eval_objective_list(individuals_to_evaluate, self.objective_functions, self.n_jobs, verbose=self.verbose, timeout=self.max_eval_time_seconds, budget=budget, n_expected_columns=len(self.objective_names), client=self._client, parallel_timeout=parallel_timeout, **self.objective_kwargs) - scores, start_times, end_times = tpot2.utils.eval_utils.parallel_eval_objective_list2(individuals_to_evaluate, self.objective_functions, verbose=self.verbose, max_eval_time_seconds=self.max_eval_time_seconds, budget=budget, n_expected_columns=len(self.objective_names), client=self._client, **self.objective_kwargs) - + scores, start_times, end_times, eval_errors = tpot2.utils.eval_utils.parallel_eval_objective_list2(individuals_to_evaluate, self.objective_functions, verbose=self.verbose, max_eval_time_seconds=self.max_eval_time_seconds, budget=budget, n_expected_columns=len(self.objective_names), client=self._client, **self.objective_kwargs) self.population.update_column(individuals_to_evaluate, column_names=self.objective_names, data=scores) if budget is not None: @@ -632,8 +632,9 @@ def evaluate_population_full(self, budget=None): self.population.update_column(individuals_to_evaluate, column_names="Submitted Timestamp", data=start_times) self.population.update_column(individuals_to_evaluate, column_names="Completed Timestamp", data=end_times) - self.population.remove_invalid_from_population(column_names=self.objective_names) - self.population.remove_invalid_from_population(column_names=self.objective_names, invalid_value="TIMEOUT") + self.population.update_column(individuals_to_evaluate, column_names="Eval Error", data=eval_errors) + self.population.remove_invalid_from_population(column_names="Eval Error") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="TIMEOUT") def get_unevaluated_individuals(self, column_names, budget=None, individual_list=None): if individual_list is not None: @@ -695,7 +696,7 @@ def evaluate_population_selection_early_stop(self,survival_counts, thresholds=No if parallel_timeout < 0: parallel_timeout = 10 - scores, start_times, end_times = tpot2.utils.eval_utils.parallel_eval_objective_list2(individual_list=unevaluated_individuals_this_step, + scores, start_times, end_times, eval_errors = tpot2.utils.eval_utils.parallel_eval_objective_list2(individual_list=unevaluated_individuals_this_step, objective_list=self.objective_functions, verbose=self.verbose, max_eval_time_seconds=self.max_eval_time_seconds, @@ -706,14 +707,14 @@ def evaluate_population_selection_early_stop(self,survival_counts, thresholds=No client=self._client, **self.objective_kwargs, ) - + self.population.update_column(unevaluated_individuals_this_step, column_names=this_step_names, data=scores) self.population.update_column(unevaluated_individuals_this_step, column_names="Submitted Timestamp", data=start_times) self.population.update_column(unevaluated_individuals_this_step, column_names="Completed Timestamp", data=end_times) + self.population.update_column(unevaluated_individuals_this_step, column_names="Eval Error", data=eval_errors) - - self.population.remove_invalid_from_population(column_names=this_step_names) - self.population.remove_invalid_from_population(column_names=this_step_names, invalid_value="TIMEOUT") + self.population.remove_invalid_from_population(column_names="Eval Error") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="TIMEOUT") #remove invalids: invalids = [] diff --git a/tpot2/evolvers/steady_state_evolver.py b/tpot2/evolvers/steady_state_evolver.py index a45e4059..cec3b800 100644 --- a/tpot2/evolvers/steady_state_evolver.py +++ b/tpot2/evolvers/steady_state_evolver.py @@ -257,20 +257,31 @@ def optimize(self): #Loop through all futures, collect completed and timeout futures. for completed_future in list(submitted_futures.keys()): - + eval_error = None #get scores and update if completed_future.done(): #if future is done #If the future is done but threw and error, record the error if completed_future.exception() or completed_future.status == "error": #if the future is done and threw an error print("Exception in future") print(completed_future.exception()) - scores = ["INVALID" for _ in range(len(self.objective_names))] + scores = [np.nan for _ in range(len(self.objective_names))] + eval_error = "INVALID" elif completed_future.cancelled(): #if the future is done and was cancelled print("Cancelled future (likely memory related)") - scores = ["INVALID" for _ in range(len(self.objective_names))] + scores = [np.nan for _ in range(len(self.objective_names))] + eval_error = "INVALID" else: #if the future is done and did not throw an error, get the scores try: scores = completed_future.result() + + #check if scores contain "INVALID" or "TIMEOUT" + if "INVALID" in scores: + eval_error = "INVALID" + scores = [np.nan] + elif "TIMEOUT" in scores: + eval_error = "TIMEOUT" + scores = [np.nan] + except Exception as e: print("Exception in future, but not caught by dask") print(e) @@ -279,7 +290,8 @@ def optimize(self): print("status", completed_future.status) print("done", completed_future.done()) print("cancelld ", completed_future.cancelled()) - scores = ["INVALID" for _ in range(len(self.objective_names))] + scores = [np.nan for _ in range(len(self.objective_names))] + eval_error = "INVALID" else: #if future is not done #check if the future has been running for too long, cancel the future @@ -289,7 +301,8 @@ def optimize(self): if self.verbose >= 4: print(f'WARNING AN INDIVIDUAL TIMED OUT (Fallback): \n {submitted_futures[completed_future]} \n') - scores = ["TIMEOUT" for _ in range(len(self.objective_names))] + scores = [np.nan for _ in range(len(self.objective_names))] + eval_error = "TIMEOUT" else: continue #otherwise, continue to next future @@ -304,6 +317,7 @@ def optimize(self): scores = [scores[0] for _ in range(len(self.objective_names))] self.population.update_column(this_individual, column_names=self.objective_names, data=scores) self.population.update_column(this_individual, column_names="Completed Timestamp", data=time.time()) + self.population.update_column(this_individual, column_names="Eval Error", data=eval_error) if budget is not None: self.population.update_column(this_individual, column_names="Budget", data=this_budget) @@ -314,9 +328,8 @@ def optimize(self): #now we have a list of completed futures - - self.population.remove_invalid_from_population(column_names=self.objective_names, invalid_value="INVALID") - self.population.remove_invalid_from_population(column_names=self.objective_names, invalid_value="TIMEOUT") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="INVALID") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="TIMEOUT") ############################### diff --git a/tpot2/population.py b/tpot2/population.py index a3a0c54c..4e842eb9 100644 --- a/tpot2/population.py +++ b/tpot2/population.py @@ -85,6 +85,7 @@ def __init__( self, column_names = ["Parents", "Variation_Function"] self.evaluated_individuals = pd.DataFrame(columns=column_names) self.evaluated_individuals["Parents"] = self.evaluated_individuals["Parents"].astype('object') + self.use_unique_id = True #Todo clean this up. perhaps pull unique_id() out of baseestimator and have it be supplied as a function self.n_jobs = n_jobs self.callback=callback @@ -125,7 +126,6 @@ def remove_invalid_from_population(self, column_names, invalid_value = "INVALID" ''' if isinstance(column_names, str): #TODO check this column_names = [column_names] - new_pop = [] is_valid = lambda ind: ind.unique_id() not in self.evaluated_individuals.index or invalid_value not in self.evaluated_individuals.loc[ind.unique_id(),column_names].to_list() self.population = [ind for ind in self.population if is_valid(ind)] @@ -301,13 +301,15 @@ def create_offspring(self, parents_list, var_op_list, rng_=None, add_to_populati parent_keys = [parent.unique_id() for parent in parents] if not pd.api.types.is_object_dtype(self.evaluated_individuals["Parents"]): #TODO Is there a cleaner way of doing this? Not required for some python environments? self.evaluated_individuals["Parents"] = self.evaluated_individuals["Parents"].astype('object') + if not pd.api.types.is_object_dtype(self.evaluated_individuals["Variation_Function"]):#TODO Is there a cleaner way of doing this? Not required for some python environments? + self.evaluated_individuals["Variation_Function"] = self.evaluated_individuals["Variation_Function"].astype('object') self.evaluated_individuals.at[new_child.unique_id(),"Parents"] = tuple(parent_keys) #if var_op is a function if hasattr(var_op, '__call__'): self.evaluated_individuals.at[new_child.unique_id(),"Variation_Function"] = var_op.__name__ else: - self.evaluated_individuals.at[new_child.unique_id(),"Variation_Function"] = var_op + self.evaluated_individuals.at[new_child.unique_id(),"Variation_Function"] = str(var_op) new_offspring.append(new_child) @@ -377,8 +379,16 @@ def create_offspring2(self, parents_list, var_op_list, mutation_functions,mutati if not pd.api.types.is_object_dtype(self.evaluated_individuals["Parents"]): #TODO Is there a cleaner way of doing this? Not required for some python environments? self.evaluated_individuals["Parents"] = self.evaluated_individuals["Parents"].astype('object') self.evaluated_individuals.at[new_child.unique_id(),"Parents"] = tuple(parent_keys) + + #check if Variation_Function variable is an object type + if not pd.api.types.is_object_dtype(self.evaluated_individuals["Variation_Function"]): #TODO Is there a cleaner way of doing this? Not required for some python environments? + self.evaluated_individuals["Variation_Function"] = self.evaluated_individuals["Variation_Function"].astype('object') - self.evaluated_individuals.at[new_child.unique_id(),"Variation_Function"] = var_op + #if var_op is a function + if hasattr(var_op, '__call__'): + self.evaluated_individuals.at[new_child.unique_id(),"Variation_Function"] = var_op.__name__ + else: + self.evaluated_individuals.at[new_child.unique_id(),"Variation_Function"] = str(var_op) new_offspring.append(new_child) diff --git a/tpot2/utils/eval_utils.py b/tpot2/utils/eval_utils.py index d3fe68bc..8d57cbac 100644 --- a/tpot2/utils/eval_utils.py +++ b/tpot2/utils/eval_utils.py @@ -152,7 +152,7 @@ def parallel_eval_objective_list2(individual_list, submitted_futures = {} scores_dict = {} submitted_inds = set() - + eval_error = None while len(submitted_futures) < max_queue_size and len(individual_stack)>0: individual = individual_stack.pop() future = client.submit(eval_objective_list, individual, objective_list, verbose=verbose, timeout=max_eval_time_seconds,**objective_kwargs) @@ -181,13 +181,25 @@ def parallel_eval_objective_list2(individual_list, if completed_future.exception() or completed_future.status == "error": #if the future is done and threw an error print("Exception in future") print(completed_future.exception()) - scores = ["INVALID"] + scores = [np.nan] + eval_error = "INVALID" elif completed_future.cancelled(): #if the future is done and was cancelled print("Cancelled future (likely memory related)") - scores = ["INVALID"] + scores = [np.nan] + eval_error = "INVALID" else: #if the future is done and did not throw an error, get the scores try: scores = completed_future.result() + #check if scores contain "INVALID" or "TIMEOUT" + if "INVALID" in scores: + eval_error = "INVALID" + scores = [np.nan] + elif "TIMEOUT" in scores: + eval_error = "TIMEOUT" + scores = [np.nan] + else: + eval_error = None + except Exception as e: print("Exception in future, but not caught by dask") print(e) @@ -196,7 +208,8 @@ def parallel_eval_objective_list2(individual_list, print("status", completed_future.status) print("done", completed_future.done()) print("cancelld ", completed_future.cancelled()) - scores = ["INVALID"] + scores = [np.nan] + eval_error = "INVALID" else: #if future is not done #check if the future has been running for too long, cancel the future @@ -206,7 +219,8 @@ def parallel_eval_objective_list2(individual_list, if verbose >= 4: print(f'WARNING AN INDIVIDUAL TIMED OUT (Fallback): \n {submitted_futures[completed_future]} \n') - scores = ["TIMEOUT"] + scores = [np.nan] + eval_error = "TIMEOUT" else: continue #otherwise, continue to next future @@ -215,6 +229,7 @@ def parallel_eval_objective_list2(individual_list, scores_dict[cur_individual] = {"scores": scores, "start_time": submitted_futures[completed_future]["time"], "end_time": time.time(), + "eval_error": eval_error, } @@ -235,10 +250,11 @@ def parallel_eval_objective_list2(individual_list, final_scores = [scores_dict[individual]["scores"] for individual in individual_list] final_start_times = [scores_dict[individual]["start_time"] for individual in individual_list] final_end_times = [scores_dict[individual]["end_time"] for individual in individual_list] + final_eval_errors = [scores_dict[individual]["eval_error"] for individual in individual_list] final_scores = process_scores(final_scores, n_expected_columns) - return final_scores, final_start_times, final_end_times + return final_scores, final_start_times, final_end_times, final_eval_errors ###################