Una guida completa su come velocizzare l’addestramento dei tuoi modelli con Distributed Data Parallel (DDP)

12 minuti di lettura

23 ore fa

Immagine dell’autore

Ciao a tutti! Sono Francois, ricercatore presso Meta. Benvenuti in questa nuova parte tutorial della serie Fantastici tutorial sull’intelligenza artificiale.

In questo tutorial demistificheremo una tecnica ben nota chiamata DDP per addestrare modelli su più GPU contemporaneamente.

Durante i miei giorni alla scuola di ingegneria, ricordo di aver sfruttato le GPU di Google Colab per la formazione. Tuttavia, in ambito aziendale, il panorama è diverso. Se fai parte di un’organizzazione che ha investito molto nell’intelligenza artificiale, in particolare se fai parte di un gigante della tecnologia, probabilmente hai una vasta gamma di cluster GPU a tua disposizione.

Questa sessione mira a fornirti le conoscenze necessarie per sfruttare la potenza di più GPU, consentendo una formazione rapida ed efficiente. E indovina cosa? È più semplice di quanto potresti pensare! Prima di procedere, ti consiglio di avere una buona conoscenza di PyTorch, inclusi i suoi componenti principali come set di dati, dataloader, ottimizzatori, CUDA e il ciclo di addestramento.

Inizialmente, vedevo il DDP come uno strumento complesso, quasi irraggiungibile, pensando che avrebbe richiesto un grande team per creare l’infrastruttura necessaria. Tuttavia, ti assicuro che DDP non è solo intuitivo ma anche conciso e richiede solo una manciata di righe di codice per essere implementato. Intraprendiamo insieme questo viaggio illuminante!

Distributed Data Parallel (DDP) è un concetto semplice una volta analizzato. Immagina di avere a disposizione un cluster con 4 GPU. Con DDP, su ogni GPU viene caricato lo stesso modello, ottimizzatore incluso. La differenziazione principale risiede nel modo in cui distribuiamo i dati.

DDP, immagine presa da PyTorch tutorial

Se hai familiarità con il deep learning, ricorderai DataLoader, uno strumento che segmenta il tuo set di dati in batch distinti. La norma è frammentare l’intero set di dati in questi batch, aggiornando il modello dopo il calcolo di ciascun batch.

Ingrandendo ulteriormente, DDP perfeziona questo processo dividendo ciascun lotto in quelli che possiamo definire “sottobatch”. In sostanza, ogni replica del modello elabora un segmento del batch primario, risultando in un calcolo del gradiente distinto per ciascuna GPU.

In DDP dividiamo questo lotto in sottolotti attraverso uno strumento chiamato a Campionatore distribuitocome illustrato nel seguente disegno:

DDP, immagine presa da PyTorch tutorial

Al momento della distribuzione di ciascun sottolotto alle singole GPU, ogni GPU calcola il proprio gradiente unico.

DDP, immagine presa da PyTorch tutorial
  • Ora arriva la magia del DDP. Prima di aggiornare i parametri del modello, i gradienti calcolati su ciascuna GPU devono essere aggregati in modo che per ogni GPU venga calcolato il gradiente medio sull’intero batch di dati.
  • Questo viene fatto prendendo i gradienti da tutte le GPU e calcolandone la media. Ad esempio, se disponi di 4 GPU, il gradiente medio per un particolare parametro del modello è la somma dei gradienti per quel parametro su ciascuna delle 4 GPU diviso per 4.
  • DDP utilizza il file NCCL O Gloo backend (NCCL è ottimizzato per GPU NVIDIA, Gloo è più generale) per comunicare in modo efficiente e mediare i gradienti tra le GPU.
DDP, immagine presa da PyTorch tutorial

Prima di immergerci nel codice, è fondamentale comprendere il vocabolario che utilizzeremo frequentemente. Demistifiamo questi termini:

  • Node: Pensa a un nodo come a una macchina potente dotata di più GPU. Quando parliamo di cluster, non si tratta semplicemente di un insieme di GPU messe insieme. Invece lo sono organizzati in gruppi o “nodi”. Ad esempio, un nodo potrebbe ospitare 8 GPU.
  • Master Node: In un ambiente multi-nodo, in genere un nodo prende il comando. Questo “nodo principale” gestisce attività come la sincronizzazione, l’avvio di copie del modello, la supervisione del caricamento del modello e la gestione delle voci di registro. Senza un nodo master, ciascuna GPU genererebbe log in modo indipendente, portando al caos.
  • Local Rank: Il termine “rango” può essere paragonato a un ID o a una posizione. Il rango locale si riferisce alla posizione o all’ID di una GPU all’interno del suo nodo (o macchina) specifico. È “locale” perché è limitato a quella particolare macchina.
  • Global Rank: Prendendo una prospettiva più ampia, la classifica globale identifica una GPU su tutti i nodi disponibili. È un identificatore univoco indipendentemente dalla macchina.
  • World Size: Fondamentalmente, questo è un conteggio di tutti GPU disponibili su tutti i nodi. Semplicemente, è il prodotto del numero di nodi e del numero di GPU in ciascun nodo.

Per mettere le cose in prospettiva, se lavori con una sola macchina, le cose sono più semplici poiché il rango locale equivale al rango globale.

Per chiarirlo con un’immagine:

Classifica locale, immagine da tutorial
Classifica locale, immagine da tutorial

Distributed Data Parallel (DDP) ha apportato trasformazioni in molti flussi di lavoro di deep learning, ma è essenziale comprenderne i confini.

Il nocciolo della limitazione del DDP risiede nella sua consumo di memoria. Con il DDP, ciascuna GPU carica una replica del modello, dell’ottimizzatore e del rispettivo batch di dati. Le memorie GPU in genere vanno da pochi GB a 80 GB per le GPU di fascia alta.

Per i modelli più piccoli, questo non è un problema. Tuttavia, quando ci si avventura nel regno dei Large Language Models (LLM) o di architetture simili a GPT, i confini della memoria di una singola GPU potrebbero essere inadeguati.

Nella visione artificiale, sebbene esista una miriade di modelli leggeri, sorgono sfide quando aumento delle dimensioni dei lottisoprattutto in scenari che coinvolgono Immagini 3D o attività di rilevamento oggetti.

Accedi al Parallelo dati completamente condivisi (FSDP). Questo metodo estende i vantaggi del DDP non solo distribuendo i dati ma anche disperdendo gli stati del modello e dell’ottimizzatore nelle memorie della GPU. Anche se ciò sembra vantaggioso, FSDP aumenta la comunicazione tra GPU, rallentando potenzialmente l’addestramento.

In sintesi:

  • Se il tuo modello e il lotto corrispondente si adattano comodamente alla memoria di una GPU, DDP è la soluzione migliore grazie alla sua velocità.
  • Per i modelli di dimensioni gigantesche che richiedono più memoria, FSDP è una scelta più adatta. Tuttavia, tieni presente il suo compromesso: stai sacrificando la velocità per la memoria.

Se vai sul sito web di PyTorch, in realtà ci sono le opzioni: DP e DDP. Ma lo menziono solo per non perderti o confonderti: Basta usare DDP, è più veloce e non è limitato a un singolo nodo.

Confronto da Pytorch tutorial

Implementare il deep learning distribuito è più semplice di quanto si possa pensare. La bellezza sta nel fatto che non sarai impantanato nelle configurazioni manuali della GPU o nelle complessità della distribuzione del gradiente.

Troverai tutto il template e lo script su:

Ecco una ripartizione dei passaggi che eseguiremo:

  1. Inizializzazione del processo: comporta la designazione del nodo master, la specifica della porta e l’impostazione del world_size.
  2. Configurazione del DataLoader distribuito: fondamentale per questo passaggio è il partizionamento di ciascun batch tra le GPU disponibili. Faremo in modo che i dati siano distribuiti uniformemente senza alcuna sovrapposizione.
  3. Addestramento/test del modello: in sostanza, questo passaggio rimane sostanzialmente invariato rispetto al processo con GPU singola.

Formazione su 1 GPU 1 nodo (baseline)

Per prima cosa definiamo un codice vanilla che carica un set di dati, crea un modello e lo addestra end-to-end su una singola GPU. Questo sarà il nostro punto di partenza:

import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np

class WineDataset(Dataset):
def __init__(self, data, targets):
self.data = data
self.targets = targets

def __len__(self):
return len(self.data)

def __getitem__(self, idx):
return torch.tensor(self.data(idx), dtype=torch.float), torch.tensor(self.targets(idx), dtype=torch.long)

class SimpleNN(torch.nn.Module):
def __init__(self):
super(SimpleNN, self).__init__()
self.fc1 = torch.nn.Linear(13, 64)
self.fc2 = torch.nn.Linear(64, 3)

def forward(self, x):
x = F.relu(self.fc1(x))
x = self.fc2(x)
return x

class Trainer():
def __init__(self, model, train_data, optimizer, gpu_id, save_every):
self.model = model
self.train_data = train_data
self.optimizer = optimizer
self.gpu_id = gpu_id
self.save_every = save_every
self.losses = ()

def _run_batch(self, source, targets):
self.optimizer.zero_grad()
output = self.model(source)
loss = F.cross_entropy(output, targets)
loss.backward()
self.optimizer.step()
return loss.item()

def _run_epoch(self, epoch):
total_loss = 0.0
num_batches = len(self.train_data)
for source, targets in self.train_data:
source = source.to(self.gpu_id)
targets = targets.to(self.gpu_id)
loss = self._run_batch(source, targets)
total_loss += loss

avg_loss = total_loss / num_batches
self.losses.append(avg_loss)
print(f"Epoch {epoch}, Loss: {avg_loss:.4f}")

def _save_checkpoint(self, epoch):
checkpoint = self.model.state_dict()
PATH = f"model_{epoch}.pt"
torch.save(checkpoint, PATH)
print(f"Epoch {epoch} | Model saved to {PATH}")

def train(self, max_epochs):
self.model.train()
for epoch in range(max_epochs):
self._run_epoch(epoch)
if epoch % self.save_every == 0:
self._save_checkpoint(epoch)

def load_train_objs():
wine_data = load_wine()
X = wine_data.data
y = wine_data.target

# Normalize and split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

train_set = WineDataset(X_train, y_train)
test_set = WineDataset(X_test, y_test)

print("Sample from dataset:")
sample_data, sample_target = train_set(0)
print(f"Data: {sample_data}")
print(f"Target: {sample_target}")

model = SimpleNN()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

return train_set, model, optimizer

def prepare_dataloader(dataset, batch_size):
return DataLoader(dataset, batch_size=batch_size, pin_memory=True, shuffle=True)

def main(device, total_epochs, save_every, batch_size):
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size)
trainer = Trainer(model, train_data, optimizer, device, save_every)
trainer.train(total_epochs)

main(device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"), total_epochs=100, save_every=50, batch_size=32)

Formazione su più GPU, 1 Nodo

Ora utilizzeremo tutte le GPU in un singolo nodo con i seguenti passaggi:

  1. Importa le librerie necessarie per la formazione distribuita.
  2. Inizializzare l’ambiente distribuito: (in particolare il MASTER_ADDR E MASTER_PORT
  3. Avvolgi il modello con DDP utilizzando il file DistributedDataParallel involucro.
  4. Utilizza Distributed Sampler per garantire che il set di dati sia suddiviso tra le GPU in modo distribuito.
  5. Regola la funzione principale su spawn processi multipli per l’addestramento multi-GPU.

Per le biblioteche, abbiamo bisogno di questo:

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os

Quindi dobbiamo impostare ciascun processo. Ad esempio se abbiamo 8 GPU su 1 Nodo, chiameremo 8 volte le seguenti funzioni, una per ogni GPU e con il tasto destro local_rank:

def ddp_setup(rank, world_size):
"""
Set up the distributed environment.

Args:
rank: The rank of the current process. Unique identifier for each process in the distributed training.
world_size: Total number of processes participating in the distributed training.
"""

# Address of the main node. Since we are doing single-node training, it's set to localhost.
os.environ("MASTER_ADDR") = "localhost"

# Port on which the master node is expected to listen for communications from workers.
os.environ("MASTER_PORT") = "12355"

# Initialize the process group.
# 'backend' specifies the communication backend to be used, "nccl" is optimized for GPU training.
init_process_group(backend="nccl", rank=rank, world_size=world_size)

# Set the current CUDA device to the specified device (identified by rank).
# This ensures that each process uses a different GPU in a multi-GPU setup.
torch.cuda.set_device(rank)

Alcune spiegazioni sulla funzione:

  • MASTER_ADDR è il nome host della macchina su cui è in esecuzione il master (o il processo di rango 0). Qui è localhost
  • MASTER_PORT: specifica la porta su cui il master è in ascolto per le connessioni dai lavoratori o da altri processi. 12355 è arbitrario. Puoi scegliere qualsiasi numero di porta inutilizzato purché non venga utilizzato da un altro servizio sul tuo sistema e sia consentito dalle regole del firewall.
  • torch.cuda.set_device(rank): Ciò garantisce che ciascun processo utilizzi la GPU corrispondente

Quindi dobbiamo modificare leggermente la classe Trainer. Avvolgeremo semplicemente il modello con la funzione DDP:

class Trainer():
def __init__(self, model, train_data, optimizer, gpu_id, save_every):
self.model = model.to(gpu_id)
self.train_data = train_data
self.optimizer = optimizer
self.gpu_id = gpu_id
self.save_every = save_every
self.losses = ()

# This changes
self.model = DDP(self.model, device_ids=(gpu_id))

Il resto della lezione per Allenatori è lo stesso, fantastico!

Ora dobbiamo cambiare il dataloader, perché ricorda, dobbiamo dividere il batch su ciascuna GPU:

def prepare_dataloader(dataset: Dataset, batch_size: int):
return DataLoader(
dataset,
batch_size=batch_size,
pin_memory=True,
shuffle=False,
sampler=DistributedSampler(dataset)
)

Ora possiamo modificare il file main funzione, che verrà chiamata per ogni processo (quindi 8 volte nel nostro caso):

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
"""
Main training function for distributed data parallel (DDP) setup.

Args:
rank (int): The rank of the current process (0 <= rank < world_size). Each process is assigned a unique rank.
world_size (int): Total number of processes involved in the distributed training.
save_every (int): Frequency of model checkpoint saving, in terms of epochs.
total_epochs (int): Total number of epochs for training.
batch_size (int): Number of samples processed in one iteration (forward and backward pass).
"""

# Set up the distributed environment, including setting the master address, port, and backend.
ddp_setup(rank, world_size)

# Load the necessary training objects - dataset, model, and optimizer.
dataset, model, optimizer = load_train_objs()

# Prepare the data loader for distributed training. It partitions the dataset across the processes and handles shuffling.
train_data = prepare_dataloader(dataset, batch_size)

# Initialize the trainer instance with the loaded model, data, and other configurations.
trainer = Trainer(model, train_data, optimizer, rank, save_every)

# Train the model for the specified number of epochs.
trainer.train(total_epochs)

# Cleanup the distributed environment after training is complete.
destroy_process_group()

E infine, durante l’esecuzione dello script, dovremo avviare gli 8 processi. Questo viene fatto con il mp.spawn() funzione:

if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='simple distributed training job')
parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
parser.add_argument('save_every', type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
args = parser.parse_args()

world_size = torch.cuda.device_count()
mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)

Ultimo passo: allenamento su più nodi

Se sei arrivato fin qui, congratulazioni! Il passo definitivo è riuscire a reclutare tutte le GPU disponibili su diversi nodi. Ma se hai capito cosa abbiamo fatto finora, è molto semplice.

La distinzione chiave quando si scala su più nodi è lo spostamento da local_rank A global_rank. Questo è fondamentale perché ogni processo richiede un identificatore univoco. Ad esempio, se lavori con due nodi, ciascuno con 8 GPU, entrambi i processi 0 e 8 avrebbero un local_rank di 0.

Il global_rank è dato dalla formula molto intuitiva:

rango_globale = rango_nodo * dimensione_mondo_per_nodo + rango_locale

Quindi prima modifichiamo il file ddp_setup funzione:

def ddp_setup(local_rank, world_size_per_node, node_rank):
os.environ("MASTER_ADDR") = "MASTER_NODE_IP" # <-- Replace with your master node IP
os.environ("MASTER_PORT") = "12355"
global_rank = node_rank * world_size_per_node + local_rank
init_process_group(backend="nccl", rank=global_rank, world_size=world_size_per_node*torch.cuda.device_count())
torch.cuda.set_device(local_rank)

E dobbiamo regolare la funzione principale che ora assume il file wold_size_per_node in argomento:

def main(local_rank: int, world_size_per_node: int, save_every: int, total_epochs: int, batch_size: int, node_rank: int):
ddp_setup(local_rank, world_size_per_node, node_rank)
# ... (rest of the main function)

E infine aggiustiamo il mp.spawn() funzionare con il world_size_per_node anche:

if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='simple distributed training job')
parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
parser.add_argument('save_every', type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
parser.add_argument('--node_rank', default=0, type=int, help='The rank of the node in multi-node training')
args = parser.parse_args()

world_size_per_node = torch.cuda.device_count()
mp.spawn(main, args=(world_size_per_node, args.save_every, args.total_epochs, args.batch_size, args.node_rank), nprocs=world_size_per_node)

Utilizzando un cluster (SLURM)

Ora sei pronto per inviare la formazione al cluster. È molto semplice, devi solo chiamare il numero di nodi che desideri.

Ecco un modello per lo script SLURM:

#!/bin/bash
#SBATCH --job-name=DDPTraining # Name of the job
#SBATCH --nodes=$1 # Number of nodes specified by the user
#SBATCH --ntasks-per-node=1 # Ensure only one task runs per node
#SBATCH --cpus-per-task=1 # Number of CPU cores per task
#SBATCH --gres=gpu:1 # Number of GPUs per node
#SBATCH --time=01:00:00 # Time limit hrs:min:sec (1 hour in this example)
#SBATCH --mem=4GB # Memory limit per GPU
#SBATCH --output=training_%j.log # Output and error log name (%j expands to jobId)
#SBATCH --partition=gpu # Specify the partition or queue

srun python3 your_python_script.py --total_epochs 10 --save_every 2 --batch_size 32 --node_rank $SLURM_NODEID

E ora puoi avviare l’allenamento dal terminale con il comando

sbatch train_net.sh 2  # for using 2 nodes

Congratulazioni, ce l’hai fatta!

Grazie per aver letto! Prima che tu vada:

Per tutorial più fantastici, controlla il mio compilazione di tutorial sull’intelligenza artificiale su Github

Ydovresti ricevere i miei articoli nella tua casella di posta. Iscriviti qui.

Se desideri avere accesso ad articoli premium su Medium, hai solo bisogno di un abbonamento per $ 5 al mese. Se ti iscrivi con il mio collegamentomi sostieni con una parte del tuo compenso senza costi aggiuntivi.

Fonte: towardsdatascience.com

Lascia un commento

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *