Skip to content

Commit

Permalink
Merge pull request #75 from aai-institute/infinimol
Browse files Browse the repository at this point in the history
Minor improvements in type annotations
  • Loading branch information
opcode81 authored Feb 20, 2024
2 parents d560fe0 + ba00fba commit 363d139
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 32 deletions.
48 changes: 24 additions & 24 deletions src/sensai/evaluation/eval_stats/eval_stats_classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
class ClassificationMetric(Metric["ClassificationEvalStats"], ABC):
requires_probabilities = False

def __init__(self, name=None, bounds: Tuple[float, float] = (0, 1), requires_probabilities: Optional[bool] = None):
def __init__(self, name: Optional[str] = None, bounds: Tuple[float, float] = (0, 1), requires_probabilities: Optional[bool] = None):
"""
:param name: the name of the metric; if None use the class' name attribute
:param bounds: the minimum and maximum values the metric can take on
Expand All @@ -38,7 +38,7 @@ def __init__(self, name=None, bounds: Tuple[float, float] = (0, 1), requires_pro
def compute_value_for_eval_stats(self, eval_stats: "ClassificationEvalStats"):
return self.compute_value(eval_stats.y_true, eval_stats.y_predicted, eval_stats.y_predicted_class_probabilities)

def compute_value(self, y_true, y_predicted, y_predicted_class_probabilities=None):
def compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: Optional[PredictionArray] = None):
if self.requires_probabilities and y_predicted_class_probabilities is None:
raise ValueError(f"{self} requires class probabilities")
return self._compute_value(y_true, y_predicted, y_predicted_class_probabilities)
Expand All @@ -51,14 +51,14 @@ def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
class ClassificationMetricAccuracy(ClassificationMetric):
name = "accuracy"

def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
def _compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: PredictionArray):
return accuracy_score(y_true=y_true, y_pred=y_predicted)


class ClassificationMetricBalancedAccuracy(ClassificationMetric):
name = "balancedAccuracy"

def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
def _compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: PredictionArray):
return balanced_accuracy_score(y_true=y_true, y_pred=y_predicted)


Expand Down Expand Up @@ -86,7 +86,7 @@ def __init__(self, *labels: Any, probability_threshold=None, zero_value=0.0):
self.probability_threshold = probability_threshold
self.zero_value = zero_value

def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
def _compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: PredictionArray):
y_true = np.array(y_true)
y_predicted = np.array(y_predicted)
indices = []
Expand All @@ -111,7 +111,7 @@ class ClassificationMetricGeometricMeanOfTrueClassProbability(ClassificationMetr
name = "geoMeanTrueClassProb"
requires_probabilities = True

def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
def _compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: PredictionArray):
y_predicted_proba_true_class = np.zeros(len(y_true))
for i in range(len(y_true)):
true_class = y_true[i]
Expand All @@ -131,7 +131,7 @@ def __init__(self, n: int):
self.n = n
super().__init__(name=f"top{n}Accuracy")

def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
def _compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: PredictionArray):
labels = y_predicted_class_probabilities.columns
cnt = 0
for i, rowValues in enumerate(y_predicted_class_probabilities.values.tolist()):
Expand All @@ -156,7 +156,7 @@ def __init__(self, threshold: float, zero_value=0.0):
self.zeroValue = zero_value
super().__init__(name=f"accuracy[p_max >= {threshold}]")

def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
def _compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: PredictionArray):
labels = y_predicted_class_probabilities.columns
label_to_col_idx = {l: i for i, l in enumerate(labels)}
rel_freq = RelativeFrequencyCounter()
Expand Down Expand Up @@ -188,7 +188,7 @@ def __init__(self, threshold: float):
self.threshold = threshold
super().__init__(name=f"relFreq[p_max >= {threshold}]")

def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
def _compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: PredictionArray):
rel_freq = RelativeFrequencyCounter()
for i, probabilities in enumerate(y_predicted_class_probabilities.values.tolist()):
p_max = np.max(probabilities)
Expand All @@ -211,7 +211,7 @@ class BinaryClassificationMetricPrecision(BinaryClassificationMetric):
def __init__(self, positive_class_label):
super().__init__(positive_class_label)

def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
def _compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: PredictionArray):
return precision_score(y_true, y_predicted, pos_label=self.positiveClassLabel, zero_division=0)

def get_paired_metrics(self) -> List[BinaryClassificationMetric]:
Expand All @@ -224,7 +224,7 @@ class BinaryClassificationMetricRecall(BinaryClassificationMetric):
def __init__(self, positive_class_label):
super().__init__(positive_class_label)

def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
def _compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: PredictionArray):
return recall_score(y_true, y_predicted, pos_label=self.positiveClassLabel)


Expand All @@ -234,7 +234,7 @@ class BinaryClassificationMetricF1Score(BinaryClassificationMetric):
def __init__(self, positive_class_label):
super().__init__(positive_class_label)

def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
def _compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: PredictionArray):
return f1_score(y_true, y_predicted, pos_label=self.positiveClassLabel)


Expand Down Expand Up @@ -264,7 +264,7 @@ def compute_value_for_eval_stats(self, eval_stats: "ClassificationEvalStats"):
best_recall = recall
return self.zero_value if best_recall is None else best_recall

def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
def _compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: PredictionArray):
raise NotImplementedError(f"{self.__class__.__qualname__} only supports computeValueForEvalStats")


Expand All @@ -285,7 +285,7 @@ def __init__(self, threshold: float, positive_class_label: Any, zero_value=0.0):
self.zero_value = zero_value
super().__init__(positive_class_label, name=f"precision[{threshold}]")

def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
def _compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: PredictionArray):
rel_freq_correct = RelativeFrequencyCounter()
class_idx_positive = list(y_predicted_class_probabilities.columns).index(self.positiveClassLabel)
for i, (probabilities, classLabel_true) in enumerate(zip(y_predicted_class_probabilities.values.tolist(), y_true)):
Expand Down Expand Up @@ -315,7 +315,7 @@ def __init__(self, threshold: float, positive_class_label: Any, zero_value=0.0):
self.zero_value = zero_value
super().__init__(positive_class_label, name=f"recall[{threshold}]")

def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):
def _compute_value(self, y_true: PredictionArray, y_predicted: PredictionArray, y_predicted_class_probabilities: PredictionArray):
rel_freq_recalled = RelativeFrequencyCounter()
class_idx_positive = list(y_predicted_class_probabilities.columns).index(self.positiveClassLabel)
for i, (probabilities, classLabel_true) in enumerate(zip(y_predicted_class_probabilities.values.tolist(), y_true)):
Expand All @@ -327,12 +327,12 @@ def _compute_value(self, y_true, y_predicted, y_predicted_class_probabilities):


class ClassificationEvalStats(PredictionEvalStats["ClassificationMetric"]):
def __init__(self, y_predicted: PredictionArray = None,
y_true: PredictionArray = None,
y_predicted_class_probabilities: pd.DataFrame = None,
labels: PredictionArray = None,
metrics: Sequence["ClassificationMetric"] = None,
additional_metrics: Sequence["ClassificationMetric"] = None,
def __init__(self, y_predicted: Optional[PredictionArray] = None,
y_true: Optional[PredictionArray] = None,
y_predicted_class_probabilities: Optional[pd.DataFrame] = None,
labels: Optional[PredictionArray] = None,
metrics: Optional[Sequence["ClassificationMetric"]] = None,
additional_metrics: Optional[Sequence["ClassificationMetric"]] = None,
binary_positive_label=GUESS):
"""
:param y_predicted: the predicted class labels
Expand Down Expand Up @@ -480,18 +480,18 @@ def get_combined_eval_stats(self) -> ClassificationEvalStats:


class ConfusionMatrix:
def __init__(self, y_true, y_predicted):
def __init__(self, y_true: PredictionArray, y_predicted: PredictionArray):
self.labels = sklearn.utils.multiclass.unique_labels(y_true, y_predicted)
self.confusionMatrix = confusion_matrix(y_true, y_predicted, labels=self.labels)

def plot(self, normalize=True, title_add: str = None):
def plot(self, normalize: bool = True, title_add: str = None):
title = 'Normalized Confusion Matrix' if normalize else 'Confusion Matrix (Counts)'
return plot_matrix(self.confusionMatrix, title, self.labels, self.labels, 'true class', 'predicted class', normalize=normalize,
title_add=title_add)


class BinaryClassificationCounts:
def __init__(self, is_positive_prediction: Sequence[bool], is_positive_ground_truth: Sequence[bool], zero_denominator_metric_value=0):
def __init__(self, is_positive_prediction: Sequence[bool], is_positive_ground_truth: Sequence[bool], zero_denominator_metric_value: float = 0.):
"""
:param is_positive_prediction: the sequence of Booleans indicating whether the model predicted the positive class
:param is_positive_ground_truth: the sequence of Booleans indicating whether the true class is the positive class
Expand Down
16 changes: 8 additions & 8 deletions src/sensai/hyperopt.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ class SAHyperOpt(TrackingMixin):
log = log.getChild(__qualname__)

class State(SAState):
def __init__(self, params, random_state: Random, results: Dict, compute_metric: Callable[[Dict[str, Any]], float]):
def __init__(self, params: Dict[str, Any], random_state: Random, results: Dict, compute_metric: Callable[[Dict[str, Any]], float]):
self.compute_metric = compute_metric
self.results = results
self.params = dict(params)
Expand Down Expand Up @@ -445,13 +445,13 @@ def __init__(self,
ops_and_weights: List[Tuple[Callable[['SAHyperOpt.State'], 'SAHyperOpt.ParameterChangeOperator'], float]],
initial_parameters: Dict[str, Any],
metrics_evaluator: MetricsDictProvider,
metric_to_optimise,
minimise_metric=False,
collect_data_frame=True,
metric_to_optimise: str,
minimise_metric: bool = False,
collect_data_frame: bool = True,
csv_results_path: Optional[str] = None,
parameter_combination_equivalence_class_value_cache: ParameterCombinationEquivalenceClassValueCache = None,
p0=0.5,
p1=0.0):
p0: float = 0.5,
p1: float = 0.0):
"""
:param model_factory: a factory for the generation of models which is called with the current parameter combination
(all keyword arguments), initially initialParameters
Expand Down Expand Up @@ -521,15 +521,15 @@ def _eval_params(cls,
parameter_combination_equivalence_class_value_cache.set(params, metrics)
return metrics

def _compute_metric(self, params):
def _compute_metric(self, params: Dict[str, Any]):
metrics = self._eval_params(self.model_factory, self.evaluator_or_validator, self.parameters_metrics_collection,
self.parameter_combination_equivalence_class_value_cache, self.tracked_experiment, **params)
metric_value = metrics[self.metric_to_optimise]
if not self.minimise_metric:
return -metric_value
return metric_value

def run(self, max_steps=None, duration=None, random_seed=42, collect_stats=True):
def run(self, max_steps: Optional[int] = None, duration: Optional[float] = None, random_seed: int = 42, collect_stats: bool = True):
sa = SimulatedAnnealing(lambda: SAProbabilitySchedule(None, SAProbabilityFunctionLinear(p0=self.p0, p1=self.p1)),
self.ops_and_weights, max_steps=max_steps, duration=duration, random_seed=random_seed, collect_stats=collect_stats)
results = {}
Expand Down

0 comments on commit 363d139

Please sign in to comment.