Nei post precedenti (es. Qui) abbiamo approfondito l’importanza della profilazione e dell’ottimizzazione delle prestazioni dei carichi di lavoro di formazione DNN. La formazione di modelli di deep learning, soprattutto quelli di grandi dimensioni, può essere un’impresa costosa. La tua capacità di massimizzare l’utilizzo delle risorse di formazione in modo da accelerare la convergenza del modello e ridurre al minimo i costi di formazione, può essere un fattore decisivo per il successo del tuo progetto. Ottimizzazione delle prestazioni è un processo iterativo in cui identifichiamo e affrontiamo il colli di bottiglia prestazionali nella nostra applicazione, ovvero le parti della nostra applicazione che ci impediscono di aumentare l’utilizzo delle risorse e/o di accelerare il tempo di esecuzione.
Questo post è il terzo di una serie di post incentrati su uno dei colli di bottiglia prestazionali più comuni che incontriamo durante l’addestramento dei modelli di deep learning, il collo di bottiglia della pre-elaborazione dei dati. Un collo di bottiglia della pre-elaborazione dei dati si verifica quando la nostra GPU (o acceleratore alternativo), in genere il il più costoso risorsa nella nostra configurazione di addestramento: si ritrova inattiva mentre attende l’input di dati da eccessivamente incaricato Risorse della CPU.
Nel nostro primo post sull’argomento abbiamo discusso e dimostrato diversi modi per affrontare questo tipo di collo di bottiglia, tra cui:
- Scegliendo un’istanza di training con un rapporto di calcolo CPU/GPU più adatto al tuo carico di lavoro,
- Migliorare il bilanciamento del carico di lavoro tra CPU e GPU spostando alcune operazioni della CPU sulla GPU e
- Scaricamento di parte del calcolo della CPU su dispositivi di lavoro CPU ausiliari.
Abbiamo dimostrato la terza opzione utilizzando il file API del servizio dati TensorFlowuna soluzione specifica per TensorFlow, in cui una parte dell’elaborazione dei dati di input può essere scaricata su altri dispositivi utilizzando gRPC come protocollo di comunicazione sottostante.
Nel nostro secondo articoloabbiamo proposto una soluzione più generica basata su gRPC per l’utilizzo di lavoratori ausiliari della CPU e l’abbiamo dimostrata su un modello PyTorch giocattolo. Anche se richiedeva un po’ più di codifica e messa a punto manuale rispetto al API del servizio dati TensorFlowla soluzione ha fornito una robustezza molto maggiore e ha consentito la stessa ottimizzazione delle prestazioni di allenamento.
Bilanciamento del carico con Ray
In questo post dimostreremo un metodo aggiuntivo per l’utilizzo di CPU work ausiliari che mira a combinare la robustezza della soluzione generica con la semplicità e la facilità d’uso dell’API specifica di TensorFlow. Verrà utilizzato il metodo che dimostreremo Set di dati di Ray dal Ray Dati biblioteca. Sfruttando tutta la potenza di Ray gestione delle risorse E pianificazione distribuita sistemi, Ray Data è in grado di eseguire la nostra pipeline di input dei dati di addestramento in modo che sia entrambe le cose scalabile E distribuito. In particolare, configureremo il nostro Ray Dataset in modo tale che la libreria rileverà e utilizzerà automaticamente tutte le risorse della CPU disponibili per pre-elaborare i dati di addestramento. Concluderemo ulteriormente il nostro ciclo di addestramento del modello con a Ray AIR Trainer in modo da consentire il ridimensionamento senza interruzioni a un’impostazione multi-GPU.
Distribuzione di un cluster Ray su Amazon SageMaker
Un prerequisito per l’utilizzo del framework Ray e delle utilità che offre in un ambiente multi-nodo è l’implementazione di un Ammasso di raggi. In generale, progettare, implementare, gestire e mantenere un cluster di elaborazione di questo tipo può essere un compito arduo e spesso richiede un ingegnere devops dedicato (o un team di ingegneri). Ciò può rappresentare un ostacolo insormontabile per alcuni team di sviluppo. In questo post dimostreremo come superare questo ostacolo utilizzando il servizio di formazione gestito di AWS, Amazon SageMaker. In particolare, creeremo un Cluster eterogeneo di SageMaker con istanze GPU e istanze CPU e utilizzarlo per distribuire un cluster Ray all’avvio. Eseguiremo quindi l’applicazione di formazione Ray AIR su questo cluster Ray facendo affidamento sul backend di Ray per eseguire un bilanciamento del carico efficace su tutte le risorse nel cluster. Una volta completata l’applicazione di formazione, il cluster Ray verrà smontato automaticamente. L’utilizzo di SageMaker in questo modo ci consente di distribuire e utilizzare un cluster Ray senza il sovraccarico comunemente associato alla gestione del cluster.
Ray è un framework potente che consente un’ampia gamma di carichi di lavoro di machine learning. In questo post dimostreremo solo alcune delle sue capacità e API utilizzando Ray versione 2.6.1. Questo post non deve essere utilizzato in sostituzione del Documentazione Ray. Assicurati di controllare il funzionario documentazione per l’utilizzo più appropriato ed aggiornato delle utilità Ray.
Prima di iniziare, un ringraziamento speciale a Boruch gesso per avermi fatto conoscere la libreria Ray Data e le sue funzionalità uniche.
Per facilitare la nostra discussione, definiremo e addestreremo un semplice PyTorch (2.0) Trasformatore di visionemodello di classificazione basato su che addestreremo su un set di dati sintetico composto da immagini ed etichette casuali. IL Documentazione Ray AIR include un’ampia varietà di esempi che dimostrano come creare diversi tipi di carichi di lavoro di formazione utilizzando Ray AIR. Lo script che creiamo qui segue vagamente i passaggi descritti nel file Esempio di classificatore di immagini PyTorch.
Definizione del set di dati Ray e del preprocessore
IL API Ray AIR Trainer distingue tra il set di dati grezzi e la pipeline di preelaborazione che viene applicata agli elementi del set di dati prima di inserirli nel ciclo di addestramento. Per il nostro set di dati Ray grezzo creiamo un semplice intervallo di numeri interi di dimensione num_record. Successivamente, definiamo il Preprocessore che vorremmo applicare al nostro set di dati. Il nostro Ray Preprocesser contiene due componenti: il primo è a BatchMapper che mappa gli interi grezzi su coppie casuali di etichetta-immagine. Il secondo è a Preprocessore TorchVision che esegue a trasformazione della torciavisione sui nostri batch casuali che li converte in tensori PyTorch e applica una serie di Sfocatura gaussiana operazioni. IL Sfocatura gaussiana le operazioni hanno lo scopo di simulare una pipeline di pre-elaborazione dei dati relativamente pesante. I due preprocessori vengono combinati utilizzando a Catena Preprocessore. La creazione del set di dati Ray e del preprocessore è dimostrata nel blocco di codice seguente:
import ray
from typing import Dict, Tuple
import numpy as np
import torchvision.transforms as transforms
from ray.data.preprocessors import Chain, BatchMapper, TorchVisionPreprocessordef get_ds(batch_size, num_records):
# create a raw Ray tabular dataset
ds = ray.data.range(num_records)
# map an integer to a random image-label pair
def synthetic_ds(batch: Tuple(int)) -> Dict(str, np.ndarray):
labels = batch('id')
batch_size = len(labels)
images = np.random.randn(batch_size, 224, 224, 3).astype(np.float32)
labels = np.array((label % 1000 for label in labels)).astype(
dtype=np.int64)
return {"image": images, "label": labels}
# the first step of the prepocessor maps batches of ints to
# random image-label pairs
synthetic_data = BatchMapper(synthetic_ds,
batch_size=batch_size,
batch_format="numpy")
# we define a torchvision transform that converts the numpy pairs to
# tensors and then applies a series of gaussian blurs to simulate
# heavy preprocessing
transform = transforms.Compose(
(transforms.ToTensor()) + (transforms.GaussianBlur(11))*10
)
# the second step of the prepocessor appplies the torchvision tranform
vision_preprocessor = TorchVisionPreprocessor(columns=("image"),
transform=transform)
# combine the preprocessing steps
preprocessor = Chain(synthetic_data, vision_preprocessor)
return ds, preprocessor
Tieni presente che la pipeline di dati Ray utilizzerà automaticamente tutte le CPU disponibili nel cluster Ray. Ciò include le risorse CPU presenti sull’istanza GPU nonché le risorse CPU di eventuali istanze ausiliarie aggiuntive nel cluster.
Definire il ciclo formativo
Il passaggio successivo consiste nel definire la sequenza di formazione che verrà eseguita su ciascuno degli operatori di formazione (ad esempio, GPU). Per prima cosa definiamo il modello utilizzando il popolare timm (0.6.13) pacchetto Python e avvolgerlo utilizzando il file train.torch.prepare_model API. Successivamente, estraiamo il frammento appropriato dal set di dati e definiamo un iteratore che produce batch di dati con la dimensione batch richiesta e li copia sul dispositivo di addestramento. Poi arriva il ciclo di addestramento stesso che comprende il codice PyTorch standard. Quando usciamo dal ciclo, riportiamo la metrica della perdita risultante. La sequenza di formazione per lavoratore è illustrata nel blocco di codice seguente:
import time
from ray import train
from ray.air import session
import torch.nn as nn
import torch.optim as optim
from timm.models.vision_transformer import VisionTransformer# build a ViT model using timm
def build_model():
return VisionTransformer()
# define the training loop per worker
def train_loop_per_worker(config):
# wrap the PyTorch model with a Ray object
model = train.torch.prepare_model(build_model())
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# get the appropriate dataset shard
train_dataset_shard = session.get_dataset_shard("train")
# create an iterator that returns batches from the dataset
train_dataset_batches = train_dataset_shard.iter_torch_batches(
batch_size=config("batch_size"),
prefetch_batches=config("prefetch_batches"),
device=train.torch.get_device()
)
t0 = time.perf_counter()
for i, batch in enumerate(train_dataset_batches):
# get the inputs and labels
inputs, labels = batch("image"), batch("label")
# zero the parameter gradients
optimizer.zero_grad()
# forward + backward + optimize
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# print statistics
if i % 100 == 99: # print every 100 mini-batches
avg_time = (time.perf_counter()-t0)/100
print(f"Iteration {i+1}: avg time per step {avg_time:.3f}")
t0 = time.perf_counter()
metrics = dict(running_loss=loss.item())
session.report(metrics)
Definizione del Ray Torch Trainer
Una volta definita la pipeline di dati e il ciclo di addestramento, possiamo passare alla configurazione del file Ray TorchTrainer. Configuriamo il Trainer in modo da tenere conto delle risorse disponibili nel cluster. Nello specifico, impostiamo il numero di addetti alla formazione in base al numero di GPU e impostiamo la dimensione del batch in base alla memoria disponibile sulla nostra GPU di destinazione. Costruiamo il nostro set di dati con il numero di record richiesti per addestrare esattamente 1000 passaggi.
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfigdef train_model():
# we will configure the number of workers, the size of our
# dataset, and the size of the data storage according to the
# available resources
num_gpus = int(ray.available_resources().get("GPU", 0))
# set the number of training workers according to the number of GPUs
num_workers = num_gpus if num_gpus > 0 else 1
# we set the batch size based on the GPU memory capacity of the
# Amazon EC2 g5 instance family
batch_size = 64
# create a synthetic dataset with enough data to train for 1000 steps
num_records = batch_size * 1000 * num_workers
ds, preprocessor = get_ds(batch_size, num_records)
ds = preprocessor(ds)
trainer = TorchTrainer(
train_loop_per_worker=train_loop_per_worker,
train_loop_config={"batch_size": batch_size},
datasets={"train": ds},
scaling_config=ScalingConfig(num_workers=num_workers,
use_gpu=num_gpus > 0),
)
trainer.fit()
Distribuisci un Ray Cluster ed esegui la sequenza di addestramento
Ora definiamo il punto di ingresso del nostro script di formazione. È qui che impostiamo il cluster Ray e iniziamo la sequenza di addestramento sul nodo principale. Noi usiamo il Ambiente classe da formazione per sagemaker libreria per scoprire le istanze nel cluster eterogeneo SageMaker come descritto in questo tutorial. Definiamo il primo nodo del gruppo di istanze GPU come il nostro cluster Ray Testa node ed eseguire il comando appropriato su tutti gli altri nodi per collegarli al cluster. (Vedi il Documentazione Ray per maggiori dettagli sulla creazione di cluster.) Programmiamo il nodo head in modo che attenda fino a quando tutti i nodi non si saranno connessi e quindi inizieremo la sequenza di addestramento. Ciò garantisce che Ray utilizzerà tutte le risorse disponibili durante la definizione e la distribuzione delle attività Ray sottostanti.
import time
import subprocess
from sagemaker_training import environmentif __name__ == "__main__":
# use the Environment() class to auto-discover the SageMaker cluster
env = environment.Environment()
if env.current_instance_group == 'gpu' and \
env.current_instance_group_hosts.index(env.current_host) == 0:
# the head node starts a ray cluster
p = subprocess.Popen('ray start --head --port=6379',
shell=True).wait()
ray.init()
# calculate the total number of nodes in the cluster
groups = env.instance_groups_dict.values()
cluster_size = sum(len(v('hosts')) for v in list(groups))
# wait until all SageMaker nodes have connected to the Ray cluster
connected_nodes = 1
while connected_nodes < cluster_size:
time.sleep(1)
resources = ray.available_resources().keys()
connected_nodes = sum(1 for s in list(resources) if 'node' in s)
# call the training sequence
train_model()
# tear down the ray cluster
p = subprocess.Popen("ray down", shell=True).wait()
else:
# worker nodes attach to the head node
head = env.instance_groups_dict('gpu')('hosts')(0)
p = subprocess.Popen(
f"ray start --address='{head}:6379'",
shell=True).wait()
# utility for checking if the cluster is still alive
def is_alive():
from subprocess import Popen
p = Popen('ray status', shell=True)
p.communicate()(0)
return p.returncode
# keep node alive until the process on head node completes
while is_alive() == 0:
time.sleep(10)
Formazione su un cluster eterogeneo Amazon SageMaker
Una volta completato il nostro script di formazione, ora abbiamo il compito di distribuirlo a un cluster eterogeneo Amazon SageMaker. Per fare ciò seguiamo i passaggi descritti in questo tutorial. Iniziamo creando un dir_origine directory in cui inseriamo il file our train.py sceneggiatura e a requisiti.txt file contenente i due pacchetti pip da cui dipende il nostro script, timm E raggio (acceso). Questi vengono installati automaticamente su ciascuno dei nodi del cluster SageMaker. Definiamo due SageMaker Gruppi di istanzeil primo con un singolo ml.g5.xgrande istanza (contenente 1 GPU e 4 vCPU) e la seconda con una singola ml.c5.4xgrande istanza (contenente 16 vCPU). Usiamo quindi il Stimatore SageMaker PyTorch per definire e distribuire il nostro lavoro di formazione nel cloud.
from sagemaker.pytorch import PyTorch
from sagemaker.instance_group import InstanceGroup
cpu_group = InstanceGroup("cpu", "ml.c5.4xlarge", 1)
gpu_group = InstanceGroup("gpu", "ml.g5.xlarge", 1)estimator = PyTorch(
entry_point='train.py',
source_dir='./source_dir',
framework_version='2.0.0',
role='<arn role>',
py_version='py310',
job_name='hetero-cluster',
instance_groups=(gpu_group, cpu_group)
)
estimator.fit()
Nella tabella seguente confrontiamo i risultati di runtime dell’esecuzione del nostro script di training in due diverse impostazioni: una singola istanza GPU ml.g5.xlarge e un cluster eterogeneo contenente un’istanza ml.g5.xlarge e un’istanza ml.c5.4xlarge. Valutiamo l’utilizzo delle risorse di sistema utilizzando Amazon CloudWatch e stimare il costo della formazione utilizzando il metodo Prezzi di Amazon SageMaker disponibile al momento della stesura di questo articolo ($ 0,816 l’ora per l’istanza ml.c5.4xlarge e $ 1,408 per ml.g5.xlarge).
L’utilizzo relativamente elevato della CPU combinato con il basso utilizzo della GPU dell’esperimento a istanza singola indica un collo di bottiglia delle prestazioni nella pipeline di pre-elaborazione dei dati. Questi problemi vengono affrontati chiaramente quando si passa al cluster eterogeneo. Non solo aumenta l’utilizzo della GPU, ma anche la velocità di allenamento. Nel complesso, l’efficienza in termini di prezzo della formazione aumenta del 23%.
Dovremmo sottolineare che questi esperimenti giocattolo sono stati creati esclusivamente allo scopo di dimostrare le funzionalità di bilanciamento del carico automatizzato consentite dall’ecosistema Ray. È possibile che la regolazione dei parametri di controllo abbia portato a un miglioramento delle prestazioni. È anche probabile che la scelta di una soluzione diversa per risolvere il collo di bottiglia della CPU (come la scelta di un’istanza dal file EC2 g5 famiglia con più CPU) potrebbe aver portato a migliori prestazioni in termini di costi.
In questo post abbiamo dimostrato come i set di dati Ray possano essere utilizzati per bilanciare il carico di una pesante pipeline di pre-elaborazione dei dati su tutti i CPU Worker disponibili nel cluster. Ciò ci consente di risolvere facilmente i colli di bottiglia della CPU semplicemente aggiungendo istanze CPU ausiliarie all’ambiente di formazione. Il supporto cluster eterogeneo di Amazon SageMaker è un modo interessante per eseguire un lavoro di formazione Ray nel cloud poiché gestisce tutti gli aspetti della gestione del cluster evitando la necessità di supporto devops dedicato.
Tieni presente che la soluzione qui presentata è solo uno dei tanti modi diversi per affrontare i colli di bottiglia della CPU. La soluzione migliore per te dipenderà molto dai dettagli del tuo progetto.
Come al solito, non esitate a contattarci con commenti, correzioni e domande.