import logging
import os
from typing import Optional, Callable, Tuple, Union, List, Iterable, Dict, cast
import random
import binpacking
import joblib
import numpy as np
from math import tanh
from pyspark import SparkContext
import time
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.preprocessing import OrdinalEncoder, MinMaxScaler, PolynomialFeatures
from load_balancer_parameters import LoadBalancerParameters, SVMParameters, ClusteringParameters, RFParameters
from utils import get_clustering_scoring_method_enum_value, get_clustering_algorithm_enum_value, get_kernel_enum_value
logging.getLogger().setLevel(logging.INFO)
# Folders where the models dumps are stored
SVM_TRAINED_MODELS_DIR = 'Trained_models/svm/'
CLUSTERING_TRAINED_MODELS_DIR = 'Trained_models/clustering/'
RF_TRAINED_MODELS_DIR = 'Trained_models/rf/'
# Result tuple with [0] -> fitness value, [1] -> execution time, [2] -> Partition ID, [3] -> Hostname,
# [4] -> number of evaluated features, [5] -> time lapse description, [6] -> time by iteration, [7] -> avg test time,
# [8] -> mean number of iterations of the model inside the CV, and [9] -> train score. Numeric values are -1.0 if
# there was an error.
CrossValidationSparkResult = Tuple[float, float, int, str, int, str, float, float, float, float]
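# A minimal illustration (hypothetical values, for documentation only) of how a CrossValidationSparkResult is unpacked;
# none of the names below are defined elsewhere in this module:
# result: CrossValidationSparkResult = (0.78, 12.3, 0, 'worker-1', 150, '12.30 sec', 1.2, 0.4, 85.0, 0.81)
# fitness, exec_time, partition_id, host, n_features, lapse_desc, time_per_iter, test_time, mean_iters, train_score = result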
def __get_prediction_function(load_balancer_parameters: LoadBalancerParameters) -> Callable[
[np.ndarray, Union[SVMParameters, ClusteringParameters, RFParameters], str, bool, Optional[int], bool],
Tuple[List, List]
]:
"""
Gets the corresponding function to predict the execution time for the load balancer.
:param load_balancer_parameters: LoadBalancerParameters instance to check which model is being used (clustering,
SVM or RF).
:return: Function to predict the execution time for the load balancer.
"""
if isinstance(load_balancer_parameters, SVMParameters):
return predict_execution_times_svm
elif isinstance(load_balancer_parameters, ClusteringParameters):
return predict_execution_times_clustering
elif isinstance(load_balancer_parameters, RFParameters):
return predict_execution_times_rf
else:
logging.error(f'Unknown load balancer parameters: {load_balancer_parameters}')
exit(-1)
def generate_stars_and_partitions_bins(bins: List) -> Dict[int, int]:
"""
Generates a dict with the idx of the star and the assigned partition.
:param bins: Bins generated by binpacking.
:return: Dict where keys are star index, values are the Spark partition.
"""
stars_and_partitions: Dict[int, int] = {}
for partition_id, aux_bin in enumerate(bins):
for star_idx in aux_bin.keys():
stars_and_partitions[star_idx] = partition_id
return stars_and_partitions
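# Worked example (hypothetical bins as produced by binpacking.to_constant_bin_number, where keys are star indexes and
# values are predicted times in seconds):
# bins = [{0: 1.2, 3: 0.8}, {1: 2.0, 2: 0.5}]
# generate_stars_and_partitions_bins(bins)  # -> {0: 0, 3: 0, 1: 1, 2: 1}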
def get_best_spark(
subsets: np.ndarray,
workers_results: List[Tuple[int, CrossValidationSparkResult]],
more_is_better: bool
) -> Tuple[int, np.ndarray, np.ndarray]:
"""
Gets the best idx, feature subset and fitness value obtained during one iteration of the metaheuristic
:param subsets: List of features lists used in every star
:param workers_results: List of fitness values obtained for every star
:param more_is_better: If True, it returns the highest value (SVM and RF C-Index), lowest otherwise (LogRank p-value)
:return: Best star idx (the Black Hole idx), the subset of features for the Black Hole, and all the data returned
by the star that computed the best fitness (from now on, the Black Hole)
"""
# Converts to a Numpy array to discard the star's index
workers_results_np = np.array(workers_results, dtype=object)
workers_results_np_aux = np.array([a_list[0] for a_list in workers_results_np[:, 1]], dtype=float) # Creates numpy arrays from lists
if more_is_better:
best_idx = np.argmax(workers_results_np_aux)
else:
best_idx = np.argmin(workers_results_np_aux)
best_idx = cast(int, best_idx)
star_idx, black_hole_result = workers_results_np[best_idx]
# Searches by the id of the star in the Spark cluster to get the subset of features
black_hole_subset = None
for subset in subsets:
if subset[0] == star_idx:
black_hole_subset = subset[1]
break
return star_idx, black_hole_subset, black_hole_result
def parallelize_fitness_execution(
sc: SparkContext,
stars_subsets: np.ndarray,
fitness_function: Callable[[np.ndarray], CrossValidationSparkResult]
) -> List[CrossValidationSparkResult]:
"""
Parallelize the fitness function computing on an Apache Spark cluster and return fitness metrics
@param sc: Spark context
@param stars_subsets: Stars subset to parallelize
@param fitness_function: Fitness function
@return: All the fitness metrics from all the stars, and the execution time per worker
"""
stars_parallelized = sc.parallelize(stars_subsets)
return stars_parallelized \
.map(lambda star_features: fitness_function(star_features[1])) \
.collect()
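# Minimal usage sketch (assumes an existing SparkContext 'sc', an array of (index, features) pairs 'stars_subsets'
# and a fitness function 'compute_fitness'; all three names are placeholders):
# results = parallelize_fitness_execution(sc, stars_subsets, compute_fitness)
# 'results' holds one CrossValidationSparkResult per star, in the order returned by collect()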
def map_partition(
fitness_function: Callable[[np.ndarray], CrossValidationSparkResult],
records: Iterable[CrossValidationSparkResult]
) -> Iterable[Tuple[int, CrossValidationSparkResult]]:
"""Returns fitness result for all the elements in partition records. The index is returned to get the association
of predicted times later as Sparks returns all the values in different order from which the stars are assigned
in first place"""
for key, elem in records:
yield key, fitness_function(elem)
def parallelize_fitness_execution_by_partitions(
sc: SparkContext,
stars_subsets: np.ndarray,
fitness_function: Callable[[np.ndarray], CrossValidationSparkResult],
number_of_workers: int,
load_balancer_parameters: Optional[Union[SVMParameters, ClusteringParameters, RFParameters]],
debug: bool
) -> Tuple[List[Tuple[int, CrossValidationSparkResult]], float, Dict[int, float]]:
"""
Parallelize the fitness function computing on an Apache Spark cluster and return fitness metrics.
This function generates a partitioning that distributes the load equally to all nodes.
@param sc: Spark context.
@param stars_subsets: Stars subset to parallelize.
@param fitness_function: Fitness function.
@param number_of_workers: Number of workers nodes in the Spark cluster.
@param load_balancer_parameters: Parameters of the Clustering/RF/SVM for the load balancer.
@param debug: If True, shows information about the predicted times for every star.
@return: A tuple with the star index and all the metrics for every star, the elapsed time between the distribution of data along the
Spark Cluster and the collect() call, and a dict with star indexes as keys and predicted times from the load
balancer (-1.0 values in case load_balancer_parameters is None) as values.
"""
stars_parallelized = sc.parallelize(stars_subsets)
n_stars = len(stars_subsets)
if load_balancer_parameters is not None:
if load_balancer_parameters.number_of_samples is None:
logging.error('The number of samples in the load balancer parameters is None. Exiting...')
logging.error(str(load_balancer_parameters))
exit(-1)
# Sets parameters
predict_f = __get_prediction_function(load_balancer_parameters)
use_min_max = False # NOTE: the best MSE obtained during experiments was without MinMaxScaler
# GradientBoostingRegressor was always the best one in the experiments
if use_min_max:
model_plk = 'best_gradient_booster_model.pkl'
else:
model_plk = 'best_gradient_booster_model_no_min_max.pkl'
poly_degree = None
report_data_without_min_max = debug
# Predicts the execution times for every star
predicted_times, transformed_data = predict_f(
stars_subsets,
load_balancer_parameters,
model_plk,
use_min_max,
poly_degree,
report_data_without_min_max,
)
stars_and_times = {k: v for (k, v) in zip(range(n_stars), predicted_times)}
# Checks for negative values
negative_values_idx = np.where(predicted_times < 0)[0]
if len(negative_values_idx) > 0:
idx = negative_values_idx[0]
problem_stars = stars_subsets[idx][1]
logging.error(f'The load balancer predicted a negative number for star with index {idx}. Exiting...')
logging.error(f'Number of features: {np.count_nonzero(problem_stars)}')
logging.error(load_balancer_parameters)
exit(-1)
if debug:
logging.info(f'Data to predict (with transformation):\n{transformed_data}')
logging.info(f'Predicted times for every star: {stars_and_times}')
# The complete reporting is only implemented for SVMParameters
# TODO: uncomment when needed. Needs to fix the missing models with MinMaxScalers
# if isinstance(load_balancer_parameters, SVMParameters):
# report_all_load_balancer_models_svm(n_stars, load_balancer_parameters, stars_subsets)
# Generates the bins. NOTE: 'number_of_workers' is the number of bins
bins = binpacking.to_constant_bin_number(stars_and_times, number_of_workers)
if debug:
for bin_idx, elem in enumerate(bins):
logging.info(f'Bin {bin_idx} has {len(elem.keys())} elements. Sums up to {sum(elem.values())} seconds.')
# Generates a dict and uses it to assign the partition ID
stars_and_partitions = generate_stars_and_partitions_bins(bins)
partition_f = lambda key: stars_and_partitions[key]
else:
stars_and_times = {star_idx: -1.0 for star_idx in range(n_stars)} # Assigns -1.0 to all the stars in case predictions are disabled
partition_f = lambda key: key * number_of_workers // len(stars_subsets)
# NOTE: the mapPartitions() allows Scikit-surv models to use all the worker's cores during CrossValidation.
# This avoids problems of Spark parallelization interfering with Scikit-surv parallelized algorithms
# VERY IMPORTANT NOTE: the index is returned in mapPartitions() to recover the association of predicted times later,
# as Spark may return the values in a different order from the one in which the stars were originally assigned
start_parallelization = time.time()
result = stars_parallelized \
.partitionBy(number_of_workers, partitionFunc=partition_f) \
.mapPartitions(lambda records: map_partition(fitness_function, records), preservesPartitioning=True) \
.collect()
total_iteration_time = time.time() - start_parallelization
logging.info(f'partitionBy(), mapPartitions() and collect() time -> {total_iteration_time} seconds')
return result, total_iteration_time, stars_and_times
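# Worked example of the fallback partition function used when no load balancer parameters are given: with 6 stars and
# 2 workers, partition = key * number_of_workers // len(stars_subsets), so
# [key * 2 // 6 for key in range(6)] == [0, 0, 0, 1, 1, 1]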
def get_random_subset_of_features(n_features: int, random_state: Optional[int] = None) -> np.ndarray:
"""
Generates a random subset of Features. Answer taken from https://stackoverflow.com/a/47942584/7058363
:param n_features: Total number of features
:param random_state: Random state to replicate experiments. It allows setting the same number of features for every
star and the same shuffling. It has to be different every time this method is called to prevent generating the same subset.
:return: Binary array whose {0, 1} values indicate the absence/presence of the feature at each index
"""
res = np.zeros(n_features, dtype=int) # Gets an array of all the features in zero
# Sets a random number of features to 1, then shuffles the array
if random_state is not None:
random.seed(random_state)
random_number_of_features = random.randint(1, n_features)
res[:random_number_of_features] = 1
if random_state is not None:
np.random.seed(random_state)
np.random.shuffle(res)
return res
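# Illustrative call (the exact output depends on the seed): get_random_subset_of_features(5, random_state=3) returns
# a binary mask such as array([1, 1, 0, 1, 0]), where 1 marks a selected feature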
def get_best(
subsets: np.ndarray,
fitness_values: Union[np.ndarray, List[float]]
) -> Tuple[int, np.ndarray, float]:
"""Gets the index, feature subset and value of the best (highest) fitness."""
best_idx = np.argmax(fitness_values) # Keeps the idx to avoid ambiguous comparisons
return best_idx, subsets[best_idx], fitness_values[best_idx]
def binary_black_hole(
n_stars: int,
n_features: int,
n_iterations: int,
fitness_function: Callable[[np.ndarray], CrossValidationSparkResult],
random_state: Optional[int],
binary_threshold: Optional[float] = 0.6,
debug: bool = False,
) -> Tuple[np.ndarray, np.ndarray, Dict]:
"""
Computes the metaheuristic Binary Black Hole Algorithm. Taken from the paper
"Binary black hole algorithm for feature selection and classification on biological data"
Authors: Elnaz Pashaei, Nizamettin Aydin.
:param n_stars: Number of stars
:param n_features: Number of features
:param n_iterations: Number of iterations
:param fitness_function: Fitness function to compute on every star
:param random_state: Random state to replicate experiments. It allows setting the same number of features for every
star and the same shuffling.
:param binary_threshold: Binary threshold to set the feature to 1 or 0. If None, it'll be drawn randomly
:param debug: If True, logs everything that happens inside the BBHA
:return: The best feature subset found, its fitness value, and a dict with all the metrics collected during execution
"""
# Lists for storing times and fitness data to train models
number_of_features: List[int] = []
fitness: List[float] = []
time_exec: List[float] = []
svm_times_by_iteration: List[float] = []
time_test: List[float] = []
num_of_iterations: List[float] = []
train_scores: List[float] = []
# Data structs setup
stars_subsets = np.empty((n_stars, n_features), dtype=int)
stars_fitness_values = np.empty((n_stars,), dtype=float)
# Initializes the stars with their subsets and their fitness values
if debug:
logging.info('Initializing stars...')
for i in range(n_stars):
# Adds 8 to differentiate the random state used in the initialization of every star from the one used in the rest of the BBHA iterations
current_random_state = random_state * (i + 8) if random_state is not None else None
random_features_to_select = get_random_subset_of_features(n_features, current_random_state)
stars_subsets[i] = random_features_to_select # Initialize 'Population'
current_data = fitness_function(random_features_to_select)
current_fitness = current_data[0]
worker_execution_time = current_data[1]
evaluated_features = current_data[4]
time_by_iteration = current_data[6]
model_test_time = current_data[7]
mean_num_of_iterations = current_data[8]
train_score = current_data[9]
number_of_features.append(evaluated_features)
fitness.append((round(current_fitness, 4)))
time_exec.append(round(worker_execution_time, 4))
svm_times_by_iteration.append(round(time_by_iteration, 4))
time_test.append(round(model_test_time, 4))
num_of_iterations.append(round(mean_num_of_iterations, 4))
train_scores.append(round(train_score, 4))
stars_fitness_values[i] = current_fitness
# The star with the best fitness is the Black Hole
black_hole_idx, black_hole_subset, black_hole_fitness = get_best(stars_subsets, stars_fitness_values)
if debug:
logging.info(f'Black hole starting as star at index {black_hole_idx}')
# Iterations
for i in range(n_iterations):
if debug:
logging.info(f'Iteration {i + 1}/{n_iterations}')
for a in range(n_stars):
# Computes the current star fitness
current_star_subset = stars_subsets[a]
current_data = fitness_function(current_star_subset)
current_fitness = current_data[0]
worker_execution_time = current_data[1]
evaluated_features = current_data[4]
time_by_iteration = current_data[6]
model_test_time = current_data[7]
mean_num_of_iterations = current_data[8]
train_score = current_data[9]
number_of_features.append(evaluated_features)
fitness.append((round(current_fitness, 4)))
time_exec.append(round(worker_execution_time, 4))
svm_times_by_iteration.append(round(time_by_iteration, 4))
time_test.append(round(model_test_time, 4))
num_of_iterations.append(round(mean_num_of_iterations, 4))
train_scores.append(round(train_score, 4))
# If it's the black hole, skips the swap and movement steps (its fitness was already computed above)
if a == black_hole_idx:
continue
# If it's the best fitness, swaps that star with the current black hole
if current_fitness > black_hole_fitness:
if debug:
logging.info(f'Changing Black hole for star {a},'
f' BH fitness -> {black_hole_fitness} | Star {a} fitness -> {current_fitness}')
black_hole_idx = a
black_hole_subset, current_star_subset = current_star_subset.copy(), black_hole_subset.copy()
black_hole_fitness, current_fitness = current_fitness, black_hole_fitness
# If the fitness function was the same, but had fewer features in the star (better!), makes the swap
elif current_fitness == black_hole_fitness and np.count_nonzero(current_star_subset) < np.count_nonzero(
black_hole_subset):
if debug:
logging.info(f'Changing Black hole for star {a},'
f' BH fitness -> {black_hole_fitness} | Star {a} fitness -> {current_fitness}')
black_hole_idx = a
black_hole_subset, current_star_subset = current_star_subset.copy(), black_hole_subset.copy()
black_hole_fitness, current_fitness = current_fitness, black_hole_fitness
# Computes the event horizon
event_horizon = black_hole_fitness / np.sum(stars_fitness_values)
# Checks if the current star falls in the event horizon
dist_to_black_hole = np.linalg.norm(black_hole_subset - current_star_subset) # Euclidean distance
if dist_to_black_hole < event_horizon:
if debug:
logging.info(f'Star {a} has fallen inside event horizon. '
f'Event horizon -> {event_horizon} | Star distance -> {dist_to_black_hole}')
current_random_state = random_state * (i * (a + 1)) if random_state is not None else None
stars_subsets[a] = get_random_subset_of_features(n_features, current_random_state)
# Updates the binary array of the used features
for a in range(n_stars):
# Skips the black hole
if black_hole_idx == a:
continue
for d in range(n_features):
x_old = stars_subsets[a][d]
if random_state is not None:
# Seeds the 'random' module, which is the one actually used by random.uniform() below
random.seed(random_state * (d * (a + 1)))
threshold = binary_threshold if binary_threshold is not None else random.uniform(0, 1)
if random_state is not None:
random.seed(random_state * (d * (a + 2)))
x_new = x_old + random.uniform(0, 1) * (black_hole_subset[d] - x_old) # Position
stars_subsets[a][d] = 1 if abs(tanh(x_new)) > threshold else 0
# Adds all the metrics to the result
result_dict = {
'number_of_features': number_of_features,
'execution_times': time_exec,
'fitness': fitness,
'times_by_iteration': svm_times_by_iteration,
'test_times': time_test,
'train_scores': train_scores,
'number_of_iterations': num_of_iterations,
}
return black_hole_subset, black_hole_fitness, result_dict
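# Worked example of the event horizon used above (hypothetical fitness values): with star fitness values
# [0.8, 0.6, 0.6] the black hole fitness is 0.8 and event_horizon = 0.8 / (0.8 + 0.6 + 0.6) = 0.4; any star whose
# Euclidean distance to the black hole subset falls below 0.4 is replaced by a new random subset of features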
def improved_binary_black_hole(
n_stars: int,
n_features: int,
n_iterations: int,
fitness_function: Callable[[np.ndarray], CrossValidationSparkResult],
coeff_1: float,
coeff_2: float,
binary_threshold: Optional[float] = 0.6,
debug: bool = False
):
"""
# TODO: add random_state
Computes the BBHA algorithm with some modification extracted from
"Improved black hole and multiverse algorithms for discrete sizing optimization of planar structures"
Authors: Saeed Gholizadeh, Navid Razavi & Emad Shojaei
:param n_stars: Number of stars
:param n_features: Number of features
:param n_iterations: Number of iterations
:param fitness_function: Fitness function to compute on every star
:param coeff_1: Parameter taken from the paper. Possible values = [2.2, 2.35]
:param coeff_2: Parameter taken from the paper. Possible values = [0.1, 0.2, 0.3]
:param binary_threshold: Binary threshold to set the feature to 1 or 0. If None, it'll be drawn randomly
:param debug: If True, logs everything that happens inside the BBHA
:return: The best feature subset found and its fitness value
"""
print('Add random_state to the function before using it!')
exit(-1)
coeff_1_possible_values = [2.2, 2.35]
coeff_2_possible_values = [0.1, 0.2, 0.3]
if coeff_1 not in coeff_1_possible_values:
logging.error(f'The parameter coeff_1 must be one of the following values -> {coeff_1_possible_values}')
exit(1)
if coeff_2 not in coeff_2_possible_values:
logging.error(f'The parameter coeff_2 must be one of the following values -> {coeff_2_possible_values}')
exit(1)
# Data structs setup
stars_subsets = np.empty((n_stars, n_features), dtype=int)
stars_best_subset = np.empty((n_stars, n_features), dtype=int)
stars_fitness_values = np.empty((n_stars,), dtype=float)
stars_best_fitness_values = np.empty((n_stars,), dtype=float)
# Initializes the stars with their subsets and their fitness values
if debug:
logging.info('Initializing stars...')
for i in range(n_stars):
random_features_to_select = get_random_subset_of_features(n_features)
stars_subsets[i] = random_features_to_select # Initializes 'Population'
stars_fitness_values[i] = fitness_function(random_features_to_select)
# Best fitness and position
stars_best_fitness_values[i] = stars_fitness_values[i]
stars_best_subset[i] = stars_subsets[i]
# The star with the best fitness is the Black Hole
black_hole_idx, black_hole_subset, black_hole_fitness = get_best(stars_subsets, stars_fitness_values)
if debug:
logging.info(f'Black hole starting as star at index {black_hole_idx}')
# Iterations
for i in range(n_iterations):
if debug:
logging.info(f'Iteration {i + 1}/{n_iterations}')
for a in range(n_stars):
# If it's the black hole, skips the computation
if a == black_hole_idx:
continue
# Compute the current star fitness
current_star_subset = stars_subsets[a]
current_fitness = fitness_function(current_star_subset)
# Sets the best fitness and position
if current_fitness > stars_best_fitness_values[a]:
stars_best_fitness_values[a] = current_fitness
stars_best_subset[a] = current_star_subset
# If it's the best fitness, swaps that star with the current black hole
if current_fitness > black_hole_fitness:
if debug:
logging.info(f'Changing Black hole for star {a},'
f' BH fitness -> {black_hole_fitness} | Star {a} fitness -> {current_fitness}')
black_hole_idx = a
black_hole_subset, current_star_subset = current_star_subset.copy(), black_hole_subset.copy()
black_hole_fitness, current_fitness = current_fitness, black_hole_fitness
# If the fitness function was the same, but had fewer features in the star (better!), makes the swap
elif current_fitness == black_hole_fitness and np.count_nonzero(current_star_subset) < np.count_nonzero(
black_hole_subset):
if debug:
logging.info(f'Changing Black hole for star {a},'
f' BH fitness -> {black_hole_fitness} | Star {a} fitness -> {current_fitness}')
black_hole_idx = a
black_hole_subset, current_star_subset = current_star_subset.copy(), black_hole_subset.copy()
black_hole_fitness, current_fitness = current_fitness, black_hole_fitness
# Computes the event horizon
# Improvement 1: new function to define the event horizon
event_horizon = (1 / black_hole_fitness) / np.sum(1 / stars_fitness_values)
# Checks if the current star falls in the event horizon
dist_to_black_hole = np.linalg.norm(black_hole_subset - current_star_subset) # Euclidean distance
if dist_to_black_hole < event_horizon:
if debug:
logging.info(f'Star {a} has fallen inside event horizon. '
f'Event horizon -> {event_horizon} | Star distance -> {dist_to_black_hole}')
# Improvement 2: only ONE dimension of the feature array is changed
random_feature_idx = random.randint(0, n_features - 1)
stars_subsets[a][random_feature_idx] ^= 1 # Toggle 0/1
# Updates the binary array of the used features
# Improvement 3: new formula to 'move' the star
w = 1 - (i / n_iterations)
d1 = coeff_1 + w
d2 = coeff_2 + w
for a in range(n_stars):
# Skips the black hole
if black_hole_idx == a:
continue
for d in range(n_features):
x_old = stars_subsets[a][d]
x_best = stars_best_subset[a][d]
threshold = binary_threshold if binary_threshold is not None else random.uniform(0, 1)
bh_star_diff = black_hole_subset[d] - x_old
star_best_fit_diff = x_best - x_old
x_new = x_old + (d1 * random.uniform(0, 1) * bh_star_diff) + (
d2 * random.uniform(0, 1) * star_best_fit_diff)
stars_subsets[a][d] = 1 if abs(tanh(x_new)) > threshold else 0
return black_hole_subset, black_hole_fitness
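# Worked example of the improved event horizon above (hypothetical fitness values): with star fitness values
# [0.8, 0.6, 0.6], event_horizon = (1 / 0.8) / (1/0.8 + 1/0.6 + 1/0.6) = 1.25 / 4.583... ≈ 0.27, smaller than the
# 0.4 given by the classic formula for the same values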
def __get_subset_for_idx(idx_to_search: int, stars_results_values: np.ndarray) -> Tuple[int, np.ndarray]:
"""Finds the element in the stars_results_values array where the first position is equal to the idx."""
for idx_subset, elem in enumerate(stars_results_values):
if elem[0] == idx_to_search:
return idx_subset, elem[1]
logging.error(f'No element found for idx {idx_to_search}. Exiting...')
exit(-1)
def binary_black_hole_spark(
n_stars: int,
n_features: int,
n_iterations: int,
fitness_function: Callable[[np.ndarray], CrossValidationSparkResult],
sc: SparkContext,
number_of_workers: int,
load_balancer_parameters: Optional[Union[SVMParameters, ClusteringParameters, RFParameters]],
more_is_better: bool,
random_state: Optional[int],
binary_threshold: Optional[float] = 0.6,
debug: bool = False,
) -> Tuple[np.ndarray, float, np.ndarray, Dict]:
"""
Computes the metaheuristic Binary Black Hole Algorithm in a Spark cluster. Taken from the paper
"Binary black hole algorithm for feature selection and classification on biological data"
Authors: Elnaz Pashaei, Nizamettin Aydin.
:param n_stars: Number of stars
:param n_features: Number of features
:param n_iterations: Number of iterations
:param fitness_function: Fitness function to compute on every star
:param sc: Spark Context
:param number_of_workers: Number of workers in the Spark cluster
:param load_balancer_parameters: Parameters of the Clustering/RF/SVM for the load balancer. Only used if it is not None
:param more_is_better: If True, it returns the highest value (SVM and RF C-Index), lowest otherwise (LogRank p-value)
:param random_state: Random state to replicate experiments. It allows setting the same number of features for every
star and the same shuffling.
:param binary_threshold: Binary threshold to set the feature to 1 or 0. If None, it'll be drawn randomly
:param debug: If True, logs everything that happens inside the BBHA
:return: The best subset found, the best fitness value found, the data returned by the Worker for the Black Hole, and
all the data collected about times and execution during the experiment
"""
# Lists for storing times and fitness data to train models
number_of_features: List[int] = []
hosts: List[str] = []
partition_ids: List[int] = []
fitness: List[float] = []
time_exec: List[float] = []
predicted_time_exec: List[float] = []
svm_times_by_iteration: List[float] = []
time_test: List[float] = []
num_of_iterations: List[float] = []
train_scores: List[float] = []
# Data structs setup
stars_subsets = np.empty((n_stars, 2), dtype=object) # 2 = (star index, features)
# Initializes the stars with their subsets and their fitness values
if debug:
logging.info('Initializing stars...')
for i in range(n_stars):
# Adds 8 to differentiate the random state used in the initialization of every star from the one used in the rest of the BBHA iterations
current_random_state = random_state * (i + 8) if random_state is not None else None
random_features_to_select = get_random_subset_of_features(n_features, current_random_state)
stars_subsets[i] = (i, random_features_to_select) # Initializes 'Population' with a key for partitionBy()
initial_stars_results_values, _initial_total_time, initial_predicted_times_map = parallelize_fitness_execution_by_partitions(
sc,
stars_subsets,
fitness_function,
number_of_workers,
load_balancer_parameters,
debug
)
for star_idx_in_cluster, current_data in initial_stars_results_values:
current_fitness_mean = current_data[0]
worker_execution_time = current_data[1]
partition_id = current_data[2]
host_name = current_data[3]
evaluated_features = current_data[4]
time_by_iteration = current_data[6]
model_test_time = current_data[7]
mean_num_of_iterations = current_data[8]
train_score = current_data[9]
current_predicted_time = initial_predicted_times_map[star_idx_in_cluster]
number_of_features.append(evaluated_features)
hosts.append(host_name)
partition_ids.append(partition_id)
fitness.append((round(current_fitness_mean, 4)))
time_exec.append(round(worker_execution_time, 4))
svm_times_by_iteration.append(round(time_by_iteration, 4))
time_test.append(round(model_test_time, 4))
num_of_iterations.append(round(mean_num_of_iterations, 4))
train_scores.append(round(train_score, 4))
predicted_time_exec.append(round(current_predicted_time, 4))
logging.info(f'Star {star_idx_in_cluster} | Fitness: {current_fitness_mean}')
# The star with the best fitness is the black hole
black_hole_idx, black_hole_subset, black_hole_data = get_best_spark(stars_subsets,
initial_stars_results_values, more_is_better)
black_hole_fitness = black_hole_data[0]
if debug:
logging.info(f'Black hole starting as star at index {black_hole_idx}')
logging.info(f'Black hole fitness: {black_hole_fitness}')
# To report every Spark's Worker idle time
workers_idle_times: Dict[str, List[Tuple[int, float]]] = {}
workers_execution_times_per_iteration: Dict[str, List[Tuple[int, float]]] = {}
# Iterations
for i in range(n_iterations):
# To report every Spark's Worker idle time. It stores the execution time for the current iteration
workers_execution_times: Dict[str, float] = {}
if debug:
logging.info(f'Iteration {i + 1}/{n_iterations}')
stars_results_values, total_iteration_time, predicted_times_map = parallelize_fitness_execution_by_partitions(
sc,
stars_subsets,
fitness_function,
number_of_workers,
load_balancer_parameters,
debug
)
for star_idx_in_cluster, current_data in stars_results_values:
current_fitness_mean = current_data[0]
worker_execution_time = current_data[1]
partition_id = current_data[2]
host_name = current_data[3]
evaluated_features = current_data[4]
time_lapse_description = current_data[5]
time_by_iteration = current_data[6]
model_test_time = current_data[7]
mean_num_of_iterations = current_data[8]
train_score = current_data[9]
current_predicted_time = predicted_times_map[star_idx_in_cluster]
number_of_features.append(evaluated_features)
hosts.append(host_name)
partition_ids.append(partition_id)
fitness.append((round(current_fitness_mean, 4)))
time_exec.append(round(worker_execution_time, 4))
svm_times_by_iteration.append(round(time_by_iteration, 4))
time_test.append(round(model_test_time, 4))
num_of_iterations.append(round(mean_num_of_iterations, 4))
train_scores.append(round(train_score, 4))
predicted_time_exec.append(round(current_predicted_time, 4))
# Accumulates execution times to compute the idle time of every worker. The idle time is the difference between
# the time the master took to distribute all the computations and collect the results from all the workers,
# and the sum of the execution times of every star that Worker received
if host_name not in workers_execution_times:
workers_execution_times[host_name] = 0.0
workers_execution_times[host_name] += worker_execution_time # Adds the time of the Worker to compute this star
if debug:
logging.info(f'{star_idx_in_cluster} star took {round(worker_execution_time, 3)} seconds ({time_lapse_description}) '
f'for {evaluated_features} features. Partition: {partition_id} | '
f'Host name: {host_name}. Fitness: {current_fitness_mean}')
# If it's the black hole, skips the computation
if star_idx_in_cluster == black_hole_idx:
continue
# Gets the star subset of features and its fitness value
idx_in_subsets, current_star_subset = __get_subset_for_idx(star_idx_in_cluster, stars_subsets)
# If it's the best fitness, swaps that star with the current black hole
if (more_is_better and current_fitness_mean > black_hole_fitness) or \
(not more_is_better and current_fitness_mean < black_hole_fitness):
if debug:
logging.info(f'Changing Black hole for star {star_idx_in_cluster}, '
f'BH fitness -> {black_hole_fitness} | Star in position {star_idx_in_cluster} '
f'fitness -> {current_fitness_mean}')
black_hole_idx = star_idx_in_cluster
black_hole_subset, current_star_subset = current_star_subset, black_hole_subset
black_hole_data, current_data = current_data, black_hole_data
black_hole_fitness, current_fitness_mean = current_fitness_mean, black_hole_fitness
# If the fitness function was the same, but had fewer features in the star (better!), makes the swap
elif current_fitness_mean == black_hole_fitness and np.count_nonzero(current_star_subset) < np.count_nonzero(
black_hole_subset):
if debug:
logging.info(f'Changing Black hole for star {star_idx_in_cluster}, '
f'BH fitness -> {black_hole_fitness} | Star in position {star_idx_in_cluster} '
f'fitness -> {current_fitness_mean}')
black_hole_idx = star_idx_in_cluster
black_hole_subset, current_star_subset = current_star_subset, black_hole_subset
black_hole_data, current_data = current_data, black_hole_data
black_hole_fitness, current_fitness_mean = current_fitness_mean, black_hole_fitness
# Computes the event horizon using the sum of the fitness values obtained in this iteration
all_fitness_values = np.array([star_data[0] for _, star_data in stars_results_values])
event_horizon = black_hole_fitness / np.sum(all_fitness_values)
# Checks if the current star falls in the event horizon
dist_to_black_hole = np.linalg.norm(black_hole_subset - current_star_subset) # Euclidean distance
if dist_to_black_hole < event_horizon:
if debug:
logging.info(f'Star {star_idx_in_cluster} has fallen '
f'inside event horizon. '
f'Event horizon -> {event_horizon} | Star distance -> {dist_to_black_hole}')
current_random_state = random_state * (i * (star_idx_in_cluster + 1)) if random_state is not None else None
stars_subsets[idx_in_subsets][1] = get_random_subset_of_features(n_features, current_random_state)
# Stores the idle time for every worker in this iteration
for host_name, sum_execution_times in workers_execution_times.items():
if host_name not in workers_idle_times:
workers_idle_times[host_name] = []
if host_name not in workers_execution_times_per_iteration:
workers_execution_times_per_iteration[host_name] = []
if debug:
logging.info(f'The worker {host_name} has taken ~{sum_execution_times} seconds to compute all its stars')
workers_execution_times_per_iteration[host_name].append((i, sum_execution_times))
workers_idle_times[host_name].append((i, total_iteration_time - sum_execution_times))
# Updates the binary array of the used features
for a in range(n_stars):
# The index of the star in the cluster is used so random_state can be replicated against sequential experiments,
# since Spark may return the stars in a different order when collecting the results
star_idx_in_cluster = stars_subsets[a][0]
# Skips the black hole
if black_hole_idx == star_idx_in_cluster:
logging.info(f'Skipping black hole in position {a} of the array ({star_idx_in_cluster} in cluster)')
continue
for d in range(n_features):
x_old = stars_subsets[a][1][d]
if random_state is not None:
random.seed(random_state * (d * (star_idx_in_cluster + 1)))
threshold = binary_threshold if binary_threshold is not None else random.uniform(0, 1)
if random_state is not None:
random.seed(random_state * (d * (star_idx_in_cluster + 2)))
x_new = x_old + random.uniform(0, 1) * (black_hole_subset[d] - x_old) # Position
stars_subsets[a][1][d] = 1 if abs(tanh(x_new)) > threshold else 0
# Computes avg/std of idle times for all the iterations for every Worker
workers_idle_times_res: Dict[str, Dict[str, float]] = {}
for host_name, idle_times in workers_idle_times.items():
only_times = np.array(idle_times)[:, 1] # Discards iteration number
workers_idle_times_res[host_name] = {
'mean': round(cast(float, np.mean(only_times)), 4),
'std': round(cast(float, np.std(only_times)), 4)
}
# NOTE: the rest of the data such as 'dataset' and 'model' is added in core.py
result_dict = {
'number_of_features': number_of_features,
'execution_times': time_exec,
'predicted_execution_times': predicted_time_exec,
'fitness': fitness,
'times_by_iteration': svm_times_by_iteration,
'test_times': time_test,
'train_scores': train_scores,
'number_of_iterations': num_of_iterations,
'hosts': hosts,
'workers_execution_times_per_iteration': workers_execution_times_per_iteration,
'workers_idle_times': workers_idle_times_res, # Yes, names are confusing, but workers_idle_times_res has mean and std
'workers_idle_times_per_iteration': workers_idle_times,
'partition_ids': partition_ids
}
return black_hole_subset, black_hole_fitness, black_hole_data, result_dict
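# Worked example of the idle-time bookkeeping above (hypothetical values): if an iteration took 120 seconds between
# partitionBy() and collect(), and the stars assigned to worker 'worker-1' summed 95 seconds of execution time, then
# (i, 120 - 95) = (i, 25) is appended to workers_idle_times['worker-1'] for that iteration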
def report_all_load_balancer_models_svm(n_stars: int, parameters: 'SVMParameters', stars_subsets: np.ndarray):
"""Reports the predicted times for all the trained models for the SVM model. Useful for DEBUG."""
start = time.time()
predicted_times_aux, _ = predict_execution_times_svm(stars_subsets, parameters, model_plk='best_linear_model.pkl', use_min_max=True)
diff = round(time.time() - start, 4)
stars_and_times_aux = {k: round(v, 4) for (k, v) in zip(range(n_stars), predicted_times_aux)}
logging.info(f'Predicted Linear d=1 with MinMax (in {diff} seconds): {stars_and_times_aux}')
start = time.time()
predicted_times_aux, _ = predict_execution_times_svm(stars_subsets, parameters, model_plk='best_linear_model_no_min_max.pkl', use_min_max=False)
diff = round(time.time() - start, 4)
stars_and_times_aux = {k: round(v, 4) for (k, v) in zip(range(n_stars), predicted_times_aux)}
logging.info(f'Predicted Linear d=1 NO MinMax (in {diff} seconds): {stars_and_times_aux}')
start = time.time()
predicted_times_aux, _ = predict_execution_times_svm(stars_subsets, parameters, model_plk='best_linear_model_2.pkl', use_min_max=True, poly_degree=2)
diff = round(time.time() - start, 4)
stars_and_times_aux = {k: round(v, 4) for (k, v) in zip(range(n_stars), predicted_times_aux)}
logging.info(f'Predicted Linear d=2 with MinMax (in {diff} seconds): {stars_and_times_aux}')
start = time.time()
predicted_times_aux, _ = predict_execution_times_svm(stars_subsets, parameters, model_plk='best_linear_model_2_no_min_max.pkl', use_min_max=False, poly_degree=2)
diff = round(time.time() - start, 4)
stars_and_times_aux = {k: round(v, 4) for (k, v) in zip(range(n_stars), predicted_times_aux)}
logging.info(f'Predicted Linear d=2 NO MinMax (in {diff} seconds): {stars_and_times_aux}')
start = time.time()
predicted_times_aux, _ = predict_execution_times_svm(stars_subsets, parameters,
model_plk='best_gradient_booster_model.pkl', use_min_max=True)
diff = round(time.time() - start, 4)
stars_and_times_aux = {k: round(v, 4) for (k, v) in zip(range(n_stars), predicted_times_aux)}
logging.info(f'Predicted Gradient booster with MinMax (in {diff} seconds): {stars_and_times_aux}')
start = time.time()
predicted_times_aux, _ = predict_execution_times_svm(stars_subsets, parameters,
model_plk='best_gradient_booster_model_no_min_max.pkl',
use_min_max=False)
diff = round(time.time() - start, 4)
stars_and_times_aux = {k: round(v, 4) for (k, v) in zip(range(n_stars), predicted_times_aux)}
logging.info(f'Predicted Gradient booster NO MinMax (in {diff} seconds): {stars_and_times_aux}')
start = time.time()
predicted_times_aux, _ = predict_execution_times_svm(stars_subsets, parameters, model_plk='best_linear_model_3.pkl',
use_min_max=True, poly_degree=3)
diff = round(time.time() - start, 4)
stars_and_times_aux = {k: round(v, 4) for (k, v) in zip(range(n_stars), predicted_times_aux)}
logging.info(f'Predicted Linear d=3 with MinMax (in {diff} seconds): {stars_and_times_aux}')
start = time.time()
predicted_times_aux, _ = predict_execution_times_svm(stars_subsets, parameters,
model_plk='best_linear_model_3_no_min_max.pkl', use_min_max=False,
poly_degree=3)
diff = round(time.time() - start, 4)
stars_and_times_aux = {k: round(v, 4) for (k, v) in zip(range(n_stars), predicted_times_aux)}
logging.info(f'Predicted Linear d=3 NO MinMax (in {diff} seconds): {stars_and_times_aux}')
start = time.time()
predicted_times_aux, _ = predict_execution_times_svm(stars_subsets, parameters, model_plk='best_nn_model.pkl',
use_min_max=True)
diff = round(time.time() - start, 4)
stars_and_times_aux = {k: round(v, 4) for (k, v) in zip(range(n_stars), predicted_times_aux)}
logging.info(f'Predicted NN with MinMax (in {diff} seconds): {stars_and_times_aux}')
start = time.time()
predicted_times_aux, _ = predict_execution_times_svm(stars_subsets, parameters,
model_plk='best_nn_model_no_min_max.pkl', use_min_max=False)
diff = round(time.time() - start, 4)
stars_and_times_aux = {k: round(v, 4) for (k, v) in zip(range(n_stars), predicted_times_aux)}
logging.info(f'Predicted NN NO MinMax (in {diff} seconds): {stars_and_times_aux}')
def predict_execution_times_svm(stars: np.ndarray, parameters: SVMParameters,
model_plk: str, use_min_max: bool,
poly_degree: Optional[int] = None,
report_data_without_min_max: bool = False) -> Tuple[List, List]:
"""
Predicts execution times for every one of the stars using the load balancer parameters for a SVM model.
:param stars: Stars with the number of features to compute
:param parameters: SVM parameters
:param model_plk: Model's pkl file name
:param use_min_max: If True, a MinMax transformation is applied to some features
:param poly_degree: If not None, a Polynomial transformation is computed. Useful for LinearRegression models
:param report_data_without_min_max: If True, logs the input for the load balancer
:return: List with all the predictions made for every star, and a list with the data from which the model makes
all the predictions
"""
# Loads models
model_plk_path = os.path.join(SVM_TRAINED_MODELS_DIR, model_plk)
logging.info(f'Loading model {model_plk_path}')
trained_model: GradientBoostingRegressor = joblib.load(model_plk_path)
ord_encoder: OrdinalEncoder = joblib.load(os.path.join(SVM_TRAINED_MODELS_DIR, 'ord_encoder.pkl'))
# The first load balancer models were trained with less data than is seen in production, so this configuration
# prevents errors with unknown categorical values
ord_encoder.handle_unknown = 'use_encoded_value'
ord_encoder.unknown_value = -1
# Generates number_of_features, number_of_samples, kernel, optimizer
kernel_enum = get_kernel_enum_value(parameters.kernel).value
categorical_features = np.array([parameters.optimizer]).reshape(1, -1)
optimizer_transformed = ord_encoder.transform(categorical_features)[0][0]
x = []
for star in stars:
star_n_features = np.count_nonzero(star[1])
star_data = [star_n_features, parameters.number_of_samples, kernel_enum, optimizer_transformed]
x.append(star_data)
# Min-Max scaling 'number_of_features' (0) and 'number_of_samples' (1)
x = np.array(x)
if report_data_without_min_max:
logging.info(f'Data to predict (without transformation):\n{x}')
if use_min_max:
logging.info('Using MinMaxScaler')
# Changes dtype of data to float32 to allow min_max scaler to work (otherwise number of features and number
# of samples will be truncated to 0)
x = x.astype(np.float32)
min_max_scaler: MinMaxScaler = joblib.load(os.path.join(SVM_TRAINED_MODELS_DIR, 'min_max_scaler.pkl'))
x[:, [0, 1]] = min_max_scaler.transform(x[:, [0, 1]])
else:
logging.info('NOT using MinMaxScaler')
if poly_degree is not None:
x = PolynomialFeatures(degree=poly_degree, include_bias=False).fit_transform(x)
predictions = trained_model.predict(x)
return predictions, x
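# Illustrative feature row handed to the regressor above (hypothetical values): a star with 150 selected features and
# 300 samples, together with the numeric kernel enum and the ordinal-encoded optimizer, yields a row like
# [150, 300, <kernel_enum>, <optimizer_code>] before the optional MinMax scaling and polynomial expansion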
def predict_execution_times_clustering(stars: np.ndarray, parameters: ClusteringParameters,
model_plk: str, use_min_max: bool,
poly_degree: Optional[int] = None,
report_data_without_min_max: bool = False) -> Tuple[List, List]:
"""
Predicts execution times for every one of the stars using the load balancer parameters for a Clustering model.
:param stars: Stars with the number of features to compute
:param parameters: ClusteringParameters instance.
:param model_plk: Model's pkl file name
:param use_min_max: If True, a MinMax transformation is applied to some features
:param poly_degree: If not None, a Polynomial transformation is computed. Useful for LinearRegression models
:param report_data_without_min_max: If True, logs the input for the load balancer
:return: List with all the predictions made for every star, and a list with the data from which the model makes
all the predictions
"""
# Loads models
model_plk_path = os.path.join(CLUSTERING_TRAINED_MODELS_DIR, model_plk)
logging.info(f'Loading model {model_plk_path}')
trained_model: GradientBoostingRegressor = joblib.load(model_plk_path)
# Gets the numeric representation of algorithm, scoring_method with the same values used at Multiomix
# to get the correct ordinal value
algorithm_enum = get_clustering_algorithm_enum_value(parameters.algorithm).value
scoring_method_enum = get_clustering_scoring_method_enum_value(parameters.scoring_method).value
x = []
for star in stars:
star_n_features = np.count_nonzero(star[1])
star_data = [star_n_features, parameters.number_of_samples, parameters.number_of_clusters,
algorithm_enum, scoring_method_enum]
x.append(star_data)
# Min-Max scaling 'number_of_features' (0) and 'number_of_samples' (1)
x = np.array(x)
if report_data_without_min_max: