Una guida completa per scrivere i tuoi Transformers |  di Benjamin Etienne |  Febbraio 2024

 | Intelligenza-Artificiale

Un’implementazione end-to-end di Pytorch Transformer, in cui tratteremo concetti chiave come auto-attenzione, codificatori, decodificatori e molto altro.

fotografato da Susan Holt Simpson SU Unsplash

Quando ho deciso di approfondire le architetture Transformer, spesso mi sono sentito frustrato leggendo o guardando tutorial online perché sentivo che mancava sempre qualcosa:

  • I tutorial ufficiali di Tensorflow o Pytorch utilizzavano le proprie API, rimanendo così di alto livello e costringendomi a dover accedere alla loro base di codice per vedere cosa c’era sotto. Richiede molto tempo e non è sempre facile leggere migliaia di righe di codice.
  • Altri tutorial con codice personalizzato che ho trovato (link alla fine dell’articolo) spesso semplificavano eccessivamente i casi d’uso e non affrontavano concetti come il mascheramento della gestione batch di sequenze di lunghezza variabile.

Ho quindi deciso di scrivere il mio Transformer per assicurarmi di aver compreso i concetti e poterlo utilizzare con qualsiasi set di dati.

Nel corso di questo articolo seguiremo quindi un approccio metodico in cui implementeremo un trasformatore strato per strato e blocco per blocco.

Ovviamente ci sono molte implementazioni diverse, nonché API di alto livello di Pytorch o Tensorflow già disponibili in commercio, con, ne sono sicuro, prestazioni migliori rispetto al modello che costruiremo.

“Ok, ma allora perché non utilizzare le implementazioni TF/Pytorch”?

Lo scopo di questo articolo è educativo e non ho alcuna pretesa di battere le implementazioni di Pytorch o Tensorflow. Credo che la teoria e il codice alla base dei trasformatori non siano semplici, ecco perché spero che seguire questo tutorial passo passo ti consentirà di comprendere meglio questi concetti e di sentirti più a tuo agio quando costruisci il tuo codice Dopo.

Un altro motivo per costruire il tuo trasformatore da zero è che ti consentirà di comprendere appieno come utilizzare le API di cui sopra. Se guardiamo l’implementazione Pytorch di forward() metodo della classe Transformer, vedrai molte parole chiave oscure come:

fonte : Documenti Pytorch

Se hai già familiarità con queste parole chiave, puoi tranquillamente saltare questo articolo.

Altrimenti, questo articolo ti guiderà attraverso ciascuna di queste parole chiave con i concetti sottostanti.

Se hai già sentito parlare di ChatGPT o Gemini, allora hai già incontrato un trasformatore. In realtà, la “T” di ChatGPT sta per Transformer.

L’architettura è stata coniata per la prima volta nel 2017 dai ricercatori di Google nel documento “L’attenzione è tutto ciò di cui hai bisogno”. È piuttosto rivoluzionario in quanto i modelli precedenti utilizzati per l’apprendimento sequenza-sequenza (traduzione automatica, discorso-testo, ecc…) si basavano su RNN che erano computazionalmente costosi nel senso che dovevano elaborare le sequenze passo dopo passo, mentre Transformers basta guardare una volta l’intera sequenza, spostando la complessità temporale da O(n) a O(1).

(Vaswani et al., 2017)

Le applicazioni dei trasformatori sono piuttosto ampie nel campo della PNL e includono la traduzione linguistica, la risposta alle domande, il riepilogo dei documenti, la generazione di testi, ecc.

L’architettura complessiva di un trasformatore è la seguente:

fonte

Il primo blocco che implementeremo è in realtà la parte più importante di un Transformer e si chiama Multi-head Attention. Vediamo dove si colloca nell’architettura complessiva

fonte

L’attenzione è un meccanismo che in realtà non è specifico dei trasformatori e che era già utilizzato nei modelli sequenza-sequenza delle RNN.

Attenzione in un trasformatore (fonte: Tensorflow documentazione)
Attenzione in un trasformatore (fonte: Tensorflow documentazione)
import torch
import torch.nn as nn
import math

class MultiHeadAttention(nn.Module):
def __init__(self, hidden_dim=256, num_heads=4):
"""
input_dim: Dimensionality of the input.
num_heads: The number of attention heads to split the input into.
"""
super(MultiHeadAttention, self).__init__()
self.hidden_dim = hidden_dim
self.num_heads = num_heads
assert hidden_dim % num_heads == 0, "Hidden dim must be divisible by num heads"
self.Wv = nn.Linear(hidden_dim, hidden_dim, bias=False) # the Value part
self.Wk = nn.Linear(hidden_dim, hidden_dim, bias=False) # the Key part
self.Wq = nn.Linear(hidden_dim, hidden_dim, bias=False) # the Query part
self.Wo = nn.Linear(hidden_dim, hidden_dim, bias=False) # the output layer

def check_sdpa_inputs(self, x):
assert x.size(1) == self.num_heads, f"Expected size of x to be ({-1, self.num_heads, -1, self.hidden_dim // self.num_heads}), got {x.size()}"
assert x.size(3) == self.hidden_dim // self.num_heads

def scaled_dot_product_attention(
self,
query,
key,
value,
attention_mask=None,
key_padding_mask=None):
"""
query : tensor of shape (batch_size, num_heads, query_sequence_length, hidden_dim//num_heads)
key : tensor of shape (batch_size, num_heads, key_sequence_length, hidden_dim//num_heads)
value : tensor of shape (batch_size, num_heads, key_sequence_length, hidden_dim//num_heads)
attention_mask : tensor of shape (query_sequence_length, key_sequence_length)
key_padding_mask : tensor of shape (sequence_length, key_sequence_length)

"""
self.check_sdpa_inputs(query)
self.check_sdpa_inputs(key)
self.check_sdpa_inputs(value)

d_k = query.size(-1)
tgt_len, src_len = query.size(-2), key.size(-2)

# logits = (B, H, tgt_len, E) * (B, H, E, src_len) = (B, H, tgt_len, src_len)
logits = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

# Attention mask here
if attention_mask is not None:
if attention_mask.dim() == 2:
assert attention_mask.size() == (tgt_len, src_len)
attention_mask = attention_mask.unsqueeze(0)
logits = logits + attention_mask
else:
raise ValueError(f"Attention mask size {attention_mask.size()}")

# Key mask here
if key_padding_mask is not None:
key_padding_mask = key_padding_mask.unsqueeze(1).unsqueeze(2) # Broadcast over batch size, num heads
logits = logits + key_padding_mask

attention = torch.softmax(logits, dim=-1)
output = torch.matmul(attention, value) # (batch_size, num_heads, sequence_length, hidden_dim)

return output, attention

def split_into_heads(self, x, num_heads):
batch_size, seq_length, hidden_dim = x.size()
x = x.view(batch_size, seq_length, num_heads, hidden_dim // num_heads)

return x.transpose(1, 2) # Final dim will be (batch_size, num_heads, seq_length, , hidden_dim // num_heads)

def combine_heads(self, x):
batch_size, num_heads, seq_length, head_hidden_dim = x.size()
return x.transpose(1, 2).contiguous().view(batch_size, seq_length, num_heads * head_hidden_dim)

def forward(
self,
q,
k,
v,
attention_mask=None,
key_padding_mask=None):
"""
q : tensor of shape (batch_size, query_sequence_length, hidden_dim)
k : tensor of shape (batch_size, key_sequence_length, hidden_dim)
v : tensor of shape (batch_size, key_sequence_length, hidden_dim)
attention_mask : tensor of shape (query_sequence_length, key_sequence_length)
key_padding_mask : tensor of shape (sequence_length, key_sequence_length)

"""
q = self.Wq(q)
k = self.Wk(k)
v = self.Wv(v)

q = self.split_into_heads(q, self.num_heads)
k = self.split_into_heads(k, self.num_heads)
v = self.split_into_heads(v, self.num_heads)

# attn_values, attn_weights = self.multihead_attn(q, k, v, attn_mask=attention_mask)
attn_values, attn_weights = self.scaled_dot_product_attention(
query=q,
key=k,
value=v,
attention_mask=attention_mask,
key_padding_mask=key_padding_mask,
)
grouped = self.combine_heads(attn_values)
output = self.Wo(grouped)

self.attention_weigths = attn_weights

return output

Qui dobbiamo spiegare alcuni concetti.

1) Query, chiavi e valori.

IL domanda sono le informazioni che stai cercando di abbinare,
IL chiave E valori sono le informazioni memorizzate.

Pensalo come se stessi usando un dizionario: ogni volta che usi un dizionario Python, se la tua query non corrisponde alle chiavi del dizionario, non ti verrà restituito nulla. Ma cosa succede se vogliamo che il nostro dizionario restituisca una miscela di informazioni abbastanza simili? Come se avessimo:

d = {"panther": 1, "bear": 10, "dog":3}
d("wolf") = 0.2*d("panther") + 0.7*d("dog") + 0.1*d("bear")

L’attenzione consiste fondamentalmente in questo: osservare diverse parti dei tuoi dati e fonderle per ottenere una sintesi come risposta alla tua domanda.

La parte rilevante del codice è questa, dove calcoliamo i pesi di attenzione tra la query e le chiavi

logits = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k) # we compute the weights of attention

E questo, dove applichiamo i pesi normalizzati ai valori:

attention = torch.softmax(logits, dim=-1)
output = torch.matmul(attention, value) # (batch_size, num_heads, sequence_length, hidden_dim)

2) Attenzione mascheratura e imbottitura

Quando ci occupiamo di parti di un input sequenziale, non vogliamo includere informazioni inutili o proibite.

Le informazioni inutili sono ad esempio il riempimento: i simboli di riempimento, utilizzati per allineare tutte le sequenze in un batch alla stessa dimensione della sequenza, dovrebbero essere ignorati dal nostro modello. Torneremo su questo nell’ultima sezione

Le informazioni proibite sono un po’ più complesse. Durante l’addestramento, un modello impara a codificare la sequenza di input e ad allineare i target agli input. Tuttavia, poiché il processo di inferenza prevede l’esame dei token emessi in precedenza per prevedere quello successivo (si pensi alla generazione di testo in ChatGPT), dobbiamo applicare le stesse regole durante l’addestramento.

Questo è il motivo per cui applichiamo a maschera causale per garantire che i target, in ogni fase temporale, possano vedere solo informazioni del passato. Ecco la sezione corrispondente in cui viene applicata la maschera (il calcolo della maschera viene trattato alla fine)

if attention_mask is not None:
if attention_mask.dim() == 2:
assert attention_mask.size() == (tgt_len, src_len)
attention_mask = attention_mask.unsqueeze(0)
logits = logits + attention_mask

Corrisponde alla seguente parte del Transformer:

Quando riceve e tratta un input, un trasformatore non ha senso dell’ordine poiché guarda la sequenza nel suo insieme, al contrario di quanto fanno gli RNN. Dobbiamo quindi aggiungere un accenno di ordine temporale in modo che il trasformatore possa apprendere le dipendenze.

I dettagli specifici su come funziona la codifica posizionale non rientrano nell’ambito di questo articolo, ma sentiti libero di leggere il documento originale per capire.

# Taken from https://pytorch.org/tutorials/beginner/transformer_tutorial.html#define-the-model
class PositionalEncoding(nn.Module):

def __init__(self, d_model, dropout=0.1, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)

pe = torch.zeros(max_len, d_model)
position = torch.arange(max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

pe(:, 0::2) = torch.sin(position * div_term)
pe(:, 1::2) = torch.cos(position * div_term)
pe = pe.unsqueeze(0)

self.register_buffer('pe', pe)

def forward(self, x):
"""
Arguments:
x: Tensor, shape ``(batch_size, seq_len, embedding_dim)``
"""
x = x + self.pe(:, :x.size(1), :)
return x

Ci stiamo avvicinando ad avere un codificatore completamente funzionante! L’encoder è la parte sinistra del Transformer

Aggiungeremo una piccola parte al nostro codice, che è la parte Feed Forward:

class PositionWiseFeedForward(nn.Module):
def __init__(self, d_model: int, d_ff: int):
super(PositionWiseFeedForward, self).__init__()
self.fc1 = nn.Linear(d_model, d_ff)
self.fc2 = nn.Linear(d_ff, d_model)
self.relu = nn.ReLU()

def forward(self, x):
return self.fc2(self.relu(self.fc1(x)))

Mettendo insieme i pezzi, otteniamo un modulo Encoder!

class EncoderBlock(nn.Module):
def __init__(self, n_dim: int, dropout: float, n_heads: int):
super(EncoderBlock, self).__init__()
self.mha = MultiHeadAttention(hidden_dim=n_dim, num_heads=n_heads)
self.norm1 = nn.LayerNorm(n_dim)
self.ff = PositionWiseFeedForward(n_dim, n_dim)
self.norm2 = nn.LayerNorm(n_dim)
self.dropout = nn.Dropout(dropout)

def forward(self, x, src_padding_mask=None):
assert x.ndim==3, "Expected input to be 3-dim, got {}".format(x.ndim)
att_output = self.mha(x, x, x, key_padding_mask=src_padding_mask)
x = x + self.dropout(self.norm1(att_output))

ff_output = self.ff(x)
output = x + self.norm2(ff_output)

return output

Come mostrato nel diagramma, l’Encoder contiene in realtà N blocchi o livelli di Encoder, nonché un livello di incorporamento per i nostri input. Creiamo quindi un Encoder aggiungendo i blocchi Embedding, Positional Encoding ed Encoder:

class Encoder(nn.Module):
def __init__(
self,
vocab_size: int,
n_dim: int,
dropout: float,
n_encoder_blocks: int,
n_heads: int):

super(Encoder, self).__init__()
self.n_dim = n_dim

self.embedding = nn.Embedding(
num_embeddings=vocab_size,
embedding_dim=n_dim
)
self.positional_encoding = PositionalEncoding(
d_model=n_dim,
dropout=dropout
)
self.encoder_blocks = nn.ModuleList((
EncoderBlock(n_dim, dropout, n_heads) for _ in range(n_encoder_blocks)
))

def forward(self, x, padding_mask=None):
x = self.embedding(x) * math.sqrt(self.n_dim)
x = self.positional_encoding(x)
for block in self.encoder_blocks:
x = block(x=x, src_padding_mask=padding_mask)
return x

La parte del decodificatore è la parte a sinistra e richiede un po’ più di lavorazione.

C’è qualcosa chiamato Attenzione multi-testa mascherata. Ricorda cosa abbiamo detto prima a riguardo maschera causale ? Ebbene questo accade qui. Utilizzeremo il parametro Attention_mask del nostro modulo di attenzione multi-testa per rappresentare questo (maggiori dettagli su come calcoliamo la maschera alla fine):


# Stuff before

self.self_attention = MultiHeadAttention(hidden_dim=n_dim, num_heads=n_heads)
masked_att_output = self.self_attention(
q=tgt,
k=tgt,
v=tgt,
attention_mask=tgt_mask, <-- HERE IS THE CAUSAL MASK
key_padding_mask=tgt_padding_mask)

# Stuff after

Viene richiamata la seconda attenzione attenzione incrociata. Utilizzerà la query del decodificatore per abbinarla alla chiave e ai valori del codificatore! Attenzione: possono avere lunghezze diverse durante l’allenamento, quindi di solito è una buona pratica definire chiaramente le forme previste degli input come segue:

def scaled_dot_product_attention(
self,
query,
key,
value,
attention_mask=None,
key_padding_mask=None):
"""
query : tensor of shape (batch_size, num_heads, query_sequence_length, hidden_dim//num_heads)
key : tensor of shape (batch_size, num_heads, key_sequence_length, hidden_dim//num_heads)
value : tensor of shape (batch_size, num_heads, key_sequence_length, hidden_dim//num_heads)
attention_mask : tensor of shape (query_sequence_length, key_sequence_length)
key_padding_mask : tensor of shape (sequence_length, key_sequence_length)

"""

Ed ecco la parte in cui utilizziamo l’output dell’encoder, chiamata memoriacon l’ingresso del nostro decoder:

# Stuff before
self.cross_attention = MultiHeadAttention(hidden_dim=n_dim, num_heads=n_heads)
cross_att_output = self.cross_attention(
q=x1,
k=memory,
v=memory,
attention_mask=None, <-- NO CAUSAL MASK HERE
key_padding_mask=memory_padding_mask) <-- WE NEED TO USE THE PADDING OF THE SOURCE
# Stuff after

Mettendo insieme i pezzi, otteniamo questo per il decoder:

class DecoderBlock(nn.Module):
def __init__(self, n_dim: int, dropout: float, n_heads: int):
super(DecoderBlock, self).__init__()

# The first Multi-Head Attention has a mask to avoid looking at the future
self.self_attention = MultiHeadAttention(hidden_dim=n_dim, num_heads=n_heads)
self.norm1 = nn.LayerNorm(n_dim)

# The second Multi-Head Attention will take inputs from the encoder as key/value inputs
self.cross_attention = MultiHeadAttention(hidden_dim=n_dim, num_heads=n_heads)
self.norm2 = nn.LayerNorm(n_dim)

self.ff = PositionWiseFeedForward(n_dim, n_dim)
self.norm3 = nn.LayerNorm(n_dim)
# self.dropout = nn.Dropout(dropout)

def forward(self, tgt, memory, tgt_mask=None, tgt_padding_mask=None, memory_padding_mask=None):

masked_att_output = self.self_attention(
q=tgt, k=tgt, v=tgt, attention_mask=tgt_mask, key_padding_mask=tgt_padding_mask)
x1 = tgt + self.norm1(masked_att_output)

cross_att_output = self.cross_attention(
q=x1, k=memory, v=memory, attention_mask=None, key_padding_mask=memory_padding_mask)
x2 = x1 + self.norm2(cross_att_output)

ff_output = self.ff(x2)
output = x2 + self.norm3(ff_output)

return output

class Decoder(nn.Module):
def __init__(
self,
vocab_size: int,
n_dim: int,
dropout: float,
max_seq_len: int,
n_decoder_blocks: int,
n_heads: int):

super(Decoder, self).__init__()

self.embedding = nn.Embedding(
num_embeddings=vocab_size,
embedding_dim=n_dim
)

self.positional_encoding = PositionalEncoding(
d_model=n_dim,
dropout=dropout
)

self.decoder_blocks = nn.ModuleList((
DecoderBlock(n_dim, dropout, n_heads) for _ in range(n_decoder_blocks)
))

def forward(self, tgt, memory, tgt_mask=None, tgt_padding_mask=None, memory_padding_mask=None):
x = self.embedding(tgt)
x = self.positional_encoding(x)

for block in self.decoder_blocks:
x = block(x, memory, tgt_mask=tgt_mask, tgt_padding_mask=tgt_padding_mask, memory_padding_mask=memory_padding_mask)
return x

Ricorda la sezione sull’attenzione multi-testa in cui abbiamo menzionato l’esclusione di alcune parti degli input quando si fa attenzione.

Durante l’addestramento, consideriamo lotti di input e obiettivi, in cui ciascuna istanza può avere una lunghezza variabile. Considera il seguente esempio in cui mettiamo insieme 4 parole: banana, anguria, pera, mirtillo. Per elaborarli come un unico batch, dobbiamo allineare tutte le parole alla lunghezza della parola più lunga (anguria). Aggiungeremo quindi un ulteriore token, PAD, a ciascuna parola in modo che abbiano tutte la stessa lunghezza di watermelon.

Nell’immagine seguente, la tabella superiore rappresenta i dati grezzi, la tabella inferiore la versione codificata:

(immagine dell’autore)

Nel nostro caso, vogliamo escludere gli indici di riempimento dai pesi dell’attenzione calcolati. Possiamo quindi calcolare una maschera come segue, sia per i dati di origine che per quelli di destinazione:

padding_mask = (x == PAD_IDX)

Che dire delle maschere causali adesso? Ebbene, se vogliamo, ad ogni passo temporale, che il modello possa frequentare solo passi nel passato, ciò significa che per ogni passo temporale T, il modello può frequentare solo ogni passo t per t in 1…T. È un doppio ciclo for, possiamo quindi utilizzare una matrice per calcolarlo:

(immagine dell’autore)
def generate_square_subsequent_mask(size: int):
"""Generate a triangular (size, size) mask. From PyTorch docs."""
mask = (1 - torch.triu(torch.ones(size, size), diagonal=1)).bool()
mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
return mask

Costruiamo ora il nostro Transformer riunendo le parti!

Nel nostro caso d’uso, utilizzeremo un set di dati molto semplice per mostrare come i Transformers effettivamente apprendono.

“Ma perché usare un trasformatore per invertire le parole? So già come farlo in Python con word(::-1)!”

L’obiettivo qui è vedere se il meccanismo di attenzione del Transformer funziona. Ciò che ci aspettiamo è vedere i pesi dell’attenzione spostarsi da destra a sinistra quando viene data una sequenza di input. Se è così, significa che il nostro Transformer ha imparato una grammatica molto semplice, che consiste semplicemente nella lettura da destra a sinistra, e potrebbe generalizzare a grammatiche più complesse durante la traduzione in una lingua reale.

Iniziamo innanzitutto con la nostra classe Transformer personalizzata:

import torch
import torch.nn as nn
import math

from .encoder import Encoder
from .decoder import Decoder

class Transformer(nn.Module):
def __init__(self, **kwargs):
super(Transformer, self).__init__()

for k, v in kwargs.items():
print(f" * {k}={v}")

self.vocab_size = kwargs.get('vocab_size')
self.model_dim = kwargs.get('model_dim')
self.dropout = kwargs.get('dropout')
self.n_encoder_layers = kwargs.get('n_encoder_layers')
self.n_decoder_layers = kwargs.get('n_decoder_layers')
self.n_heads = kwargs.get('n_heads')
self.batch_size = kwargs.get('batch_size')
self.PAD_IDX = kwargs.get('pad_idx', 0)

self.encoder = Encoder(
self.vocab_size, self.model_dim, self.dropout, self.n_encoder_layers, self.n_heads)
self.decoder = Decoder(
self.vocab_size, self.model_dim, self.dropout, self.n_decoder_layers, self.n_heads)
self.fc = nn.Linear(self.model_dim, self.vocab_size)

@staticmethod
def generate_square_subsequent_mask(size: int):
"""Generate a triangular (size, size) mask. From PyTorch docs."""
mask = (1 - torch.triu(torch.ones(size, size), diagonal=1)).bool()
mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
return mask

def encode(
self,
x: torch.Tensor,
) -> torch.Tensor:
"""
Input
x: (B, S) with elements in (0, C) where C is num_classes
Output
(B, S, E) embedding
"""

mask = (x == self.PAD_IDX).float()
encoder_padding_mask = mask.masked_fill(mask == 1, float('-inf'))

# (B, S, E)
encoder_output = self.encoder(
x,
padding_mask=encoder_padding_mask
)

return encoder_output, encoder_padding_mask

def decode(
self,
tgt: torch.Tensor,
memory: torch.Tensor,
memory_padding_mask=None
) -> torch.Tensor:
"""
B = Batch size
S = Source sequence length
L = Target sequence length
E = Model dimension

Input
encoded_x: (B, S, E)
y: (B, L) with elements in (0, C) where C is num_classes
Output
(B, L, C) logits
"""

mask = (tgt == self.PAD_IDX).float()
tgt_padding_mask = mask.masked_fill(mask == 1, float('-inf'))

decoder_output = self.decoder(
tgt=tgt,
memory=memory,
tgt_mask=self.generate_square_subsequent_mask(tgt.size(1)),
tgt_padding_mask=tgt_padding_mask,
memory_padding_mask=memory_padding_mask,
)
output = self.fc(decoder_output) # shape (B, L, C)
return output

def forward(
self,
x: torch.Tensor,
y: torch.Tensor,
) -> torch.Tensor:
"""
Input
x: (B, Sx) with elements in (0, C) where C is num_classes
y: (B, Sy) with elements in (0, C) where C is num_classes
Output
(B, L, C) logits
"""

# Encoder output shape (B, S, E)
encoder_output, encoder_padding_mask = self.encode(x)

# Decoder output shape (B, L, C)
decoder_output = self.decode(
tgt=y,
memory=encoder_output,
memory_padding_mask=encoder_padding_mask
)

return decoder_output

Esecuzione di inferenza con la decodifica greedy

Dobbiamo aggiungere un metodo che fungerà da famoso model.predict di scikit.learn. L’obiettivo è chiedere al modello di produrre dinamicamente previsioni dato un input. Durante l’inferenza, non esiste un target: il modello inizia emettendo un token prestando attenzione all’output e utilizza la propria previsione per continuare a emettere token. Questo è il motivo per cui questi modelli sono spesso chiamati modelli autoregressivi, poiché utilizzano le previsioni passate per prevedere quella successiva.

Il problema con la decodifica greedy è che ad ogni passaggio considera il token con la probabilità più alta. Ciò può portare a previsioni pessime se i primi token sono completamente sbagliati. Esistono altri metodi di decodifica, come la ricerca Beam, che considerano un elenco ristretto di sequenze candidate (si pensi a mantenere i token top-k in ogni passaggio temporale invece dell’argmax) e restituiscono la sequenza con la probabilità totale più alta.

Per ora, implementiamo la decodifica greedy e aggiungiamola al nostro modello Transformer:

def predict(
self,
x: torch.Tensor,
sos_idx: int=1,
eos_idx: int=2,
max_length: int=None
) -> torch.Tensor:
"""
Method to use at inference time. Predict y from x one token at a time. This method is greedy
decoding. Beam search can be used instead for a potential accuracy boost.

Input
x: str
Output
(B, L, C) logits
"""

# Pad the tokens with beginning and end of sentence tokens
x = torch.cat((
torch.tensor((sos_idx)),
x,
torch.tensor((eos_idx)))
).unsqueeze(0)

encoder_output, mask = self.transformer.encode(x) # (B, S, E)

if not max_length:
max_length = x.size(1)

outputs = torch.ones((x.size()(0), max_length)).type_as(x).long() * sos_idx
for step in range(1, max_length):
y = outputs(:, :step)
probs = self.transformer.decode(y, encoder_output)
output = torch.argmax(probs, dim=-1)

# Uncomment if you want to see step by step predicitons
# print(f"Knowing {y} we output {output(:, -1)}")

if output(:, -1).detach().numpy() in (eos_idx, sos_idx):
break
outputs(:, step) = output(:, -1)

return outputs

Creazione dei dati del giocattolo

Definiamo un piccolo set di dati che inverte le parole, il che significa che “helloworld” restituirà “dlrowolleh”:

import numpy as np
import torch
from torch.utils.data import Dataset

np.random.seed(0)

def generate_random_string():
len = np.random.randint(10, 20)
return "".join((chr(x) for x in np.random.randint(97, 97+26, len)))

class ReverseDataset(Dataset):
def __init__(self, n_samples, pad_idx, sos_idx, eos_idx):
super(ReverseDataset, self).__init__()
self.pad_idx = pad_idx
self.sos_idx = sos_idx
self.eos_idx = eos_idx
self.values = (generate_random_string() for _ in range(n_samples))
self.labels = (x(::-1) for x in self.values)

def __len__(self):
return len(self.values) # number of samples in the dataset

def __getitem__(self, index):
return self.text_transform(self.values(index).rstrip("\n")), \
self.text_transform(self.labels(index).rstrip("\n"))

def text_transform(self, x):
return torch.tensor((self.sos_idx) + (ord(z)-97+3 for z in x) + (self.eos_idx)

Definiremo ora le fasi di formazione e valutazione:

PAD_IDX = 0
SOS_IDX = 1
EOS_IDX = 2

def train(model, optimizer, loader, loss_fn, epoch):
model.train()
losses = 0
acc = 0
history_loss = ()
history_acc = ()

with tqdm(loader, position=0, leave=True) as tepoch:
for x, y in tepoch:
tepoch.set_description(f"Epoch {epoch}")

optimizer.zero_grad()
logits = model(x, y(:, :-1))
loss = loss_fn(logits.contiguous().view(-1, model.vocab_size), y(:, 1:).contiguous().view(-1))
loss.backward()
optimizer.step()
losses += loss.item()

preds = logits.argmax(dim=-1)
masked_pred = preds * (y(:, 1:)!=PAD_IDX)
accuracy = (masked_pred == y(:, 1:)).float().mean()
acc += accuracy.item()

history_loss.append(loss.item())
history_acc.append(accuracy.item())
tepoch.set_postfix(loss=loss.item(), accuracy=100. * accuracy.item())

return losses / len(list(loader)), acc / len(list(loader)), history_loss, history_acc

def evaluate(model, loader, loss_fn):
model.eval()
losses = 0
acc = 0
history_loss = ()
history_acc = ()

for x, y in tqdm(loader, position=0, leave=True):

logits = model(x, y(:, :-1))
loss = loss_fn(logits.contiguous().view(-1, model.vocab_size), y(:, 1:).contiguous().view(-1))
losses += loss.item()

preds = logits.argmax(dim=-1)
masked_pred = preds * (y(:, 1:)!=PAD_IDX)
accuracy = (masked_pred == y(:, 1:)).float().mean()
acc += accuracy.item()

history_loss.append(loss.item())
history_acc.append(accuracy.item())

return losses / len(list(loader)), acc / len(list(loader)), history_loss, history_acc

E addestra il modello per un paio di epoche:

def collate_fn(batch):
"""
This function pads inputs with PAD_IDX to have batches of equal length
"""
src_batch, tgt_batch = (), ()
for src_sample, tgt_sample in batch:
src_batch.append(src_sample)
tgt_batch.append(tgt_sample)

src_batch = pad_sequence(src_batch, padding_value=PAD_IDX, batch_first=True)
tgt_batch = pad_sequence(tgt_batch, padding_value=PAD_IDX, batch_first=True)
return src_batch, tgt_batch

# Model hyperparameters
args = {
'vocab_size': 128,
'model_dim': 128,
'dropout': 0.1,
'n_encoder_layers': 1,
'n_decoder_layers': 1,
'n_heads': 4
}

# Define model here
model = Transformer(**args)

# Instantiate datasets
train_iter = ReverseDataset(50000, pad_idx=PAD_IDX, sos_idx=SOS_IDX, eos_idx=EOS_IDX)
eval_iter = ReverseDataset(10000, pad_idx=PAD_IDX, sos_idx=SOS_IDX, eos_idx=EOS_IDX)
dataloader_train = DataLoader(train_iter, batch_size=256, collate_fn=collate_fn)
dataloader_val = DataLoader(eval_iter, batch_size=256, collate_fn=collate_fn)

# During debugging, we ensure sources and targets are indeed reversed
# s, t = next(iter(dataloader_train))
# print(s(:4, ...))
# print(t(:4, ...))
# print(s.size())

# Initialize model parameters
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)

# Define loss function : we ignore logits which are padding tokens
loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.98), eps=1e-9)

# Save history to dictionnary
history = {
'train_loss': (),
'eval_loss': (),
'train_acc': (),
'eval_acc': ()
}

# Main loop
for epoch in range(1, 4):
start_time = time.time()
train_loss, train_acc, hist_loss, hist_acc = train(model, optimizer, dataloader_train, loss_fn, epoch)
history('train_loss') += hist_loss
history('train_acc') += hist_acc
end_time = time.time()
val_loss, val_acc, hist_loss, hist_acc = evaluate(model, dataloader_val, loss_fn)
history('eval_loss') += hist_loss
history('eval_acc') += hist_acc
print((f"Epoch: {epoch}, Train loss: {train_loss:.3f}, Train acc: {train_acc:.3f}, Val loss: {val_loss:.3f}, Val acc: {val_acc:.3f} "f"Epoch time = {(end_time - start_time):.3f}s"))

Visualizza l’attenzione

Definiamo una piccola funzione per accedere ai pesi delle teste di attenzione:

fig = plt.figure(figsize=(10., 10.))
images = model.decoder.decoder_blocks(0).cross_attention.attention_weigths(0,...).detach().numpy()
grid = ImageGrid(fig, 111, # similar to subplot(111)
nrows_ncols=(2, 2), # creates 2x2 grid of axes
axes_pad=0.1, # pad between axes in inch.
)

for ax, im in zip(grid, images):
# Iterating over the grid returns the Axes.
ax.imshow(im)

immagine dell’autore

Possiamo vedere un bel modello da destra a sinistra, quando leggiamo i pesi dall’alto. Le parti verticali nella parte inferiore dell’asse y possono sicuramente rappresentare pesi mascherati a causa della maschera di imbottitura

Testare il nostro modello!

Per testare il nostro modello con nuovi dati, ne definiremo un po’ Translator class per aiutarci con la decodifica:

class Translator(nn.Module):
def __init__(self, transformer):
super(Translator, self).__init__()
self.transformer = transformer

@staticmethod
def str_to_tokens(s):
return (ord(z)-97+3 for z in s)

@staticmethod
def tokens_to_str(tokens):
return "".join((chr(x+94) for x in tokens))

def __call__(self, sentence, max_length=None, pad=False):

x = torch.tensor(self.str_to_tokens(sentence))

outputs = self.transformer.predict(sentence)

return self.tokens_to_str(outputs(0))

Dovresti essere in grado di vedere quanto segue:

E se stampiamo la testa dell’attenzione osserveremo quanto segue:

fig = plt.figure()
images = model.decoder.decoder_blocks(0).cross_attention.attention_weigths(0,...).detach().numpy().mean(axis=0)

fig, ax = plt.subplots(1,1, figsize=(10., 10.))
# Iterating over the grid returs the Axes.
ax.set_yticks(range(len(out)))
ax.set_xticks(range(len(sentence)))

ax.xaxis.set_label_position('top')

ax.set_xticklabels(iter(sentence))
ax.set_yticklabels((f"step {i}" for i in range(len(out))))
ax.imshow(images)

immagine dell’autore

Possiamo vedere chiaramente che il modello va da destra a sinistra quando invertiamo la nostra frase “reversethis”! (Il passaggio 0 riceve effettivamente il token dell’inizio della frase).

Questo è tutto, ora puoi scrivere Transformer e utilizzarlo con set di dati più grandi per eseguire la traduzione automatica di creare il tuo BERT, ad esempio!

Volevo che questo tutorial ti mostrasse gli avvertimenti quando si scrive un Transformer: il riempimento e il mascheramento sono forse le parti che richiedono maggiore attenzione (gioco di parole non intenzionale) poiché definiranno la buona prestazione del modello durante l’inferenza.

Nei seguenti articoli vedremo come creare il tuo modello BERT e come utilizzare Equinox, una libreria altamente performante basata su JAX.

Rimani sintonizzato !

(+) “Il trasformatore annotato”
(+) “Trasformatori da zero
(+) “Traduzione automatica neurale con Transformer e Keras”
(+) “Il Trasformatore Illustrato”
(+) Tutorial sul deep learning dell’Università di Amsterdam
(+) Tutorial Pytorch su Transformers

Fonte: towardsdatascience.com

Lascia un commento

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *