# reference
# https://blog.floydhub.com/the-transformer-in-pytorch/
# https://github.com/hyunwoongko/transformer for the transformer architecture
# https://github.com/Whiax/BERT-Transformer-Pytorch/blob/main/train.py (norm layer first)
# https://github.com/karpathy/nanoGPT
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim.lr_scheduler import _LRScheduler
import numpy as np
import copy


def new_gelu(x):
    """
    Implementation of the GELU activation function currently in the Google BERT repo
    (identical to OpenAI GPT).
    Reference: Gaussian Error Linear Units (GELU) paper: https://arxiv.org/abs/1606.08415
    """
    return 0.5 * x * (1.0 + torch.tanh(np.sqrt(2.0 / np.pi) * (x + 0.044715 * torch.pow(x, 3.0))))


# https://github.com/jadore801120/attention-is-all-you-need-pytorch/blob/fec78a687210851f055f792d45300d27cc60ae41/transformer/Modules.py
class ScaledDotProductAttention(nn.Module):
    def __init__(self, temperature, dropout=0.1):
        super().__init__()
        self.temperature = temperature
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        # scaled dot-product scores: (bs, h, T, T)
        attn = torch.matmul(q / self.temperature, k.transpose(-2, -1))
        if mask is not None:
            attn = attn.masked_fill(mask == 0, -1e9)
        attn = F.softmax(attn, dim=-1)
        attn = self.dropout(attn)
        output = torch.matmul(attn, v)
        return output


class CausalMultiHeadAttention(nn.Module):
    def __init__(self, heads, d_model, block_size, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.d_k = d_model // heads
        self.h = heads

        self.q_linear = nn.Linear(d_model, d_model, bias=False)
        self.v_linear = nn.Linear(d_model, d_model, bias=False)
        self.k_linear = nn.Linear(d_model, d_model, bias=False)
        self.attention = ScaledDotProductAttention(temperature=self.d_k**0.5)
        self.out = nn.Linear(d_model, d_model, bias=False)
        # causal mask: lower-triangular so each position only attends to itself and earlier positions
        self.register_buffer("causal_mask",
                             torch.tril(torch.ones(block_size, block_size))
                                  .view(1, 1, block_size, block_size))
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v):
        bs, T, C = q.size()

        # perform linear operation and split into h heads
        k = self.k_linear(k).view(bs, -1, self.h, self.d_k)
        q = self.q_linear(q).view(bs, -1, self.h, self.d_k)
        v = self.v_linear(v).view(bs, -1, self.h, self.d_k)

        # transpose to get dimensions bs * h * T * d_k
        k = k.transpose(1, 2)
        q = q.transpose(1, 2)
        v = v.transpose(1, 2)

        # crop the causal mask to the current sequence length
        mask = self.causal_mask[:, :, :T, :T]

        # calculate attention
        attn = self.attention(q, k, v, mask)

        # concatenate heads and put through the final linear layer
        concat = attn.transpose(1, 2).contiguous().view(bs, -1, self.d_model)
        output = self.dropout(self.out(concat))
        return output


class FeedForward(nn.Module):
    def __init__(self, d_model, dropout=0.1):
        super().__init__()
        # the inner (feed-forward) dimension is 4 * d_model
        self.linear_1 = nn.Linear(d_model, 4 * d_model)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(4 * d_model, d_model)

    def forward(self, x):
        x = self.linear_1(x)
        x = new_gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x
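
# Illustrative shape check (not from the original file): a minimal sketch of how
# CausalMultiHeadAttention above is called for self-attention. The sizes used here are
# arbitrary assumptions, chosen only to show that the (batch, seq_len, d_model) shape
# is preserved; the function is defined but never called by the module itself.
def _demo_attention_shapes():
    mha = CausalMultiHeadAttention(heads=4, d_model=64, block_size=16)
    x = torch.randn(2, 10, 64)   # (batch, seq_len, d_model) with seq_len <= block_size
    out = mha(x, x, x)           # self-attention: q, k and v are the same tensor
    assert out.shape == (2, 10, 64)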

# implementation reference: https://www.arxiv-vanity.com/papers/1911.03179/
class Block(nn.Module):
    def __init__(self, d_model, heads, block_size, dropout=0.1):
        super().__init__()
        self.norm_1 = nn.LayerNorm(d_model, eps=1e-6)
        self.norm_2 = nn.LayerNorm(d_model, eps=1e-6)
        self.attn = CausalMultiHeadAttention(heads, d_model, block_size, dropout)
        self.ff = FeedForward(d_model, dropout)

    def forward(self, x):
        # pre-norm, then causal self-attention
        x2 = self.norm_1(x)
        x2 = self.attn(x2, x2, x2)
        # residual connection
        x = x + x2
        # pre-norm, then position-wise feed-forward network
        x2 = self.norm_2(x)
        x2 = self.ff(x2)
        # residual connection
        x = x + x2
        return x


# layer multiplier
def get_clones(module, N):
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])


class GPT(nn.Module):
    def __init__(self, vocab_size, d_model, N, heads, block_size=80, dropout=0.1):
        super().__init__()
        self.N = N
        self.embed = nn.Embedding(vocab_size, d_model)
        # learned positional embedding, one vector per position up to block_size
        self.pe = nn.Parameter(torch.zeros(1, block_size, d_model))
        self.dropout = nn.Dropout(dropout)
        self.layers = get_clones(Block(d_model, heads, block_size, dropout), N)
        self.norm = nn.LayerNorm(d_model, eps=1e-6)
        self.out = nn.Linear(d_model, vocab_size, bias=False)
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, (nn.Linear, nn.Embedding)):
            module.weight.data.normal_(mean=0.0, std=0.02)
            if isinstance(module, nn.Linear) and module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, src):
        b, t = src.size()
        tok_emb = self.embed(src)                # (b, t, d_model)
        position_embeddings = self.pe[:, :t, :]  # (1, t, d_model)
        x = tok_emb + position_embeddings
        x = self.dropout(x)
        x = self.norm(x)
        for i in range(self.N):
            x = self.layers[i](x)
        x = self.norm(x)
        x = self.out(x)
        return x


class Scheduler(_LRScheduler):
    def __init__(self, optimizer, dim_embed, warmup_steps, last_epoch=-1, verbose=False):
        self.dim_embed = dim_embed
        self.warmup_steps = warmup_steps
        self.num_param_groups = len(optimizer.param_groups)
        super().__init__(optimizer, last_epoch, verbose)

    def get_lr(self):
        # "Noam" schedule from Attention Is All You Need: linear warmup, then inverse-sqrt decay
        lr = self.dim_embed**(-0.5) * min(self._step_count**(-0.5),
                                          self._step_count * self.warmup_steps**(-1.5))
        return [lr] * self.num_param_groups
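

# Minimal usage sketch (not part of the original file): it wires the GPT model and the
# warmup Scheduler together. Every hyperparameter value below is an illustrative
# assumption, not a setting taken from the original training code.
if __name__ == "__main__":
    vocab_size, d_model, n_layers, heads, block_size = 100, 128, 2, 4, 32
    model = GPT(vocab_size, d_model, n_layers, heads, block_size=block_size)

    tokens = torch.randint(0, vocab_size, (4, block_size))  # (batch, seq_len) of token ids
    logits = model(tokens)                                   # (batch, seq_len, vocab_size)
    print(logits.shape)

    # the scheduler fully determines the lr, so the optimizer's initial lr is a placeholder
    optimizer = torch.optim.Adam(model.parameters(), lr=1.0, betas=(0.9, 0.98), eps=1e-9)
    scheduler = Scheduler(optimizer, dim_embed=d_model, warmup_steps=4000)
    optimizer.step()   # normally preceded by a forward pass and loss.backward()
    scheduler.step()   # update lr according to the warmup / inverse-sqrt decay schedule
    print(scheduler.get_last_lr())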