```python
import torch
from torch import nn
import math
from torch.nn import functional as F
import copy
import time  # used by the training loop below
```
```python
# Input: word embeddings of shape (batch_size, seq_len, d_model)
class PositionEncoder(nn.Module):
    def __init__(self, d_model, max_seq_len=80):
        super().__init__()
        self.d_model = d_model
        # Initialize the positional-encoding table with zeros
        pe = torch.zeros(max_seq_len, d_model)
        # PE(pos, 2i)   = sin(pos / 10000^(2i/d_model))
        # PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))
        # The loop variable i already steps by 2, so the exponent is i/d_model
        for pos in range(max_seq_len):
            for i in range(0, d_model, 2):
                pe[pos, i] = math.sin(pos / (10000 ** (i / d_model)))
                pe[pos, i + 1] = math.cos(pos / (10000 ** (i / d_model)))
        # Add a batch dimension: inputs arrive as (batch_size, seq_len, d_model),
        # so the encoding broadcasts across the batch
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Scale up the embeddings so the positional encoding
        # does not overwhelm the original semantics
        x = x * math.sqrt(self.d_model)
        # seq_len is the length of the second dimension
        seq_len = x.size(1)
        # The buffer requires no grad; just move it to the input's device
        x = x + self.pe[:, :seq_len].to(x.device)
        return x
```
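A quick shape check (a minimal sketch): the layer scales the embeddings, adds the encoding for the first `seq_len` positions, and leaves the shape unchanged:

```python
pe = PositionEncoder(d_model=512)
x = torch.rand(2, 10, 512)    # (batch_size, seq_len, d_model)
print(pe(x).shape)            # torch.Size([2, 10, 512])
```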
```python
# Inputs: heads, d_model, dropout, and the q/k/v tensors
class MultiHeadAttention(nn.Module):
    def __init__(self, heads, d_model, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.d_k = d_model // heads
        self.h = heads
        # Linear maps that produce q, k, v from the embeddings. You might
        # wonder why they output d_model rather than d_k: the result is
        # split into heads afterwards.
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.out = nn.Linear(d_model, d_model)

    def attention(self, q, k, v, d_k, mask=None, dropout=None):
        # Scaled dot-product attention
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            mask = mask.unsqueeze(1)
            scores = scores.masked_fill(mask == 0, -1e9)
        scores = F.softmax(scores, dim=-1)
        if dropout is not None:
            scores = dropout(scores)
        output = torch.matmul(scores, v)
        return output

    def forward(self, q, k, v, mask=None):
        # Input shape: (batch_size, seq_len, d_model)
        bs = q.size(0)
        # Apply the q/k/v projections, then split into heads:
        # (batch_size, seq_len, heads, d_k)
        k = self.k_linear(k).view(bs, -1, self.h, self.d_k)
        q = self.q_linear(q).view(bs, -1, self.h, self.d_k)
        v = self.v_linear(v).view(bs, -1, self.h, self.d_k)
        # Transpose to (batch_size, heads, seq_len, d_k) so every head's
        # attention is computed in one batched matmul
        q = q.transpose(1, 2)
        k = k.transpose(1, 2)
        v = v.transpose(1, 2)
        # Shape stays (batch_size, heads, seq_len, d_k); the attention
        # weights along seq_len have been through a softmax
        scores = self.attention(q, k, v, self.d_k, mask, self.dropout)
        # Transpose back to (batch_size, seq_len, heads, d_k) and merge the
        # heads, giving (batch_size, seq_len, d_model)
        concat = scores.transpose(1, 2).contiguous().view(bs, -1, self.d_model)
        # A final linear layer; the output is the attention value, and the
        # output shape matches the input shape
        output = self.out(concat)
        return output
```
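A quick sanity check (sketch): passing the same tensor as q, k, and v is self-attention, and the shape is preserved:

```python
attn = MultiHeadAttention(heads=8, d_model=512)
x = torch.rand(2, 10, 512)
print(attn(x, x, x).shape)   # torch.Size([2, 10, 512])
```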
```python
# Inputs: d_model, d_ff, dropout
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=2048, dropout=0.1):
        super().__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        # linear -> ReLU -> dropout -> linear
        x = self.dropout(F.relu(self.linear_1(x)))
        x = self.linear_2(x)
        return x
```
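The feed-forward block is position-wise: the same two-layer MLP is applied to every position independently. A quick check (sketch; `eval()` turns off dropout so the comparison is deterministic):

```python
ff = FeedForward(d_model=512).eval()
x = torch.rand(2, 10, 512)
x2 = x.clone()
x2[:, 0] += 1.0   # perturb only position 0
# Outputs at the untouched positions are identical
print(torch.equal(ff(x2)[:, 1:], ff(x)[:, 1:]))   # True
```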
```python
# Input: d_model
class NormLayer(nn.Module):
    def __init__(self, d_model, eps=1e-6):
        super().__init__()
        self.size = d_model
        # Learnable scale and shift, as in standard layer normalization
        self.alpha = nn.Parameter(torch.ones(self.size))
        self.bias = nn.Parameter(torch.zeros(self.size))
        self.eps = eps

    def forward(self, x):
        # Normalize over the last (feature) dimension
        norm = self.alpha * (x - x.mean(dim=-1, keepdim=True)) \
               / (x.std(dim=-1, keepdim=True) + self.eps) + self.bias
        return norm
```
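This is essentially layer normalization with a learnable scale (`alpha`) and shift (`bias`). It is close to PyTorch's built-in `nn.LayerNorm`, though not bit-identical: `x.std()` uses the unbiased estimator and `eps` is added outside the square root. A quick check (sketch):

```python
ln = NormLayer(512)
x = torch.rand(2, 10, 512)
y = ln(x)
# Per-position mean over the feature dimension is ~0 after normalization
print(y.mean(dim=-1).abs().max() < 1e-5)   # tensor(True)
```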
```python
class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, dropout=0.1):
        super().__init__()
        self.norm_1 = NormLayer(d_model)
        self.norm_2 = NormLayer(d_model)
        self.attn = MultiHeadAttention(heads, d_model, dropout=dropout)
        self.ff = FeedForward(d_model, dropout=dropout)
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Residual blocks: normalize, apply the sublayer, dropout, then add
        x2 = self.norm_1(x)
        x = x + self.dropout_1(self.attn(x2, x2, x2, mask))
        x2 = self.norm_2(x)
        x = x + self.dropout_2(self.ff(x2))
        return x
```
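Note the residual wiring: this is the pre-norm variant, where each sublayer sees a normalized input and the residual adds back the unnormalized stream, which is why the `Encoder` below applies one final norm at the end. A quick single-layer check (sketch; `mask=None` means full attention):

```python
layer = EncoderLayer(d_model=512, heads=8)
x = torch.rand(2, 10, 512)
print(layer(x, mask=None).shape)   # torch.Size([2, 10, 512])
```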
```python
def get_clones(module, N):
    # N independent (deep) copies of a layer, registered as submodules
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

class Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, N, heads, dropout):
        super().__init__()
        self.N = N
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pe = PositionEncoder(d_model)
        self.layers = get_clones(EncoderLayer(d_model, heads, dropout), N)
        self.norm = NormLayer(d_model)

    def forward(self, src, mask):
        x = self.embed(src)
        x = self.pe(x)
        for i in range(self.N):
            x = self.layers[i](x, mask)
        return self.norm(x)
```
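A smoke test of the full encoder stack (sketch; a small N keeps it fast, and an all-ones mask means no padding):

```python
enc = Encoder(vocab_size=1000, d_model=512, N=2, heads=8, dropout=0.1)
src = torch.randint(0, 1000, (2, 10))   # (batch_size, seq_len) token ids
src_mask = torch.ones(2, 1, 10)         # 1 = attend; all ones = no padding
print(enc(src, src_mask).shape)         # torch.Size([2, 10, 512])
```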
```python
class DecoderLayer(nn.Module):
    def __init__(self, d_model, heads, dropout=0.1):
        super().__init__()
        self.norm_1 = NormLayer(d_model)
        self.norm_2 = NormLayer(d_model)
        self.norm_3 = NormLayer(d_model)
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)
        self.dropout_3 = nn.Dropout(dropout)
        self.attn_1 = MultiHeadAttention(heads, d_model, dropout=dropout)
        self.attn_2 = MultiHeadAttention(heads, d_model, dropout=dropout)
        self.ff = FeedForward(d_model, dropout=dropout)

    def forward(self, x, e_outputs, src_mask, trg_mask):
        # Masked self-attention over the target sequence
        x2 = self.norm_1(x)
        x = x + self.dropout_1(self.attn_1(x2, x2, x2, trg_mask))
        # Cross-attention: queries from the decoder, keys/values from the encoder
        x2 = self.norm_2(x)
        x = x + self.dropout_2(self.attn_2(x2, e_outputs, e_outputs, src_mask))
        x2 = self.norm_3(x)
        x = x + self.dropout_3(self.ff(x2))
        return x

class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, N, heads, dropout):
        super().__init__()
        self.N = N
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pe = PositionEncoder(d_model)
        self.layers = get_clones(DecoderLayer(d_model, heads, dropout), N)
        self.norm = NormLayer(d_model)

    def forward(self, trg, e_outputs, src_mask, trg_mask):
        x = self.embed(trg)
        x = self.pe(x)
        for i in range(self.N):
            x = self.layers[i](x, e_outputs, src_mask, trg_mask)
        return self.norm(x)
```
```python
class Transformer(nn.Module):
    def __init__(self, src_vocab, trg_vocab, d_model, N, heads, dropout):
        super().__init__()
        self.encoder = Encoder(src_vocab, d_model, N, heads, dropout)
        self.decoder = Decoder(trg_vocab, d_model, N, heads, dropout)
        self.out = nn.Linear(d_model, trg_vocab)

    def forward(self, src, trg, src_mask, trg_mask):
        e_outputs = self.encoder(src, src_mask)
        d_output = self.decoder(trg, e_outputs, src_mask, trg_mask)
        # Raw logits over the target vocabulary; softmax is left to the loss
        output = self.out(d_output)
        return output
```
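An end-to-end smoke test on random token ids (sketch; the lower-triangular `trg_mask` mimics the look-ahead mask used in training, and the mask shapes rely on the `unsqueeze(1)` inside attention to broadcast over heads):

```python
model = Transformer(src_vocab=1000, trg_vocab=1200, d_model=512, N=2, heads=8, dropout=0.1)
src = torch.randint(0, 1000, (2, 10))
trg = torch.randint(0, 1200, (2, 9))
src_mask = torch.ones(2, 1, 10)
trg_mask = torch.tril(torch.ones(2, 9, 9))   # position t attends to positions <= t
print(model(src, trg, src_mask, trg_mask).shape)   # torch.Size([2, 9, 1200])
```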
```python
# Model hyperparameters
d_model = 512
heads = 8
N = 6
dropout = 0.1
# EN_TEXT / FR_TEXT are the torchtext fields built in the data-preparation step
src_vocab = len(EN_TEXT.vocab)
trg_vocab = len(FR_TEXT.vocab)
model = Transformer(src_vocab, trg_vocab, d_model, N, heads, dropout)
for p in model.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)
optim = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

# Training loop
def train_model(epochs, print_every=100):
    model.train()
    start = time.time()
    temp = start
    total_loss = 0
    for epoch in range(epochs):
        for i, batch in enumerate(train_iter):
            src = batch.English.transpose(0, 1)
            trg = batch.French.transpose(0, 1)
            # The French sentence we feed in drops the last token,
            # since each token is used to predict the next one
            trg_input = trg[:, :-1]
            # The tokens we are trying to predict
            targets = trg[:, 1:].contiguous().view(-1)
            # Build the masks using the mask code above
            # (see the sketch after this function)
            src_mask, trg_mask = create_masks(src, trg_input)
            preds = model(src, trg_input, src_mask, trg_mask)
            optim.zero_grad()
            loss = F.cross_entropy(preds.view(-1, preds.size(-1)),
                                   targets, ignore_index=target_pad)
            loss.backward()
            optim.step()
            total_loss += loss.item()
            if (i + 1) % print_every == 0:
                loss_avg = total_loss / print_every
                print("time = %dm, epoch %d, iter = %d, loss = %.3f, %ds per %d iters" % (
                    (time.time() - start) // 60, epoch + 1, i + 1, loss_avg,
                    time.time() - temp, print_every))
                total_loss = 0
                temp = time.time()
```
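`create_masks` and `target_pad` come from the masking code referenced above, which is not shown in this section. For completeness, a minimal sketch of what such a function might look like, assuming `input_pad` and `target_pad` hold the pad-token ids of the two vocabularies (both names are placeholders here):

```python
def create_masks(src, trg_input):
    # Hypothetical sketch: input_pad / target_pad are the pad-token ids
    # from the source and target vocabularies (defined elsewhere)
    src_mask = (src != input_pad).unsqueeze(-2)          # (batch, 1, src_len)
    trg_mask = (trg_input != target_pad).unsqueeze(-2)   # (batch, 1, trg_len)
    size = trg_input.size(1)
    # Look-ahead mask: position t may only attend to positions <= t
    nopeak = torch.tril(torch.ones(1, size, size, device=trg_input.device)).bool()
    trg_mask = trg_mask & nopeak                         # (batch, trg_len, trg_len)
    return src_mask, trg_mask
```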