Uno strumento di pianificazione basato sulle priorità per i lavori di formazione di Amazon SageMaker |  di Chaim Rand |  Marzo 2024

 | Intelligenza-Artificiale

Ottimizzazione dell'uso di acceleratori di addestramento IA limitati – Parte 2

fotografato da Adriano Aletti SU Unsplash

Questo post è stato creato in collaborazione con Max Rabin.

Questa è la seconda parte di una serie di post sul tema della massimizzazione dell'utilità delle scarse risorse di intelligenza artificiale. Nel primo post abbiamo notato le crescenti limitazioni alla capacità di espandere a piacimento le risorse IA e, di conseguenza, la tendenza crescente dei team di sviluppo IA a garantire la capacità di calcolo dell’IA attraverso la creazione di una server farm IA interna e/o la prenotazione istanze dedicate nel cloud. La scarsità di risorse di calcolo basate sull’intelligenza artificiale motiva la progettazione di soluzioni di pianificazione specializzate per ridurre al minimo i tempi di inattività e dare priorità ai carichi di lavoro critici. Si prega di consultare il nostro messaggio precedente in cui abbiamo proposto un elenco dettagliato dei requisiti per tali soluzioni. L'approccio che abbiamo adottato è stato quello di sfruttare le priorità esistenti pianificatore che viene fornito con Kubernetes e allineare il nostro flusso di lavoro di sviluppo della formazione al suo utilizzo. In questo post esploriamo la possibilità di mantenere il nostro framework esistente per l'addestramento dei modelli di intelligenza artificiale e di migliorarlo con la nostra implementazione personalizzata di uno scheduler basato sulle priorità. È importante sottolineare che la necessità di questo tipo di soluzione è spesso motivata non solo dalla scarsità di risorse di intelligenza artificiale, ma anche dal desiderio di aumentare il controllo sull’orchestrazione e sulla definizione delle priorità dei carichi di lavoro di formazione in modo da ridurre i costi di sviluppo. Ad esempio, anche in uno scenario di capacità abbondante, è possibile scegliere di limitare l'utilizzo a un numero fisso di istanze di formazione in modo da limitare le spese di formazione.

Ai fini di questo post, supporremo che il nostro framework di formazione preferito sia il servizio gestito di AWS per la formazione dei modelli AI, Amazon SageMaker. La soluzione che proporremo utilizzerà servizi AWS aggiuntivi come Amazon DynamoDB E AWSLambda. La scelta di dimostrare la nostra soluzione utilizzando i servizi AWS non deve essere vista come un'approvazione. Sono disponibili molte offerte di servizi basati su cloud e quella migliore per te dipenderà dai dettagli particolari del tuo progetto. Soluzioni simili a quella che descriveremo possono essere progettate su altri ambienti cloud-based e/o utilizzando servizi cloud alternativi.

Tradizionalmente, avvieremmo un lavoro di formazione SageMaker utilizzando il file SDK Python di Amazon SageMaker. Nel blocco di codice seguente utilizziamo SageMaker SDK (versione 2.208) per eseguire un carico di lavoro di addestramento PyTorch su una singola istanza di tipo p5.48xgrande.

from sagemaker.pytorch import PyTorch

# define job
estimator = PyTorch(
role='<sagemaker role>',
entry_point='train.py',
instance_type='ml.p5.48xlarge',
instance_count=1,
framework_version='2.0.1',
py_version='py310',
tags=({'Key': 'priority', 'Value': '100'}
)

# start job
estimator.fit()

Quando il stimatore.fit() viene chiamata la funzione, la libreria SageMaker carica il nostro codice su Amazon S3 e poi trasforma la richiesta in un client SageMaker boto3 crea_lavoro_di_formazione richiesta (cfr Qui).

Questa modalità di avvio dei lavori di formazione dipende dalla disponibilità delle risorse richieste per il suo successo. Nel nostro scenario di scarse risorse di intelligenza artificiale, è probabile che fallisca il più delle volte. Anche se questo può essere parzialmente mitigato da mantenendo le istanze di calcolo fornite per i carichi di lavoro successivil'API lo fa non fornire gli strumenti adeguati per massimizzare la loro utilità. Supponiamo di volerne utilizzare esattamente due p5.48xgrande istanze. Per semplificare la discussione, presupponiamo che ogni carico di lavoro di training venga eseguito su una singola istanza. In genere, durante un ciclo di sviluppo del modello AI ci saranno periodi in cui sono presenti più di due carichi di lavoro di addestramento in attesa di essere elaborati. L'API esistente proverebbe ad avviarne una terza p5.48xgrande esempio e molto probabilmente fallirebbe a causa della sua disponibilità limitata. Anche quando c'è disponibilità di istanze, potremmo voler limitare la nostra formazione solo alle due istanze designate per aumentare il nostro controllo sui costi della formazione.

Abbiamo bisogno di una nuova API per l'invio di lavori per la formazione, che non ne avvii immediatamente una nuova p5.48xgrande istanza, ma piuttosto inserisce i lavori in una coda di priorità. E abbiamo bisogno di un job scheduler associato che gestisca l'utilizzo delle nostre due risorse dando priorità ai carichi di lavoro critici.

È importante sottolineare che al momento della stesura di questo articolo Amazon SageMaker lo fa non sostenere la possibilità di formazione su istanze Amazon EC2 riservate. E sebbene Piani di risparmio di Amazon SageMaker ha proprietà simili alle prenotazioni di istanze, lo fa non garantire la capacità dell'istanza. In un messaggio precedente abbiamo affrontato questa limitazione e proposto l'utilizzo SageMaker gestiva piscine calde come metodo alternativo per mantenere l'accesso alle istanze fornite. Per il resto del post, supporremo di essere in grado di ottenere due istanze a nostra scelta, sia attraverso questo che qualche altro metodo.

In questa sezione descriveremo i componenti della nostra soluzione proposta. Utilizzeremo il Specifica del modello SAM (Serverless Application Model) di AWS. Più specificamente, creeremo un file File YAML del modello AWS SAM e aggiungere gradualmente le risorse AWS di cui abbiamo bisogno. Si prega di consultare il documentazione per dettagli su come definire e distribuire soluzioni serverless utilizzando AWS SAM.

Diagramma dell'architettura AWS (per autore)

Iniziamo utilizzando Gateway API di Amazon definire a API REST privata per l'invio di richieste di lavoro formativo. Diamo un nome all'API coda-di-lavoro-formazione. Successivamente aggiungeremo un metodo POST chiamato aggiungere lavoro e modificare il nostro codice di creazione del lavoro di formazione per utilizzare questo metodo anziché il client SageMaker crea_lavoro_di_formazione API. Il blocco di codice seguente contiene la definizione di private Risorsa API nel SAM. In pratica probabilmente vorrai specificare limitazioni di accesso all'API e/o un metodo di autorizzazione.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Resources:
InternalAPI:
Type: AWS::Serverless::Api
# Auth: # Add access control to API
EndpointConfiguration:
Type: PRIVATE
# VPCEndpointIds: # Specify VPC Endpoint(s)
Name: training-job-queue
StageName: prod

Definire una tabella AWS DynamoDB per l'archiviazione delle richieste di lavoro di formazione

Useremo un Amazon DynamoDB tabella denominata coda-sagemaker per archiviare i carichi di lavoro di formazione inviati. Ogni voce avrà i seguenti campi:

  1. jobName: memorizza il nome univoco del lavoro di formazione.
  2. entryTime: memorizza la data e l'ora in cui è stato aggiunto il lavoro.
  3. jobState: memorizza lo stato corrente del processo di formazione. I valori validi sono “in sospeso”, “in esecuzione” e “prerilasciato”.
  4. priorità: memorizza un valore intero che rappresenta la priorità relativa del lavoro.
  5. jobDetails: memorizza i dettagli della richiesta di lavoro.

Definiamo la nostra tabella DynamoDB nel nostro file YAML del modello SAM utilizzando il file AWS::Serverless::SimpleTable risorsa.

  DynamoSMQueue:
Type: AWS::Serverless::SimpleTable
Properties:
PrimaryKey:
Name: jobName
Type: String
TableName: sagemaker-queue

Definiamo una funzione che crea una voce nella tabella da una determinata richiesta di lavoro di formazione. Supponiamo che la richiesta contenga lo stesso contenuto dell'input nel file crea_lavoro_di_formazione API in formato JSON. Assumiamo inoltre che il priorità del carico di lavoro viene inserito come valore-chiave etichetta nella definizione del lavoro formativo.

import json, boto3, datetime

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('sagemaker-queue')

def add_job_entry(job_json):
job_details = json.loads(job_json)

# extract job_name
job_name = job_details('TrainingJobName')
print(f'add entry {job_name}')

# get current time
entry_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

# default priority is 0
priority = 0

# update priority based on tags
tags = job_details('Tags')
for tag in tags:
if tag('Key') == 'priority':
priority = int(tag('Value'))
break

# create entry
entry = {
'jobName': job_name,
'entryTime': entry_time,
'jobState': 'pending',
'priority': priority,
'jobDetails': job_json
}
table.put_item(Item=entry) #TODO handle errors
print(f'Added job {job_name} to queue')

L'API REST aggiungere lavoro metodo che definiremo prossimamente sarà programmato per richiamare il aggiungi_voce_lavoro funzione.

Definiamo una seconda funzione che estrae i lavori pendenti dal database e li restituisce in ordine di priorità. Nel caso in cui più lavori abbiano la stessa priorità, verranno ordinati in base al tempo di attesa in coda.

from boto3.dynamodb.conditions import Attr

# Get a list of all pending jobs sorted by priority
def get_pending_jobs():
response = table.scan(
ProjectionExpression='jobName, priority, entryTime',
FilterExpression=Attr('jobState').ne('running')
)
jobs = response.get('Items', ())

# sort jobs, first by priority (descending) and then by entryTime
sorted_jobs = sorted(jobs,
key=lambda x: (-x('priority'), x('entryTime')))

return sorted_jobs

Le seguenti funzioni di utilità torneranno utili nelle prossime sezioni.

# Get a jobName -> priority mapping of all running jobs
def get_running_jobs_dict():
# Get all running jobs
response = table.scan(
ProjectionExpression="jobName, priority",
FilterExpression=Attr('jobState').eq('running')
)
jobs = response.get('Items', ())

running_jobs = {job('jobName'): job('priority') for job in jobs}

return running_jobs

# Print the queue state
def print_queue_state():
response = table.scan(
ProjectionExpression='jobName, jobState, priority'
)
jobs = response.get('Items', ())

print_table = ()
for job in jobs:
print_table.append((job('jobName'), job('jobState'), job('priority')))

# sort by priority
sorted_table = sorted(print_table,
key=lambda x: -x(2))
# Print the table
from tabulate import tabulate
print(tabulate(sorted_table, headers=('Job Name', 'State', 'Priority')))

# get job details
def get_job_details(job_name):
response = table.get_item(
Key={'jobName': job_name},
ProjectionExpression='jobDetails'
)
return json.loads(response.get('Item').get('jobDetails'))

# get job state or None if the job does not exist
def get_job_state(job_name):
response = table.get_item(
Key={'jobName': job_name},
ProjectionExpression='jobState'
)
job = response.get('Item')
return job.get('jobState') if job else None

# update the job state
def update_job_state(job_name, new_state):
table.update_item(
Key={'jobName': job_name},
UpdateExpression="SET jobState = :new_state",
ExpressionAttributeValues={":new_state": new_state}
)
print(f'Update job {job_name} to {new_state}')

# remove a job entry
def remove_job(job_name):
table.delete_item(
Key={'jobName': job_name}
)
print(f'Removed job {job_name} from queue')

Sia la nostra scelta di DynamoDB che il suo utilizzo (ad esempio, il nostro utilizzo di Scansione API anziché Domanda API) presuppongono che il numero complessivo di lavori nella nostra coda sarà al massimo dell'ordine delle decine. Per una soluzione su scala più ampia, potrebbe essere meglio con un database più pesante (ad esempio, uno che esegue l'operazione di ordinamento per te) o un uso più sofisticato di DynamoDB (ad esempio, vedi Qui).

Definire il gestore della coda dei processi di formazione

Il componente principale della nostra soluzione è il pianificatore dei lavori di formazione. Qui implementiamo un gestore piuttosto semplice che esegue i seguenti passaggi:

  1. Estrae l'elenco dei lavori in coda, ordinati per priorità. Se non esiste, ritorna.
  2. Scopri la capacità delle istanze inutilizzata. Per ogni istanza gratuita, avvia un lavoro in sospeso su SageMaker. Se dopo tale data non rimangono più posti di lavoro, ritorna.
  3. Calcola il numero di lavori SageMaker nel file Fermarsi stato. Se maggiore del numero di lavori in sospeso, restituire.
  4. Valutare la necessità di prelazione sull'esecuzione dei lavori SageMaker confrontando i loro priorità a quelli dei nostri lavori in sospeso.
# set the limit on total number of instances/jobs
MAX_CAPACITY = 2

sagemaker = boto3.client('sagemaker')

# apply a queue stamp to identify that the job came from the queue
def apply_qstamp(job_name):
return f'{job_name}-qstamp-{datetime.now().strftime("%d%H%M")}'

# strip the queue stamp
def strip_qstamp(job_name):
return job_name.split('-qstamp-')(0)

# start a SageMaker job and update job entry in queue
def start_job(job_name):
print(f'start job {job_name}')
job_details = get_job_details(job_name)
job_details('TrainingJobName') = apply_qstamp(job_name)
if(job_details):
# start job with detail from queue
# (you may optinally overwrite fields such as the iam role)
response = sagemaker.create_training_job(**job_details)
if response('ResponseMetadata')('HTTPStatusCode') == 200:
print(f'started job {job_name}')
update_job_state(job_name, 'running')

# preempt a SageMaker job and update job entry in queue
def preempt_job(job_name):
print(f'preempt job {job_name}')
response = sagemaker.stop_training_job(TrainingJobName=job_name)
if response('ResponseMetadata')('HTTPStatusCode') == 200:
print(f'preempted job {job_name}')
update_job_state(strip_qstamp(job_name), 'preempted')

# get SageMaker jobs
def get_sagemaker_jobs(status):
running = sagemaker.list_training_jobs(StatusEquals=status)
return running.get('TrainingJobSummaries', ())

# queue manager
def manage_queue():
# extract pending jobs to run
pending = get_pending_jobs()

if not pending:
return

if len(pending) > MAX_CAPACITY:
pending = pending(:MAX_CAPACITY)

# get running sagemaker jobs
running = get_sagemaker_jobs('InProgress')
total_running = len(running)

# get stopping sagemaker jobs
stopping = get_sagemaker_jobs('Stopping')
total_stopping = len(stopping)

# calculate the number of free instances
free_slots = MAX_CAPACITY - total_running - total_stopping

jobs_to_start = min(len(pending), free_slots)

# for each free instance, start a job
for i in range(jobs_to_start):
start_job(pending(i).get('jobName'))

still_pending = pending(jobs_to_start:)

if not still_pending:
return

# assume that 'total_stopping' number of jobs will start soon
test_for_preemption = len(still_pending) - total_stopping
if test_for_preemption <= 0:
return

# check if preemption is required
test_priority = still_pending(total_stopping:)

running_jobs = get_running_jobs_dict()
priority_dict = {}
for job in running:
job_name = job('TrainingJobName')
priority_dict(job_name) = running_jobs(strip_qstamp(job_name))

# sort running jobs from lowest to highest priority
sorted_running = sorted(priority_dict.items(), key=lambda item: item(1))

index = 0
while index < test_for_preemption and \
test_priority(index).get('priority') > sorted_running(index)(1):
preempt_job(sorted_running(index)(0))
index = index + 1

Note importanti:

  1. La nostra implementazione è molto ottimistica nel senso che presupponiamo che tutti i lavori inseriti siano validi e che saremo in grado di avviarli su SageMaker senza problemi. In pratica, dovrebbe essere aggiunta un'appropriata gestione degli errori (ad esempio, rimuovendo i lavori difettosi dalla coda con un'appropriata registrazione).
  2. In un ambiente di produzione, dovremmo prendere in considerazione la probabile occorrenza di a condizione di gara quando il nostro coda_manager viene attivato da più eventi simultanei. Esistono diversi modi per affrontare questo problema (ad es Qui) inclusa l'applicazione dell'atomicità (ad esempio, impostando our Concorrenza delle funzioni lambda a uno), utilizzando una qualche forma di meccanismo di blocco (ad esempio, come fatto Qui), o rendendo la nostra funzione idempotente. Qui abbiamo adottato l'approccio di ciò che chiamiamo “idempotenza ottimistica”, in cui ci basiamo sull'uso appropriato dell'API e sull'idempotenza delle nostre chiamate sottostanti alle API SageMaker.
  3. Sottolineiamo che la nostra implementazione è ingenua. In pratica, consigliamo un algoritmo più sofisticato che 1) tenga conto dell'uso di diversi tipi di istanze e lavori che richiedono più di un'istanza, 2) prenda in considerazione tutti i casi limite e 3) sia adattato alle esigenze specifiche del tuo progetto.

Definire la funzione AWS Lambda

Il componente successivo della soluzione è la funzione Lambda. Il seguente blocco di codice include SAM definizione della nostra funzione serverless. Programmiamo la funzione in modo che venga eseguita su due diversi tipi di eventi: qualsiasi chiamata a aggiungere lavoro sul nostro gateway API privato e a passare allo stato di un lavoro di formazione SageMaker.

  ManagedTrainingJobQueue:
Type: AWS::Serverless::Function
Properties:
CodeUri: job-queue/ # the directory containing our index.py file
Handler: index.lambda_handler
Runtime: python3.12
Architectures:
- arm64 # use graviton
Policies: # allow access to SageMaker and DynamoDB
- !Sub "arn:${AWS::Partition}:iam::aws:policy/AmazonSageMakerFullAccess"
- DynamoDBCrudPolicy:
TableName: !Ref DynamoSMQueue
Events:
CreateTraining:
Type: Api
Properties:
Path: /add-job
Method: post
RestApiId: !Ref InternalAPI
SageMakerEvent:
Type: EventBridgeRule
Properties:
Pattern:
source:
- aws.sagemaker
detail-type:
- SageMaker Training Job State Change
detail:
TrainingJobStatus:
- "Completed"
- "Failed"
- "Stopped"

IL lambda_handler la funzione è implementata come segue:

def lambda_handler(event, context):
# identify source of event and take appropriate action
if 'requestContext' in event and 'apiId' in event('requestContext'):
print('Lambda triggerred by API Gateway')
job_details = json.loads(event.get('body'))
add_job_entry(job_details)
elif 'source' in event and event('source') == 'aws.sagemaker':
print('Lambda triggerred by SageMaker job state change')
job_name = event('detail')('TrainingJobName')
job_status = event('detail')('TrainingJobStatus')
print(f'{job_name} status changed to {job_status}')

# strip qstamp from job_name
job_name = strip_qstamp(job_name)

if job_status in ('Completed' , 'Failed'):
remove_job(job_name)
elif job_status == 'Stopped':
# check if it was manually stopped or preempted by queue manager
if get_job_state(job_name) == 'preempted':
print(f'job {job_name} preemption completed')
else:
print(f'job {job_name} {job_status}, remove from queue')
remove_job(job_name)

# in all cases invoke queue manager
manage_queue()

Intercetta la richiesta di creazione del lavoro di formazione

L'ultima modifica richiesta per rendere completa la nostra soluzione è intercettare la chiamata a SageMaker crea_lavoro_di_formazione API e reindirizzarlo al nostro aggiungere lavoro metodo. Lo facciamo sovrascrivendo il file _intercetta_crea_richiesta funzione del Classe Sessione SageMaker:

from sagemaker.pytorch import PyTorch
from sagemaker.session import Session
import requests, logging
logger = logging.getLogger('sagemaker')

def submit_to_training_queue(job):
logger.info(f'Adding training-job {job('TrainingJobName')} to queue')
logger.debug('train request: {json.dumps(job, indent=4)}')

vpce='<vpc endpoint>' # insert id of vpc endpoint
region='us-east-1' # specify region
url=f'https://{vpce}.execute-api.{region}.vpce.amazonaws.com/prod/add-job'
headers = {'x-apigw-api-id': '<api-id>'} # insert api gateway id

# submit job
response = requests.post(url, headers=headers, json=job)

class QueueTrainingJobSession(Session):
def _intercept_create_request(self, request, create, func_name = None):
"""This function intercepts the create job request

Args:
request (dict): the create job request
create (functor): a functor calls the sagemaker client create method
func_name (str): the name of the function needed intercepting
"""
if func_name == 'train':
submit_to_training_queue(request)
else:
super()._intercept_create_request(request,create,func_name)

# define job
estimator = PyTorch(
role='<sagemaker role>',
entry_point='train.py',
instance_type='ml.p5.48xlarge',
instance_count=1,
framework_version='2.0.1',
py_version='py310',
tags=({'Key': 'priority', 'Value': '100'},
keep_alive_period_in_seconds=60, # keep warm for 1 minute
# use our custom Session class
sagemaker_session=QueueTrainingJobSession()
)

estimator.fit(wait=False)

Per testare la nostra soluzione inviamo la seguente sequenza di lavori. Dopo ogni chiamata stampiamo lo stato della coda (usando il file print_queue_state funzione) e dormire per venti secondi.

  1. Avvia lavoro1 con priorità 1.
  2. Avvia job2 con priorità 2.
  3. Avvia job3 con priorità 1.
  4. Avvia job4 con priorità 3.

I primi due lavori vengono immediatamente inviati a SageMaker e aggiornati al file corsa stato. Dato che il terzo lavoro ha una priorità bassa e abbiamo esattamente due istanze di formazione, rimane nel file in attesa di stato e aspetta il suo turno. Dopo aver inviato i primi tre lavori, lo stato della coda appare come:

Job Name    State      Priority
---------- ------- ----------
job2 running 2
job1 running 1
job3 pending 1

Il quarto lavoro che inviamo ha una priorità più alta rispetto a tutti i lavori in coda. Di conseguenza, il lavoro in esecuzione con la priorità più bassa, lavoro1è anticipato. Il lavoro SageMaker corrispondente viene interrotto e una volta rilasciata l'istanza, lo stato della coda diventa:

Job Name    State        Priority
---------- --------- ----------
job4 running 3
job2 running 2
job1 preempted 1
job3 pending 1

Il lavoro SageMaker è in esecuzione lavoro2 è il primo a finire, lavoro2 viene rimosso dalla coda e il nostro lavoro anticipato viene ripreso:

Job Name    State      Priority
---------- ------- ----------
job4 running 3
job1 running 1
job3 pending 1

Una volta lavoro4 viene completato, anch'esso viene rimosso dalla coda, lasciando spazio a lavoro3. Anche i lavori rimanenti vengono completati, lasciando la coda vuota.

La crescente difficoltà nell’acquisire capacità di calcolo dell’intelligenza artificiale ha costretto i team di sviluppo dell’intelligenza artificiale a rivalutare i processi che utilizzano per addestrare i modelli di intelligenza artificiale. L'approccio che abbiamo dimostrato in questo post è quello di aumentare le API tradizionali per i modelli di training con una coda di priorità personalizzata e uno scheduler di lavoro associato. È importante sottolineare che la proposta che abbiamo presentato dovrebbe essere vista come uno schema generale, non come una soluzione degna di produzione. Sarebbero necessarie modifiche e miglioramenti appropriati per soddisfare le esigenze specifiche del progetto.

Fonte: towardsdatascience.com

Lascia un commento

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *