utils.py
# encoding: utf-8
import torch
import torch.nn as nn
import numpy as np
import torch.optim as optim

use_cuda = torch.cuda.is_available()

class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention mechanism."""

    def __init__(self, attention_dropout=0.0):
        super(ScaledDotProductAttention, self).__init__()
        self.dropout = nn.Dropout(attention_dropout)
        # Softmax over the key dimension (the last dim of the score matrix).
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q, k, v, scale=None, attn_mask=None):
        # q, k, v: (batch * num_heads, seq_len, dim_per_head)
        attention = torch.matmul(q, k.transpose(-2, -1))
        if scale:
            attention = attention * scale
        if attn_mask is not None:
            # Fill masked positions with -inf so they get zero weight after softmax.
            attention = attention.masked_fill_(attn_mask, -np.inf)
        # Normalize the scores into attention weights.
        attention = self.softmax(attention)
        # Apply dropout to the attention weights.
        attention = self.dropout(attention)
        # Weighted sum of the values.
        context = torch.matmul(attention, v)
        return context, attention
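
# Usage sketch (illustrative only, not part of the original module); the shapes
# below are assumptions, matching a flattened (batch * num_heads, seq_len,
# dim_per_head) layout:
#
#   attn = ScaledDotProductAttention(attention_dropout=0.1)
#   q = k = v = torch.rand(16, 10, 64)            # hypothetical shapes
#   ctx, weights = attn(q, k, v, scale=64 ** -0.5)
#   # ctx: (16, 10, 64), weights: (16, 10, 10)
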
class MultiHeadAttention(nn.Module):
    """Multi-head attention: project q/k/v, attend per head, then recombine."""

    def __init__(self, model_dim=512, num_heads=8, dropout=0.0):
        super(MultiHeadAttention, self).__init__()
        self.dim_per_head = model_dim // num_heads
        self.num_heads = num_heads
        self.linear_k = nn.Linear(model_dim, self.dim_per_head * num_heads)
        self.linear_v = nn.Linear(model_dim, self.dim_per_head * num_heads)
        self.linear_q = nn.Linear(model_dim, self.dim_per_head * num_heads)
        self.dot_product_attention = ScaledDotProductAttention(dropout)
        self.linear_final = nn.Linear(model_dim, model_dim)
        self.dropout = nn.Dropout(dropout)
        # Layer norm applied after the multi-head attention sub-layer.
        self.layer_norm = nn.LayerNorm(model_dim)

    def forward(self, key, value, query, attn_mask=None):
        # Keep the input for the residual connection.
        residual = query
        dim_per_head = self.dim_per_head
        num_heads = self.num_heads
        batch_size = key.size(0)

        # Linear projection.
        key = self.linear_k(key)
        value = self.linear_v(value)
        query = self.linear_q(query)

        # Split into heads: (batch, seq, heads * dph) -> (batch * heads, seq, dph).
        key = key.view(batch_size, -1, num_heads, dim_per_head).transpose(1, 2) \
                 .reshape(batch_size * num_heads, -1, dim_per_head)
        value = value.view(batch_size, -1, num_heads, dim_per_head).transpose(1, 2) \
                     .reshape(batch_size * num_heads, -1, dim_per_head)
        query = query.view(batch_size, -1, num_heads, dim_per_head).transpose(1, 2) \
                     .reshape(batch_size * num_heads, -1, dim_per_head)

        if attn_mask is not None:
            # One copy of the mask per head, kept adjacent to its batch element.
            attn_mask = attn_mask.repeat_interleave(num_heads, dim=0)

        # Scaled dot-product attention, scaled by 1 / sqrt(dim_per_head).
        scale = dim_per_head ** -0.5
        context, attention = self.dot_product_attention(query, key, value, scale, attn_mask)

        # Concatenate heads: (batch * heads, seq, dph) -> (batch, seq, heads * dph).
        context = context.view(batch_size, num_heads, -1, dim_per_head).transpose(1, 2) \
                         .reshape(batch_size, -1, dim_per_head * num_heads)

        # Final linear projection.
        output = self.linear_final(context)
        # Dropout.
        output = self.dropout(output)
        # Residual connection plus layer norm.
        output = residual + self.layer_norm(output)  # alternatively: self.layer_norm(residual + output)
        return output, attention
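
# Usage sketch (illustrative, under assumed shapes): self-attention over a batch
# of 2 sequences of length 10 with model_dim=512.
#
#   mha = MultiHeadAttention(model_dim=512, num_heads=8, dropout=0.1)
#   x = torch.rand(2, 10, 512)
#   out, attn = mha(x, x, x)        # key, value, query
#   # out: (2, 10, 512), attn: (2 * 8, 10, 10)
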
def to_bow(data, min_length):
    """Convert lists of token indices to bag-of-words representations of documents."""
    # Drop the padding index 0, then count occurrences of each vocabulary index.
    vect = [np.bincount(x[x != 0].astype('int'), minlength=min_length)
            for x in data]
    return np.array(vect)
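
# Usage sketch (assumed inputs): each row is a padded array of token indices,
# 0 is the padding index, and the vocabulary size is 5.
#
#   docs = np.array([[1, 2, 2, 0], [3, 4, 0, 0]])
#   to_bow(docs, min_length=5)
#   # -> array([[0, 1, 2, 0, 0],
#   #           [0, 0, 0, 1, 1]])
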
def getOptimizer(name, parameters, **kwargs):
    """Build a torch.optim optimizer by name, forwarding extra keyword arguments."""
    if name == 'sgd':
        return optim.SGD(parameters, **kwargs)
    elif name == 'adadelta':
        return optim.Adadelta(parameters, **kwargs)
    elif name == 'adam':
        return optim.Adam(parameters, **kwargs)
    elif name == 'adagrad':
        return optim.Adagrad(parameters, **kwargs)
    elif name == 'rmsprop':
        return optim.RMSprop(parameters, **kwargs)
    else:
        raise ValueError('Unknown optimizer name: %s' % name)
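
# Minimal smoke test (a sketch, not part of the original module): the layer sizes,
# learning rate, and dummy loss below are assumptions for illustration only.
if __name__ == '__main__':
    mha = MultiHeadAttention(model_dim=512, num_heads=8, dropout=0.1)
    optimizer = getOptimizer('adam', mha.parameters(), lr=1e-3)
    x = torch.rand(2, 10, 512)
    out, attn = mha(x, x, x)
    # One dummy training step to check that shapes and gradients line up.
    loss = out.sum()
    loss.backward()
    optimizer.step()
    print(out.shape, attn.shape)  # expected: (2, 10, 512), (16, 10, 10)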