Gli LLM sono ormai ovunque, in particolare ChatGPT. Ci sono un sacco di applicazioni create sopra e se non lo sei, dovresti provarlo.
La creazione di applicazioni su ChatGPT richiederà probabilmente di effettuare diverse chiamate parallele. Sfortunatamente, non sei l’unico. Con così tante applicazioni che eseguono milioni di richieste al giorno (complimenti per il loro team di ingegneri tra l’altro) spesso l’API restituisce un errore di “troppe richieste”. Quindi abbiamo bisogno di un buon modo per gestire tali errori effettuando diverse chiamate parallele.
In questo piccolo tutorial Python, tratteremo questi due argomenti importanti per eseguire in modo efficiente le chiamate all’API ChatGPT:
- Eseguire più chiamate in parallelo
- Riprova le chiamate nel caso falliscano
Il modo più semplice per eseguire una chiamata è eseguirla in modo sincrono, ovvero inviare la richiesta e attendere l’arrivo della risposta per continuare il programma. Possiamo farlo semplicemente come segue:
import requestsheaders = {
"Content-Type": "application/json",
"Authorization": f"Bearer {OPENAI_API_KEY}"
}
response_json = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": ({"role": "user", "content": "ping"}),
"temperature": 0
}).json()
print(response_json("choices")(0)("message")("content"))
Pong!
Se lavoriamo in un sistema semplice va bene, tuttavia, se desideriamo eseguire più chiamate in parallelo a un’API o ad altre risorse come un database, possiamo farlo in modo asincrono per risposte più rapide.
L’esecuzione delle attività in modo asincrono attiverà ogni azione e attenderà che finiscano in parallelo, riducendo così i tempi di attesa.
Un modo semplice per farlo è creare thread diversi per elaborare ciascuna richiesta, tuttavia esiste un modo migliore per farlo utilizzando chiamate asincrone.
Effettuare una chiamata asincrona è spesso più efficiente poiché è possibile specificare i luoghi esatti in cui l’applicazione deve attendere, mentre nel threading tradizionale il sistema metterà automaticamente i thread in attesa, il che potrebbe non essere ottimale.
Di seguito presentiamo un esempio che mostra la differenza tra l’utilizzo delle chiamate sincronizzate e asincrone.
# Sync callimport time
def delay_print(msg):
print(msg, end=" ")
time.sleep(1)
def sync_print():
for i in range(10):
delay_print(i)
start_time = time.time()
sync_print()
print("\n", time.time() - start_time, "seconds.")
0 1 2 3 4 5 6 7 8 9
10.019574642181396 seconds.
#Async Callimport asyncio
async def delay_print_async(msg):
print(msg, end=" ")
await asyncio.sleep(1)
async def async_print():
asyncio.gather(*(delay_print_async(i) for i in range(10)))
start_time = time.time()
await async_print()
print("\n", time.time() - start_time, "seconds.")
0.0002448558807373047 seconds.
0 1 2 3 4 5 6 7 8 9
IL asyncio.gather
Il metodo attiverà tutte le chiamate asincrone passate ad esso e restituirà i risultati una volta pronti.
Sfortunatamente eseguo chiamate asincrone con il requests
la biblioteca non è possibile. Per farlo puoi utilizzare il aiohttp
biblioteca. Di seguito è riportato un esempio di come eseguire una chiamata asincrona con aiohttp
.
import aiohttpasync def get_completion(content):
async with aiohttp.ClientSession() as session:
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": ({"role": "user", "content": content}),
"temperature": 0
}) as resp:
response_json = await resp.json()
return response_json("choices")(0)('message')("content")
await get_completion("Ping")
Pong!
Come detto prima, per eseguire richieste asincrone dobbiamo utilizzare il file asyncio.gather
metodo.
async def get_completion_list(content_list):
return await asyncio.gather(*(get_completion(content) for content in content_list))await get_completion_list(("ping", "pong")*5)
('Pong!',
'Ping!',
'Pong!',
'Ping!',
'Pong!',
'Ping!',
'Pong!',
'Ping!',
'Pong!',
'Ping!')
Sebbene funzioni, eseguire le chiamate in questo modo non è l’ideale poiché stiamo ricreando l’oggetto della sessione per ogni chiamata. Possiamo risparmiare risorse e tempo riutilizzando lo stesso oggetto di sessione in questo modo:
async def get_completion(content, session):
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": ({"role": "user", "content": content}),
"temperature": 0
}) as resp:response_json = await resp.json()
return response_json("choices")(0)('message')("content")
async def get_completion_list(content_list):
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*(get_completion(content, session) for content in content_list))
await get_completion_list(("ping", "pong")*5)
Semplice, vero? Con questo è possibile effettuare facilmente più chiamate. Un problema, tuttavia, è che spesso non è una buona pratica eseguire chiamate illimitate in questo modo poiché si può sovraccaricare il sistema ed essere penalizzati impedendo di eseguire richieste aggiuntive per un certo periodo di tempo (fidati di me, lo farai). Pertanto è una buona idea limitare il numero di chiamate che è possibile effettuare contemporaneamente. Puoi farlo facilmente con asyncio.Semaphore
classe.
IL Semaphore
La classe crea un gestore di contesto che gestirà la quantità di chiamate asincrone attualmente eseguite all’interno del suo contesto. Se viene raggiunto l’importo massimo, si bloccherà fino al termine di alcune chiamate.
async def get_completion(content, session, semaphore):
async with semaphore:await asyncio.sleep(1)
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": ({"role": "user", "content": content}),
"temperature": 0
}) as resp:
response_json = await resp.json()
return response_json("choices")(0)('message')("content")
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*(get_completion(content, session, semaphore) for content in content_list))
start_time = time.perf_counter()
completion_list = await get_completion_list(("ping", "pong")*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
print(completion_list)
Time elapsed: 1.8094507199984946 seconds.
('Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!')
Una cosa facoltativa qui è segnalare come sta andando lo stato di avanzamento delle chiamate. Puoi farlo creando una piccola classe che manterrà i progressi e sarà condivisa tra tutte le chiamate. Puoi farlo come segue:
class ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"Done runs {self.done}/{self.total}."
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
await asyncio.sleep(1)
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": ({"role": "user", "content": content}),
"temperature": 0
}) as resp:
response_json = await resp.json()
progress_log.increment()
print(progress_log)
return response_json("choices")(0)('message')("content")
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*(get_completion(content, session, semaphore, progress_log) for content in content_list))
start_time = time.perf_counter()
completion_list = await get_completion_list(("ping", "pong")*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
print(completion_list)
Done runs 1/10.
Done runs 2/10.
Done runs 3/10.
Done runs 4/10.
Done runs 5/10.
Done runs 6/10.
Done runs 7/10.
Done runs 8/10.
Done runs 9/10.
Done runs 10/10.
Time elapsed: 1.755018908999773 seconds.
('Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!')
Ciò completa questa sezione su come eseguire più richieste asincrone. Con questo, puoi eseguire diverse chiamate asincrone, limitare il numero di chiamate per volta e segnalare l’avanzamento. Tuttavia ci sono ancora alcuni problemi da gestire.
Le richieste effettuate possono fallire per diversi motivi come sovraccarico del server, interruzione della connessione, richieste errate, ecc. Queste possono generare eccezioni o restituire risposte imprevedibili, quindi dobbiamo trattare questi casi e riprovare automaticamente le chiamate non riuscite.
Per gestire le chiamate fallite utilizzeremo il file tenacity
biblioteca. Tenacity fornisce decoratori di funzioni che ritenteranno automaticamente la nostra chiamata di funzione nel caso in cui generi un’eccezione.
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
)
Per fornire una funzionalità di riprova alle nostre chiamate dovremo inserire il file @retry
decoratore. Usandolo senza parametri aggiuntivi, la funzione riproverà immediatamente e indefinitamente una volta fallita. Questo non va bene per alcuni motivi.
Uno è che la nostra chiamata di funzione potrebbe fallire a causa del sovraccarico del server, il che rende ragionevole attendere un po’ di tempo prima di riprovare. Per indicare il tempo di attesa utilizzeremo l’approccio del backoff esponenziale tramite il parametro wait=wait_random_exponential(min=min_value, max=max_value)
. Ciò aumenterà il tempo di attesa quanto più la funzione fallisce.
Una cosa facoltativa è registrare i messaggi ogni volta che si verifica un nuovo tentativo. Possiamo farlo fornendo qualche funzione al parametro before_sleep
. Qui useremo il print
funzione, tuttavia, un modo migliore è utilizzare il file logging
modulo e passare a logging.error
O logging.debug
funzione su questo parametro.
Per dimostrare genereremo eccezioni casuali.
import randomclass ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"Done runs {self.done}/{self.total}."
@retry(wait=wait_random_exponential(min=1, max=60), before_sleep=print)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
#await asyncio.sleep(1)
if random.random() < 0.2:
raise Exception("Random exception")
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": ({"role": "user", "content": content}),
"temperature": 0
}) as resp:
response_json = await resp.json()
progress_log.increment()
print(progress_log)
return response_json("choices")(0)('message')("content")
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*(get_completion(content, session, semaphore, progress_log) for content in content_list))
start_time = time.perf_counter()
completion_list = await get_completion_list(("ping", "pong")*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
print(completion_list)
<RetryCallState 133364377433616: attempt #1; slept for 0.74; last result: failed (Exception Random exception)>
<RetryCallState 133364377424496: attempt #1; slept for 0.79; last result: failed (Exception Random exception)>
Done runs 1/10.
Done runs 2/10.
Done runs 3/10.
Done runs 4/10.
Done runs 5/10.
Done runs 6/10.
Done runs 7/10.
Done runs 8/10.
Done runs 9/10.
Done runs 10/10.
Time elapsed: 1.1305301820011664 seconds.
('Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!')
Ciò farà sì che la nostra funzione attenda un po’ di tempo prima di riprovare. Tuttavia, il motivo del fallimento può essere sistematico, ad esempio a causa di un tempo di inattività del server o di un carico utile errato. In questo caso, vogliamo che il numero di tentativi sia limitato. Possiamo farlo con il parametro stop=stop_after_attempt(n)
.
import randomclass ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"Done runs {self.done}/{self.total}."
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
#await asyncio.sleep(1)
if random.random() < 0.9:
raise Exception("Random exception")
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": ({"role": "user", "content": content}),
"temperature": 0
}) as resp:
response_json = await resp.json()
progress_log.increment()
print(progress_log)
return response_json("choices")(0)('message')("content")
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*(get_completion(content, session, semaphore, progress_log) for content in content_list))
start_time = time.perf_counter()
completion_list = await get_completion_list(("ping", "pong")*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
print(completion_list)
<RetryCallState 133364608660048: attempt #1; slept for 0.1; last result: failed (Exception Random exception)>
<RetryCallState 133364377435680: attempt #1; slept for 0.71; last result: failed (Exception Random exception)>
<RetryCallState 133364377421472: attempt #1; slept for 0.17; last result: failed (Exception Random exception)>
<RetryCallState 133364377424256: attempt #1; slept for 0.37; last result: failed (Exception Random exception)>
<RetryCallState 133364377430928: attempt #1; slept for 0.87; last result: failed (Exception Random exception)>
<RetryCallState 133364377420752: attempt #1; slept for 0.42; last result: failed (Exception Random exception)>
<RetryCallState 133364377422576: attempt #1; slept for 0.47; last result: failed (Exception Random exception)>
<RetryCallState 133364377431312: attempt #1; slept for 0.11; last result: failed (Exception Random exception)>
<RetryCallState 133364377425840: attempt #1; slept for 0.69; last result: failed (Exception Random exception)>
<RetryCallState 133364377424592: attempt #1; slept for 0.89; last result: failed (Exception Random exception)>
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/_asyncio.py in __call__(self, fn, *args, **kwargs)
49 try:
---> 50 result = await fn(*args, **kwargs)
51 except BaseException: # noqa: B9025 frames
Exception: Random exception
The above exception was the direct cause of the following exception:
RetryError Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/__init__.py in iter(self, retry_state)
324 if self.reraise:
325 raise retry_exc.reraise()
--> 326 raise retry_exc from fut.exception()
327
328 if self.wait:
RetryError: RetryError(<Future at 0x794b5057a590 state=finished raised Exception>)
Con questo parametro impostare a RetryError
aumenterà una volta che il numero di tentativi raggiunge il valore massimo. Tuttavia, potrebbe essere il caso di voler continuare l’esecuzione senza generare un’eccezione, salvando semplicemente un file None
valore alla chiamata restituita per gestirla in seguito. Per farlo possiamo utilizzare la funzione callback retry_error_callback
per restituire semplicemente il file None
valore nel caso a RetryError
si verifica l’errore:
import randomclass ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"Done runs {self.done}/{self.total}."
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
#await asyncio.sleep(1)
if random.random() < 0.7:
raise Exception("Random exception")
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": ({"role": "user", "content": content}),
"temperature": 0
}) as resp:
response_json = await resp.json()
progress_log.increment()
print(progress_log)
return response_json("choices")(0)('message')("content")
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(1)) as session:
return await asyncio.gather(*(get_completion(content, session, semaphore, progress_log) for content in content_list))
start_time = time.perf_counter()
completion_list = await get_completion_list(("ping", "pong")*5, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
print(completion_list)
<RetryCallState 133364377805024: attempt #1; slept for 0.22; last result: failed (Exception Random exception)>
<RetryCallState 133364377799456: attempt #1; slept for 0.53; last result: failed (Exception Random exception)>
<RetryCallState 133364377801328: attempt #1; slept for 0.24; last result: failed (Exception Random exception)>
<RetryCallState 133364377810208: attempt #1; slept for 0.38; last result: failed (Exception Random exception)>
<RetryCallState 133364377801616: attempt #1; slept for 0.54; last result: failed (Exception Random exception)>
<RetryCallState 133364377422096: attempt #1; slept for 0.59; last result: failed (Exception Random exception)>
<RetryCallState 133364377430592: attempt #1; slept for 0.07; last result: failed (Exception Random exception)>
<RetryCallState 133364377425648: attempt #1; slept for 0.05; last result: failed (Exception Random exception)>
Done runs 1/10.
Done runs 2/10.
Done runs 3/10.
Time elapsed: 2.6409040250000544 seconds.
('Pong!', 'Ping!', None, None, None, None, None, 'Ping!', None, None)
Con questo, None
verranno restituiti i valori invece di generare errori.
Un problema non ancora gestito è il problema della connessione bloccata. Ciò accade quando eseguiamo una richiesta e per qualche motivo l’host mantiene la connessione ma non fallisce né restituisce qualcosa. Per gestire questi casi dobbiamo inserire un timeout da restituire nel caso in cui la chiamata non restituisca un valore entro un determinato periodo. Per farlo possiamo usare il file timeout
parametro da aiohttp
biblioteca insieme al aiohttp.ClientTimeout
classe. Nel caso in cui si verifichi un timeout qui, a TimeoutError
verrà sollevato, che verrà poi gestito dal file retry
decoratore da tenacity
ed eseguire nuovamente automaticamente la funzione.
class ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"Done runs {self.done}/{self.total}."
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": ({"role": "user", "content": content}),
"temperature": 0
}) as resp:
response_json = await resp.json()
progress_log.increment()
print(progress_log)
return response_json("choices")(0)('message')("content")
async def get_completion_list(content_list, max_parallel_calls):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(10)) as session:
return await asyncio.gather(*(get_completion(content, session, semaphore, progress_log) for content in content_list))
start_time = time.perf_counter()
completion_list = await get_completion_list(("ping", "pong")*100, 100)
print("Time elapsed: ", time.perf_counter() - start_time, "seconds.")
<RetryCallState 133364375201936: attempt #1; slept for 0.57; last result: failed (TimeoutError )>
Time elapsed: 12.705538211999738 seconds.
Grande! Ora disponiamo di un modo affidabile per eseguire più richieste parallele che riproveranno automaticamente nel caso in cui si verifichi un errore e ritornino None
valori nel caso in cui il guasto sia sistematico. Quindi il codice finale sarà il seguente:
import asyncio
import aiohttp
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
)headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {OPENAI_API_KEY}"
}
class ProgressLog:
def __init__(self, total):
self.total = total
self.done = 0
def increment(self):
self.done = self.done + 1
def __repr__(self):
return f"Done runs {self.done}/{self.total}."
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
async with semaphore:
async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
"model": "gpt-3.5-turbo",
"messages": ({"role": "user", "content": content}),
"temperature": 0
}) as resp:
response_json = await resp.json()
progress_log.increment()
print(progress_log)
return response_json("choices")(0)('message')("content")
async def get_completion_list(content_list, max_parallel_calls, timeout):
semaphore = asyncio.Semaphore(value=max_parallel_calls)
progress_log = ProgressLog(len(content_list))
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(timeout)) as session:
return await asyncio.gather(*(get_completion(content, session, semaphore, progress_log) for content in content_list))
In sintesi, abbiamo implementato le seguenti funzionalità:
- Chiamate asincrone per ridurre i tempi di attesa.
- Registrazione dell’avanzamento delle chiamate asincrone.
- Attiva automaticamente i tentativi quando una chiamata fallisce.
- Restituisce Nessun valore nel caso in cui gli errori siano sistematici.
- Riprovare una chiamata quando scade e non restituisce nulla.
Se hai domande, hai trovato qualche errore o hai qualche idea su come migliorarlo lascia un commento qui sotto!
Fonte: towardsdatascience.com