bin_packing.py

from typing import List, Dict
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.preprocessing import OrdinalEncoder, MinMaxScaler
import binpacking
import numpy as np
from metaheuristics import get_random_subset_of_features
import joblib
from utils import KernelName, OptimizerName

# Just for the example
n_stars = 25
n_features = 10000
n_workers = 3
optimizer: OptimizerName = 'avltree'
kernel: KernelName = 'linear'
number_of_samples = 501 # Random number


def __generate_stars_and_partitions_bins(bins: List) -> Dict[int, int]:
    """
    Generates a dict with the idx of the star and the assigned partition.
    :param bins: Bins generated by binpacking
    :return: Dict where keys are star index, values are the Spark partition
    """
    stars_and_partitions: Dict[int, int] = {}
    for partition_id, aux_bin in enumerate(bins):
        for star_idx in aux_bin.keys():
            stars_and_partitions[star_idx] = partition_id
    return stars_and_partitions
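
# Illustrative note (the values below are made up, not produced by this script): given a
# dict of weights, binpacking returns one dict per bin, e.g.
#   [{0: 1.2, 3: 0.9}, {1: 1.5}, {2: 1.1, 4: 0.4}]
# and __generate_stars_and_partitions_bins flattens that into star_idx -> partition_id:
#   {0: 0, 3: 0, 1: 1, 2: 2, 4: 2}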


def predict_execution_times(stars: np.ndarray) -> List:
    """
    Predicts execution times for every one of the stars.
    :param stars: Array of (star index, selected features) pairs
    :return: List with all the predictions made for every star
    """
    # Loads the trained model and the fitted encoder/scaler
    trained_model: GradientBoostingRegressor = joblib.load('Trained_models/svm/best_gradient_booster_model.pkl')
    ord_encoder: OrdinalEncoder = joblib.load('Trained_models/svm/ord_encoder.pkl')
    min_max_scaler: MinMaxScaler = joblib.load('Trained_models/svm/min_max_scaler.pkl')

    # Encodes the categorical features (kernel and optimizer)
    categorical_features = np.array([kernel, optimizer]).reshape(1, -1)
    kernel_transformed = ord_encoder.transform(categorical_features)[0][0]
    optimizer_transformed = ord_encoder.transform(categorical_features)[0][1]

    # Builds one row per star: [number_of_features, number_of_samples, kernel, optimizer]
    x = []
    for star in stars:
        star_n_features = np.count_nonzero(star[1])
        star_data = [star_n_features, number_of_samples, kernel_transformed, optimizer_transformed]
        x.append(star_data)

    # Min-Max scaling 'number_of_features' (0) and 'number_of_samples' (1)
    x = np.array(x)
    x[:, [0, 1]] = min_max_scaler.transform(x[:, [0, 1]])

    predictions = trained_model.predict(x)
    return predictions
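
# Note: the row layout above is assumed to match whatever the pickled model was trained
# on: [number_of_features, number_of_samples, kernel, optimizer], with the first two
# columns min-max scaled and the last two ordinal-encoded. A star selecting, say, 4983
# of the 10000 features (a hypothetical value) would yield a row like
#   [scaled(4983), scaled(501), kernel_transformed, optimizer_transformed]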


def main():
    # Initializes stars
    stars_subsets = np.empty((n_stars, 2), dtype=object)  # 2 = (1, features)
    for i in range(n_stars):
        random_features_to_select = get_random_subset_of_features(n_features)
        stars_subsets[i] = (i, random_features_to_select)  # Initializes 'Population' with a key for partitionBy()

    predicted_times = predict_execution_times(stars_subsets)

    stars_and_times = {k: v for (k, v) in zip(range(n_stars), predicted_times)}
    bins = binpacking.to_constant_bin_number(stars_and_times, n_workers)  # n_workers is the number of bins

    print("Stars (keys) and their predicted execution times (values):")
    print(stars_and_times)
    print(f"\nRepartition among {n_workers} bins (list of the above stars data):")
    print(bins)

    # Generates a dict with the idx of the star and the assigned partition
    stars_and_partitions = __generate_stars_and_partitions_bins(bins)
    print('\nFinal repartition. Keys are star index, values are the Spark partition:')
    print(stars_and_partitions)


if __name__ == '__main__':
    main()
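
# Usage sketch (assuming the pickled model, encoder and scaler exist under
# Trained_models/svm/, and that the metaheuristics and utils modules are importable):
#   $ python bin_packing.py
# prints the predicted execution time per star, the bins produced by binpacking, and
# the final star -> Spark partition mapping. Exact numbers depend on the loaded
# regressor and on the random feature subsets.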