2024-09-14
Getting Started

Contents

Imports
Embedding Representation
Multi-Head Attention
Feed-Forward Layer
Layer Normalization
Encoder
Decoder
Transformer
Train

(Figure: Transformer block with residual connections and layer normalization)

Imports

python
import torch
from torch import nn
import math
from torch.nn import functional as F
import copy

Embedding Representation

python
# Input embedding + positional encoding
class PositionEncoder(nn.Module):
    def __init__(self, d_model, max_seq_len=80):
        super().__init__()
        self.d_model = d_model
        pe = torch.zeros(max_seq_len, d_model)  # initialize the positional encoding with zeros
        for pos in range(max_seq_len):
            for i in range(0, d_model, 2):
                pe[pos, i] = math.sin(pos / (10000 ** ((2 * i) / d_model)))
                pe[pos, i + 1] = math.cos(pos / (10000 ** ((2 * (i + 1)) / d_model)))
        # Add a batch dimension: inputs arrive as (batch_size, seq_len, d_model), so this enables broadcasting
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Scale up the word embeddings so the positional encoding does not drown out the original semantics
        x = x * math.sqrt(self.d_model)
        seq_len = x.size(1)  # length of the second (sequence) dimension
        # The buffer requires no grad and moves to the module's device automatically
        x = x + self.pe[:, :seq_len]
        return x
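
As a quick sanity check (my own addition, not part of the original walkthrough), the sketch below pushes a random (batch_size, seq_len, d_model) tensor through PositionEncoder and confirms the shape is unchanged; only the values are shifted by the position signal.

python
# Minimal sketch: PositionEncoder keeps the (batch_size, seq_len, d_model) shape
pe = PositionEncoder(d_model=512, max_seq_len=80)
x = torch.randn(2, 10, 512)        # a dummy batch of 2 sequences, 10 tokens each
out = pe(x)
print(out.shape)                   # expected: torch.Size([2, 10, 512])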

Multi-Head Attention

python
# Inputs: heads, d_model, dropout, and the q/k/v tensors
class MultiHeadAtention(nn.Module):
    def __init__(self, heads, d_model, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.d_k = d_model // heads
        self.h = heads
        # Linear projections that map the embeddings to q, k, v.
        # You might wonder why the output size is d_model rather than d_k:
        # the projection is split into heads afterwards.
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.out = nn.Linear(d_model, d_model)

    def attention(self, q, k, v, d_k, mask=None, dropout=None):
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            mask = mask.unsqueeze(1)
            scores = scores.masked_fill(mask == 0, -1e9)
        scores = F.softmax(scores, dim=-1)
        if dropout is not None:
            scores = dropout(scores)
        output = torch.matmul(scores, v)
        return output

    def forward(self, q, k, v, mask=None):
        # Input shape: (batch_size, seq_len, d_model)
        bs = q.size(0)  # batch size
        # Project to q, k, v and split into heads: (batch_size, seq_len, heads, d_k)
        k = self.k_linear(k).view(bs, -1, self.h, self.d_k)
        q = self.q_linear(q).view(bs, -1, self.h, self.d_k)
        v = self.v_linear(v).view(bs, -1, self.h, self.d_k)
        # Transpose to (batch_size, heads, seq_len, d_k) so attention is computed per head
        q = q.transpose(1, 2)
        k = k.transpose(1, 2)
        v = v.transpose(1, 2)
        # Shape stays (batch_size, heads, seq_len, d_k); the scores go through a softmax
        scores = self.attention(q, k, v, self.d_k, mask, self.dropout)
        # Back to (batch_size, seq_len, heads, d_k), then merge the heads into (batch_size, seq_len, d_model)
        concat = scores.transpose(1, 2).contiguous().view(bs, -1, self.d_model)
        # Final linear layer: the output is the attention value, with the same shape as the input
        output = self.out(concat)
        return output
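
A small shape check (my addition), assuming the module above: self-attention is simply the case where q, k, and v are the same tensor, and the output keeps the input shape.

python
# Minimal sketch: self-attention with 8 heads on a dummy batch
mha = MultiHeadAtention(heads=8, d_model=512)
x = torch.randn(2, 10, 512)              # (batch_size, seq_len, d_model)
out = mha(x, x, x)                       # q = k = v -> self-attention
print(out.shape)                         # expected: torch.Size([2, 10, 512])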

Feed-Forward Layer

python
# Inputs: d_model, d_ff, dropout
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=2048, dropout=0.1):
        super().__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        # linear layer + ReLU + dropout + linear layer
        x = self.dropout(F.relu(self.linear_1(x)))
        x = self.linear_2(x)
        return x
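
The feed-forward block is applied to every position independently; a quick sketch (not from the original post) to confirm it expands to d_ff and projects back to d_model:

python
# Minimal sketch: the FFN is shape-preserving per position
ff = FeedForward(d_model=512, d_ff=2048)
x = torch.randn(2, 10, 512)
print(ff(x).shape)                 # expected: torch.Size([2, 10, 512])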

Layer Normalization

python
# Input: d_model
class NormLayer(nn.Module):
    def __init__(self, d_model, eps=1e-6):
        super().__init__()
        self.size = d_model
        # Learnable scale and shift
        self.alpha = nn.Parameter(torch.ones(self.size))
        self.bias = nn.Parameter(torch.zeros(self.size))
        self.eps = eps

    def forward(self, x):
        # Normalize over the last dimension, then rescale and shift
        norm = self.alpha * (x - x.mean(dim=-1, keepdim=True)) / (x.std(dim=-1, keepdim=True) + self.eps) + self.bias
        return norm
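
As a rough check (my addition): normalizing a random tensor with NormLayer should give roughly zero mean and unit standard deviation along the last dimension, since alpha starts at one and bias at zero.

python
# Minimal sketch: per-position statistics after layer normalization
norm = NormLayer(d_model=512)
x = torch.randn(2, 10, 512) * 5 + 3              # arbitrary scale and shift
y = norm(x)
print(y.mean(dim=-1)[0, 0].item())               # close to 0
print(y.std(dim=-1)[0, 0].item())                # close to 1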

Encoder

python
def get_clones(module, N):
    # Make N independent copies of a layer
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])


class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, dropout=0.1):
        super().__init__()
        self.norm_1 = NormLayer(d_model)
        self.norm_2 = NormLayer(d_model)
        self.attn = MultiHeadAtention(heads, d_model, dropout=dropout)
        self.ff = FeedForward(d_model, dropout=dropout)
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Self-attention sub-layer with pre-norm and residual connection
        x2 = self.norm_1(x)
        x = x + self.dropout_1(self.attn(x2, x2, x2, mask))
        # Feed-forward sub-layer with pre-norm and residual connection
        x2 = self.norm_2(x)
        x = x + self.dropout_2(self.ff(x2))
        return x


class Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, N, heads, dropout):
        super().__init__()
        self.N = N
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pe = PositionEncoder(d_model)
        self.layers = get_clones(EncoderLayer(d_model, heads, dropout), N)
        self.norm = NormLayer(d_model)

    def forward(self, src, mask):
        x = self.embed(src)
        x = self.pe(x)
        for i in range(self.N):
            x = self.layers[i](x, mask)
        return self.norm(x)
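
Putting the encoder together, here is a hedged sketch with dummy token ids and a padding mask; the vocabulary size and pad index below are made-up values for illustration only.

python
# Minimal sketch: run a dummy batch of token ids through the full encoder stack
vocab_size, pad_idx = 1000, 1                      # hypothetical values for illustration
encoder = Encoder(vocab_size, d_model=512, N=2, heads=8, dropout=0.1)
src = torch.randint(2, vocab_size, (2, 10))        # (batch_size, src_seq_len) of token ids
src_mask = (src != pad_idx).unsqueeze(1)           # (batch_size, 1, src_seq_len), 0 where padded
print(encoder(src, src_mask).shape)                # expected: torch.Size([2, 10, 512])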

Decoder

python
class DecoderLayer(nn.Module):
    def __init__(self, d_model, heads, dropout=0.1):
        super().__init__()
        self.norm_1 = NormLayer(d_model)
        self.norm_2 = NormLayer(d_model)
        self.norm_3 = NormLayer(d_model)
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)
        self.dropout_3 = nn.Dropout(dropout)
        self.attn_1 = MultiHeadAtention(heads, d_model, dropout=dropout)
        self.attn_2 = MultiHeadAtention(heads, d_model, dropout=dropout)
        self.ff = FeedForward(d_model, dropout=dropout)

    def forward(self, x, e_outputs, src_mask, trg_mask):
        # Masked self-attention over the target sequence
        x2 = self.norm_1(x)
        x = x + self.dropout_1(self.attn_1(x2, x2, x2, trg_mask))
        # Cross-attention over the encoder outputs
        x2 = self.norm_2(x)
        x = x + self.dropout_2(self.attn_2(x2, e_outputs, e_outputs, src_mask))
        # Feed-forward sub-layer
        x2 = self.norm_3(x)
        x = x + self.dropout_3(self.ff(x2))
        return x


class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, N, heads, dropout):
        super().__init__()
        self.N = N
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pe = PositionEncoder(d_model)
        self.layers = get_clones(DecoderLayer(d_model, heads, dropout), N)
        self.norm = NormLayer(d_model)

    def forward(self, trg, e_outputs, src_mask, trg_mask):
        x = self.embed(trg)
        x = self.pe(x)
        for i in range(self.N):
            x = self.layers[i](x, e_outputs, src_mask, trg_mask)
        return self.norm(x)
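
The decoder's self-attention needs a "no peeking" mask on top of the usual padding mask. The training loop later calls a create_masks helper that is not defined in this post, so here is one hedged sketch of what it might look like; the pad indices are assumptions.

python
# Minimal sketch of a create_masks helper, assuming integer pad indices src_pad and trg_pad
def create_masks(src, trg, src_pad=1, trg_pad=1):
    # Source mask: hide padding tokens, shape (batch_size, 1, src_seq_len)
    src_mask = (src != src_pad).unsqueeze(1)

    # Target mask: hide padding AND future positions
    trg_mask = (trg != trg_pad).unsqueeze(1)                       # (batch_size, 1, trg_seq_len)
    size = trg.size(1)
    nopeak_mask = torch.tril(torch.ones(1, size, size)).bool()     # position i may only attend to positions <= i
    trg_mask = trg_mask & nopeak_mask                              # (batch_size, trg_seq_len, trg_seq_len)
    return src_mask, trg_mask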

Transformer

python
class Transformer(nn.Module):
    def __init__(self, src_vocab, trg_vocab, d_model, N, heads, dropout):
        super().__init__()
        self.encoder = Encoder(src_vocab, d_model, N, heads, dropout)
        self.decoder = Decoder(trg_vocab, d_model, N, heads, dropout)
        self.out = nn.Linear(d_model, trg_vocab)

    def forward(self, src, trg, src_mask, trg_mask):
        e_outputs = self.encoder(src, src_mask)
        d_output = self.decoder(trg, e_outputs, src_mask, trg_mask)
        # (batch_size, trg_seq_len, trg_vocab): unnormalized logits over the target vocabulary
        output = self.out(d_output)
        return output
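
Before training, it helps to run one dummy forward pass through the whole model. This sketch is my addition and reuses the create_masks sketch above; it only checks that the output logits have shape (batch_size, trg_seq_len, trg_vocab).

python
# Minimal sketch: one forward pass with random token ids
src_vocab, trg_vocab = 1000, 1200                    # hypothetical vocabulary sizes
model = Transformer(src_vocab, trg_vocab, d_model=512, N=2, heads=8, dropout=0.1)
src = torch.randint(2, src_vocab, (2, 10))
trg = torch.randint(2, trg_vocab, (2, 12))
src_mask, trg_mask = create_masks(src, trg)
out = model(src, trg, src_mask, trg_mask)
print(out.shape)                                     # expected: torch.Size([2, 12, 1200])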

Train

python
import time

# Model hyperparameters
d_model = 512
heads = 8
N = 6
# EN_TEXT, FR_TEXT, train_iter and target_pad come from the data-loading code, which is not shown in this post
src_vocab = len(EN_TEXT.vocab)
trg_vocab = len(FR_TEXT.vocab)

model = Transformer(src_vocab, trg_vocab, d_model, N, heads, dropout=0.1)

for p in model.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

optim = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

# Train the model
def train_model(epochs, print_every=100):
    model.train()
    start = time.time()
    temp = start
    total_loss = 0

    for epoch in range(epochs):
        for i, batch in enumerate(train_iter):
            src = batch.English.transpose(0, 1)
            trg = batch.French.transpose(0, 1)

            # The French sentence we input has all words except the last,
            # as it is using each word to predict the next
            trg_input = trg[:, :-1]

            # The words we are trying to predict
            targets = trg[:, 1:].contiguous().view(-1)

            # Create the masks using the mask code above
            src_mask, trg_mask = create_masks(src, trg_input)

            preds = model(src, trg_input, src_mask, trg_mask)

            optim.zero_grad()
            loss = F.cross_entropy(preds.view(-1, preds.size(-1)), targets, ignore_index=target_pad)
            loss.backward()
            optim.step()

            total_loss += loss.item()
            if (i + 1) % print_every == 0:
                loss_avg = total_loss / print_every
                print("time = %dm, epoch %d, iter = %d, loss = %.3f, %d s per %d iters" % (
                    (time.time() - start) // 60, epoch + 1, i + 1, loss_avg,
                    (time.time() - start) // (i + 1), print_every))
                total_loss = 0
                temp = time.time()

Author: Bob

Permalink:

Copyright notice: Unless otherwise stated, all posts on this blog are licensed under a BY-NC-SA license. Please credit the source when reposting!