diff --git a/setup.py b/setup.py index 19f0f322..545055c9 100644 --- a/setup.py +++ b/setup.py @@ -33,7 +33,7 @@ def calculate_version(): 'update_checker>=0.16', 'tqdm>=4.36.1', 'stopit>=1.1.1', - 'pandas>=1.5.3,<2.0.0', + 'pandas>=2.2.0', 'joblib>=1.1.1', 'xgboost>=1.7.0', 'matplotlib>=3.6.2', diff --git a/tpot2/evolvers/base_evolver.py b/tpot2/evolvers/base_evolver.py index 9959f9ab..83623754 100644 --- a/tpot2/evolvers/base_evolver.py +++ b/tpot2/evolvers/base_evolver.py @@ -483,9 +483,10 @@ def optimize(self, generations=None): except KeyboardInterrupt: if self.verbose >= 3: print("KeyboardInterrupt") - self.population.remove_invalid_from_population(column_names=self.objective_names, invalid_value="INVALID") self.population.remove_invalid_from_population(column_names=self.objective_names, invalid_value="TIMEOUT") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="INVALID") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="TIMEOUT") @@ -623,8 +624,7 @@ def evaluate_population_full(self, budget=None): parallel_timeout = 10 #scores = tpot2.utils.eval_utils.parallel_eval_objective_list(individuals_to_evaluate, self.objective_functions, self.n_jobs, verbose=self.verbose, timeout=self.max_eval_time_seconds, budget=budget, n_expected_columns=len(self.objective_names), client=self._client, parallel_timeout=parallel_timeout, **self.objective_kwargs) - scores, start_times, end_times = tpot2.utils.eval_utils.parallel_eval_objective_list2(individuals_to_evaluate, self.objective_functions, verbose=self.verbose, max_eval_time_seconds=self.max_eval_time_seconds, budget=budget, n_expected_columns=len(self.objective_names), client=self._client, **self.objective_kwargs) - + scores, start_times, end_times, eval_errors = tpot2.utils.eval_utils.parallel_eval_objective_list2(individuals_to_evaluate, self.objective_functions, verbose=self.verbose, max_eval_time_seconds=self.max_eval_time_seconds, 
budget=budget, n_expected_columns=len(self.objective_names), client=self._client, **self.objective_kwargs) self.population.update_column(individuals_to_evaluate, column_names=self.objective_names, data=scores) if budget is not None: @@ -632,8 +632,9 @@ def evaluate_population_full(self, budget=None): self.population.update_column(individuals_to_evaluate, column_names="Submitted Timestamp", data=start_times) self.population.update_column(individuals_to_evaluate, column_names="Completed Timestamp", data=end_times) - self.population.remove_invalid_from_population(column_names=self.objective_names) - self.population.remove_invalid_from_population(column_names=self.objective_names, invalid_value="TIMEOUT") + self.population.update_column(individuals_to_evaluate, column_names="Eval Error", data=eval_errors) + self.population.remove_invalid_from_population(column_names="Eval Error") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="TIMEOUT") def get_unevaluated_individuals(self, column_names, budget=None, individual_list=None): if individual_list is not None: @@ -695,7 +696,7 @@ def evaluate_population_selection_early_stop(self,survival_counts, thresholds=No if parallel_timeout < 0: parallel_timeout = 10 - scores, start_times, end_times = tpot2.utils.eval_utils.parallel_eval_objective_list2(individual_list=unevaluated_individuals_this_step, + scores, start_times, end_times, eval_errors = tpot2.utils.eval_utils.parallel_eval_objective_list2(individual_list=unevaluated_individuals_this_step, objective_list=self.objective_functions, verbose=self.verbose, max_eval_time_seconds=self.max_eval_time_seconds, @@ -706,14 +707,14 @@ def evaluate_population_selection_early_stop(self,survival_counts, thresholds=No client=self._client, **self.objective_kwargs, ) - + self.population.update_column(unevaluated_individuals_this_step, column_names=this_step_names, data=scores) self.population.update_column(unevaluated_individuals_this_step, 
column_names="Submitted Timestamp", data=start_times) self.population.update_column(unevaluated_individuals_this_step, column_names="Completed Timestamp", data=end_times) + self.population.update_column(unevaluated_individuals_this_step, column_names="Eval Error", data=eval_errors) - - self.population.remove_invalid_from_population(column_names=this_step_names) - self.population.remove_invalid_from_population(column_names=this_step_names, invalid_value="TIMEOUT") + self.population.remove_invalid_from_population(column_names="Eval Error") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="TIMEOUT") #remove invalids: invalids = [] diff --git a/tpot2/evolvers/steady_state_evolver.py b/tpot2/evolvers/steady_state_evolver.py index a45e4059..952efa82 100644 --- a/tpot2/evolvers/steady_state_evolver.py +++ b/tpot2/evolvers/steady_state_evolver.py @@ -22,6 +22,15 @@ import dask import warnings + +def ind_mutate(ind, rng_): + rng = np.random.default_rng(rng_) + return ind.mutate(rng_=rng) + +def ind_crossover(ind1, ind2, rng_): + rng = np.random.default_rng(rng_) + return ind1.crossover(ind2, rng_=rng) + class SteadyStateEvolver(): def __init__( self, individual_generator , @@ -241,6 +250,8 @@ def optimize(self): done = False start_time = time.time() + + enough_parents_evaluated=False while not done: ############################### @@ -257,20 +268,31 @@ def optimize(self): #Loop through all futures, collect completed and timeout futures. 
for completed_future in list(submitted_futures.keys()): - + eval_error = None #get scores and update if completed_future.done(): #if future is done #If the future is done but threw and error, record the error if completed_future.exception() or completed_future.status == "error": #if the future is done and threw an error print("Exception in future") print(completed_future.exception()) - scores = ["INVALID" for _ in range(len(self.objective_names))] + scores = [np.nan for _ in range(len(self.objective_names))] + eval_error = "INVALID" elif completed_future.cancelled(): #if the future is done and was cancelled print("Cancelled future (likely memory related)") - scores = ["INVALID" for _ in range(len(self.objective_names))] + scores = [np.nan for _ in range(len(self.objective_names))] + eval_error = "INVALID" else: #if the future is done and did not throw an error, get the scores try: scores = completed_future.result() + + #check if scores contain "INVALID" or "TIMEOUT" + if "INVALID" in scores: + eval_error = "INVALID" + scores = [np.nan] + elif "TIMEOUT" in scores: + eval_error = "TIMEOUT" + scores = [np.nan] + except Exception as e: print("Exception in future, but not caught by dask") print(e) @@ -279,7 +301,8 @@ def optimize(self): print("status", completed_future.status) print("done", completed_future.done()) print("cancelld ", completed_future.cancelled()) - scores = ["INVALID" for _ in range(len(self.objective_names))] + scores = [np.nan for _ in range(len(self.objective_names))] + eval_error = "INVALID" else: #if future is not done #check if the future has been running for too long, cancel the future @@ -289,7 +312,8 @@ def optimize(self): if self.verbose >= 4: print(f'WARNING AN INDIVIDUAL TIMED OUT (Fallback): \n {submitted_futures[completed_future]} \n') - scores = ["TIMEOUT" for _ in range(len(self.objective_names))] + scores = [np.nan for _ in range(len(self.objective_names))] + eval_error = "TIMEOUT" else: continue #otherwise, continue to next future @@ 
-304,6 +328,7 @@ def optimize(self): scores = [scores[0] for _ in range(len(self.objective_names))] self.population.update_column(this_individual, column_names=self.objective_names, data=scores) self.population.update_column(this_individual, column_names="Completed Timestamp", data=time.time()) + self.population.update_column(this_individual, column_names="Eval Error", data=eval_error) if budget is not None: self.population.update_column(this_individual, column_names="Budget", data=this_budget) @@ -314,9 +339,8 @@ def optimize(self): #now we have a list of completed futures - - self.population.remove_invalid_from_population(column_names=self.objective_names, invalid_value="INVALID") - self.population.remove_invalid_from_population(column_names=self.objective_names, invalid_value="TIMEOUT") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="INVALID") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="TIMEOUT") ############################### @@ -429,33 +453,56 @@ def optimize(self): ############################### n_individuals_to_submit = self.max_queue_size - len(submitted_futures) if n_individuals_to_submit > 0: - parents_df = self.population.get_column(self.population.population, column_names=self.objective_names+ ["Individual"], to_numpy=False) - parents_df = parents_df[~parents_df[self.objective_names].isin(["TIMEOUT","INVALID"]).any(axis=1)] - parents_df = parents_df[~parents_df[self.objective_names].isna().any(axis=1)] - - cur_evaluated_population = parents_df["Individual"].to_numpy() - if len(cur_evaluated_population) > 0: - scores = parents_df[self.objective_names].to_numpy() - weighted_scores = scores * self.objective_function_weights - #number of crossover pairs and mutation only parent to generate - - if len(parents_df) < 2: - var_ops = ["mutate" for _ in range(n_individuals_to_submit)] - else: - var_ops = 
[self.rng.choice(["crossover","mutate_then_crossover","crossover_then_mutate",'mutate'],p=[self.crossover_probability,self.mutate_then_crossover_probability, self.crossover_then_mutate_probability,self.mutate_probability]) for _ in range(n_individuals_to_submit)] - - parents = [] - for op in var_ops: + #count non-nan values in the objective columns + if not enough_parents_evaluated: + parents_df = self.population.get_column(self.population.population, column_names=self.objective_names, to_numpy=False) + scores = parents_df[self.objective_names[0]].to_numpy() + #count non-nan values in the objective columns + n_evaluated = np.count_nonzero(~np.isnan(scores)) + if n_evaluated >0 : + enough_parents_evaluated=True + + # parents_df = self.population.get_column(self.population.population, column_names=self.objective_names+ ["Individual"], to_numpy=False) + # parents_df = parents_df[~parents_df[self.objective_names].isin(["TIMEOUT","INVALID"]).any(axis=1)] + # parents_df = parents_df[~parents_df[self.objective_names].isna().any(axis=1)] + + # cur_evaluated_population = parents_df["Individual"].to_numpy() + # if len(cur_evaluated_population) > 0: + # scores = parents_df[self.objective_names].to_numpy() + # weighted_scores = scores * self.objective_function_weights + # #number of crossover pairs and mutation only parent to generate + + # if len(parents_df) < 2: + # var_ops = ["mutate" for _ in range(n_individuals_to_submit)] + # else: + # var_ops = [self.rng.choice(["crossover","mutate_then_crossover","crossover_then_mutate",'mutate'],p=[self.crossover_probability,self.mutate_then_crossover_probability, self.crossover_then_mutate_probability,self.mutate_probability]) for _ in range(n_individuals_to_submit)] + + # parents = [] + # for op in var_ops: + # if op == "mutate": + # parents.extend(np.array(cur_evaluated_population)[self.parent_selector(weighted_scores, k=1, n_parents=1, rng_=self.rng)]) + # else: + # 
parents.extend(np.array(cur_evaluated_population)[self.parent_selector(weighted_scores, k=1, n_parents=2, rng_=self.rng)]) + + # #_offspring = self.population.create_offspring2(parents, var_ops, rng_=self.rng, add_to_population=True) + # offspring = self.population.create_offspring2(parents, var_ops, [ind_mutate], None, [ind_crossover], None, add_to_population=True, keep_repeats=False, mutate_until_unique=True, rng_=self.rng) + + if enough_parents_evaluated: + + parents = self.population.parent_select(selector=self.parent_selector, weights=self.objective_function_weights, columns_names=self.objective_names, k=n_individuals_to_submit, n_parents=2, rng_=self.rng) + p = np.array([self.crossover_probability, self.mutate_then_crossover_probability, self.crossover_then_mutate_probability, self.mutate_probability]) + p = p / p.sum() + var_op_list = self.rng.choice(["crossover", "mutate_then_crossover", "crossover_then_mutate", "mutate"], size=n_individuals_to_submit, p=p) + + for i, op in enumerate(var_op_list): if op == "mutate": - parents.extend(np.array(cur_evaluated_population)[self.parent_selector(weighted_scores, k=1, n_parents=1, rng_=self.rng)]) - else: - parents.extend(np.array(cur_evaluated_population)[self.parent_selector(weighted_scores, k=1, n_parents=2, rng_=self.rng)]) + parents[i] = parents[i][0] #mutations take a single individual - _offspring = self.population.create_offspring(parents, var_ops, rng_=self.rng, n_jobs=1, add_to_population=True) + offspring = self.population.create_offspring2(parents, var_op_list, [ind_mutate], None, [ind_crossover], None, add_to_population=True, keep_repeats=False, mutate_until_unique=True, rng_=self.rng) # If we don't have enough evaluated individuals to use as parents for variation, we create new individuals randomly # This can happen if the individuals in the initial population are invalid - if len(cur_evaluated_population) == 0 and len(submitted_futures) < self.max_queue_size: + elif len(submitted_futures) < 
self.max_queue_size: initial_population = self.population.evaluated_individuals.iloc[:self.initial_population_size*3] invalid_initial_population = initial_population[initial_population[self.objective_names].isin(["TIMEOUT","INVALID"]).any(axis=1)] diff --git a/tpot2/individual_representations/graph_pipeline_individual/individual.py b/tpot2/individual_representations/graph_pipeline_individual/individual.py index f890e80f..9ca1a9da 100644 --- a/tpot2/individual_representations/graph_pipeline_individual/individual.py +++ b/tpot2/individual_representations/graph_pipeline_individual/individual.py @@ -1137,7 +1137,10 @@ def _cached_transform(cache_nunber=0): pass def __str__(self): - return self.export_pipeline().__str__() + try: + return self.export_pipeline().__str__() + except: + return f"" def unique_id(self) -> GraphKey: if self.key is None: diff --git a/tpot2/population.py b/tpot2/population.py index a3a0c54c..4e842eb9 100644 --- a/tpot2/population.py +++ b/tpot2/population.py @@ -85,6 +85,7 @@ def __init__( self, column_names = ["Parents", "Variation_Function"] self.evaluated_individuals = pd.DataFrame(columns=column_names) self.evaluated_individuals["Parents"] = self.evaluated_individuals["Parents"].astype('object') + self.use_unique_id = True #Todo clean this up.
perhaps pull unique_id() out of baseestimator and have it be supplied as a function self.n_jobs = n_jobs self.callback=callback @@ -125,7 +126,6 @@ def remove_invalid_from_population(self, column_names, invalid_value = "INVALID" ''' if isinstance(column_names, str): #TODO check this column_names = [column_names] - new_pop = [] is_valid = lambda ind: ind.unique_id() not in self.evaluated_individuals.index or invalid_value not in self.evaluated_individuals.loc[ind.unique_id(),column_names].to_list() self.population = [ind for ind in self.population if is_valid(ind)] @@ -301,13 +301,15 @@ def create_offspring(self, parents_list, var_op_list, rng_=None, add_to_populati parent_keys = [parent.unique_id() for parent in parents] if not pd.api.types.is_object_dtype(self.evaluated_individuals["Parents"]): #TODO Is there a cleaner way of doing this? Not required for some python environments? self.evaluated_individuals["Parents"] = self.evaluated_individuals["Parents"].astype('object') + if not pd.api.types.is_object_dtype(self.evaluated_individuals["Variation_Function"]):#TODO Is there a cleaner way of doing this? Not required for some python environments? + self.evaluated_individuals["Variation_Function"] = self.evaluated_individuals["Variation_Function"].astype('object') self.evaluated_individuals.at[new_child.unique_id(),"Parents"] = tuple(parent_keys) #if var_op is a function if hasattr(var_op, '__call__'): self.evaluated_individuals.at[new_child.unique_id(),"Variation_Function"] = var_op.__name__ else: - self.evaluated_individuals.at[new_child.unique_id(),"Variation_Function"] = var_op + self.evaluated_individuals.at[new_child.unique_id(),"Variation_Function"] = str(var_op) new_offspring.append(new_child) @@ -377,8 +379,16 @@ def create_offspring2(self, parents_list, var_op_list, mutation_functions,mutati if not pd.api.types.is_object_dtype(self.evaluated_individuals["Parents"]): #TODO Is there a cleaner way of doing this? Not required for some python environments? 
self.evaluated_individuals["Parents"] = self.evaluated_individuals["Parents"].astype('object') self.evaluated_individuals.at[new_child.unique_id(),"Parents"] = tuple(parent_keys) + + #check if Variation_Function variable is an object type + if not pd.api.types.is_object_dtype(self.evaluated_individuals["Variation_Function"]): #TODO Is there a cleaner way of doing this? Not required for some python environments? + self.evaluated_individuals["Variation_Function"] = self.evaluated_individuals["Variation_Function"].astype('object') - self.evaluated_individuals.at[new_child.unique_id(),"Variation_Function"] = var_op + #if var_op is a function + if hasattr(var_op, '__call__'): + self.evaluated_individuals.at[new_child.unique_id(),"Variation_Function"] = var_op.__name__ + else: + self.evaluated_individuals.at[new_child.unique_id(),"Variation_Function"] = str(var_op) new_offspring.append(new_child) diff --git a/tpot2/utils/eval_utils.py b/tpot2/utils/eval_utils.py index d3fe68bc..ccc847f3 100644 --- a/tpot2/utils/eval_utils.py +++ b/tpot2/utils/eval_utils.py @@ -152,7 +152,6 @@ def parallel_eval_objective_list2(individual_list, submitted_futures = {} scores_dict = {} submitted_inds = set() - while len(submitted_futures) < max_queue_size and len(individual_stack)>0: individual = individual_stack.pop() future = client.submit(eval_objective_list, individual, objective_list, verbose=verbose, timeout=max_eval_time_seconds,**objective_kwargs) @@ -181,13 +180,25 @@ def parallel_eval_objective_list2(individual_list, if completed_future.exception() or completed_future.status == "error": #if the future is done and threw an error print("Exception in future") print(completed_future.exception()) - scores = ["INVALID"] + scores = [np.nan for _ in range(n_expected_columns)] + eval_error = "INVALID" elif completed_future.cancelled(): #if the future is done and was cancelled print("Cancelled future (likely memory related)") - scores = ["INVALID"] + scores = [np.nan for _ in 
range(n_expected_columns)] + eval_error = "INVALID" else: #if the future is done and did not throw an error, get the scores try: scores = completed_future.result() + #check if scores contain "INVALID" or "TIMEOUT" + if "INVALID" in scores: + eval_error = "INVALID" + scores = [np.nan for _ in range(n_expected_columns)] + elif "TIMEOUT" in scores: + eval_error = "TIMEOUT" + scores = [np.nan for _ in range(n_expected_columns)] + else: + eval_error = None + except Exception as e: print("Exception in future, but not caught by dask") print(e) @@ -196,7 +207,8 @@ def parallel_eval_objective_list2(individual_list, print("status", completed_future.status) print("done", completed_future.done()) print("cancelld ", completed_future.cancelled()) - scores = ["INVALID"] + scores = [np.nan for _ in range(n_expected_columns)] + eval_error = "INVALID" else: #if future is not done #check if the future has been running for too long, cancel the future @@ -206,7 +218,8 @@ def parallel_eval_objective_list2(individual_list, if verbose >= 4: print(f'WARNING AN INDIVIDUAL TIMED OUT (Fallback): \n {submitted_futures[completed_future]} \n') - scores = ["TIMEOUT"] + scores = [np.nan for _ in range(n_expected_columns)] + eval_error = "TIMEOUT" else: continue #otherwise, continue to next future @@ -215,6 +228,7 @@ def parallel_eval_objective_list2(individual_list, scores_dict[cur_individual] = {"scores": scores, "start_time": submitted_futures[completed_future]["time"], "end_time": time.time(), + "eval_error": eval_error, } @@ -235,10 +249,9 @@ def parallel_eval_objective_list2(individual_list, final_scores = [scores_dict[individual]["scores"] for individual in individual_list] final_start_times = [scores_dict[individual]["start_time"] for individual in individual_list] final_end_times = [scores_dict[individual]["end_time"] for individual in individual_list] - + final_eval_errors = [scores_dict[individual]["eval_error"] for individual in individual_list] final_scores = 
process_scores(final_scores, n_expected_columns) - - return final_scores, final_start_times, final_end_times + return final_scores, final_start_times, final_end_times, final_eval_errors ###################