import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
import torch.nn.functional as F
# Load the dataset and drop rows flagged with the -999 sentinel value
data = pd.read_csv('mp_considated_v1.csv')
data = data[data['Measurement'] != -999]
# Bin the continuous measurement into 6 ordinal classes
# (e.g. a Measurement of 0.0007 falls in [0.0005, 0.001) and gets label 1)
bins = [0, 0.0005, 0.001, 0.008, 0.01, 1, 100000]
labels = [0, 1, 2, 3, 4, 5]
data['classification'] = pd.cut(data['Measurement'], bins=bins, labels=labels, right=False)
data['classification'] = pd.to_numeric(data['classification']).astype('int64')
data = data.sort_values(by='Year')
X = data.drop(columns=['Measurement','classification','windspeed (m/s)','Date'])  # features
y = data['classification']  # target variable
# Divide into training, validation and test sets (70/15/15, stratified)
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp)
scaler = MinMaxScaler()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Normalized, mean-imputed copy of the full feature matrix
# (note: the training code below uses the raw splits with KNN imputation instead)
X_scaled = scaler.fit_transform(X)  # Normalize input data
X_tensor = torch.tensor(X_scaled, dtype=torch.float32).to(device)
nan_mask = torch.isnan(X_tensor)
column_means = torch.nanmean(X_tensor, dim=0)
X_tensor[nan_mask] = column_means.expand_as(X_tensor)[nan_mask]
from sklearn.impute import KNNImputer
# Convert to torch tensors and reshape to [batch, timesteps=1, features]
X_train_t = torch.tensor(X_train.values, dtype=torch.float32).view(X_train.shape[0], 1, X_train.shape[1])
X_val_t = torch.tensor(X_val.values, dtype=torch.float32).view(X_val.shape[0], 1, X_val.shape[1])
# Impute missing values with KNN, fitting the imputer on the training split only
x_train_reshaped = X_train_t.view(-1, X_train_t.shape[-1]).numpy()
imputer = KNNImputer(n_neighbors=3, weights="uniform")
x_train_imputed = imputer.fit_transform(x_train_reshaped)
x_val_imputed = imputer.transform(X_val_t.view(-1, X_val_t.shape[-1]).numpy())
X_train_t = torch.tensor(x_train_imputed, dtype=torch.float32).view(X_train.shape[0], 1, X_train.shape[1])
X_val_t = torch.tensor(x_val_imputed, dtype=torch.float32).view(X_val.shape[0], 1, X_val.shape[1])
# Convert labels (Pandas Series to NumPy arrays, then to tensors)
num_classes = 6
y_train_t = torch.tensor(y_train.values, dtype=torch.long)
print(torch.unique(y_train_t))
y_train_t = torch.clamp(y_train_t, 0, num_classes - 1)
y_val_t = torch.tensor(y_val.values, dtype=torch.long)
class TransformerModel(nn.Module):
    def __init__(self, input_dim, num_classes, d_model=64, nhead=4, num_layers=2, dim_feedforward=128, dropout=0.1):
        super(TransformerModel, self).__init__()
        # Learned positional encoding (max 100 timesteps, adjust as needed)
        self.positional_encoding = nn.Parameter(torch.zeros(1, 100, d_model))
        # Embedding layer to project input features into d_model dimensions
        self.input_embedding = nn.Linear(input_dim, d_model)
        # Transformer encoder (batch_first=True, so inputs stay [batch, timesteps, d_model])
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward, dropout=dropout, batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        # Fully connected classification head
        self.fc = nn.Linear(d_model, num_classes)

    def forward(self, x):
        """
        x: [batch_size, timesteps, input_dim]
        """
        # Project inputs and add positional encoding
        seq_len = x.size(1)
        x = self.input_embedding(x) + self.positional_encoding[:, :seq_len, :]  # [batch_size, timesteps, d_model]
        # Encode; with batch_first=True the layout stays [batch_size, timesteps, d_model]
        x = self.transformer_encoder(x)
        # Pooling (take the mean over the timesteps)
        x = x.mean(dim=1)  # [batch_size, d_model]
        # Fully connected layer for classification
        x = self.fc(x)  # [batch_size, num_classes]
        return x
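
# Illustrative shape check (not part of the original pipeline): 4 sequences of
# 3 timesteps with 10 features should yield one row of class logits per sequence.
_demo = TransformerModel(input_dim=10, num_classes=6)
print(_demo(torch.randn(4, 3, 10)).shape)  # torch.Size([4, 6])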
from torchmetrics import F1Score, Recall
# If multi-class:
#device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#class_weights = torch.tensor([1.97, 3.65, 4.62, 10, 10], dtype=torch.float32).to(device)
#criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.1)
# Initialize model (6 measurement classes; input_dim matches the number of feature columns)
model = TransformerModel(
    input_dim=X_train.shape[1],
    num_classes=6,
    d_model=128,  # or 256
    nhead=8,
    num_layers=6,
    dim_feedforward=256,
    dropout=0.3
)
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)
num_classes = 6
# Initialize metrics
f1_metric = F1Score(num_classes=num_classes, average="macro", task='multiclass') # Macro for multi-class
recall_metric = Recall(num_classes=num_classes, average="macro", task='multiclass')
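
# Optional (a sketch): run on the GPU selected earlier. Assumes the tensors and
# metrics defined above; this is a harmless no-op when only a CPU is available.
model = model.to(device)
f1_metric, recall_metric = f1_metric.to(device), recall_metric.to(device)
X_train_t, y_train_t = X_train_t.to(device), y_train_t.to(device)
X_val_t, y_val_t = X_val_t.to(device), y_val_t.to(device)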
# Training loop (full-batch training on the imputed training tensor)
for epoch in range(110):
    model.train()
    optimizer.zero_grad()
    logits = model(X_train_t)
    loss = criterion(logits, y_train_t)
    loss.backward()
    optimizer.step()

    # Validation metrics
    model.eval()
    with torch.no_grad():
        val_logits = model(X_val_t)
        val_preds = torch.argmax(val_logits, dim=1)
        val_accuracy = (val_preds == y_val_t).float().mean().item()
        val_f1 = f1_metric(val_preds, y_val_t).item()
        val_recall = recall_metric(val_preds, y_val_t).item()
    print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}, Validation Accuracy: {val_accuracy:.4f}, "
          f"F1 Score: {val_f1:.4f}, Recall: {val_recall:.4f}")