Parallelizzare Python su Spark: opzioni per la concorrenza con Panda |  di Matt Collins |  Novembre 2023

 | Intelligenza-Artificiale

Sfrutta i vantaggi di Spark quando lavori con Panda

fotografato da Florian Steciuk SU Unsplash

Nel mio ruolo precedente, ho dedicato del tempo a lavorare su un progetto interno per prevedere il futuro utilizzo dello spazio di archiviazione su disco per i nostri clienti di servizi gestiti su migliaia di dischi. Ogni disco è soggetto ai propri modelli di utilizzo e ciò significa che abbiamo bisogno di un modello di machine learning separato per ciascun disco che prenda dati storici per prevedere l’utilizzo futuro disco per disco. Sebbene eseguire questa previsione e scegliere l’algoritmo corretto per il lavoro sia di per sé una sfida, eseguirla su larga scala presenta i suoi problemi.

Per sfruttare infrastrutture più sofisticate, possiamo cercare di abbandonare le previsioni sequenziali e accelerare il funzionamento delle previsioni parallelizzando il carico di lavoro. Questo post del blog mira a confrontare le UDF di Pandas e il modulo “concurrent.futures”, due approcci di elaborazione simultanea, e a determinare i casi d’uso per ciascuno.

Pandas è un pacchetto gateway in Python per lavorare con set di dati nello spazio di analisi. Lavorando con DataFrames, siamo in grado di profilare i dati e valutarne la qualità, eseguire analisi esplorative dei dati, creare visualizzazioni descrittive dei dati e prevedere le tendenze future.

Sebbene si tratti certamente di un ottimo strumento, la natura a thread singolo di Python significa che può scalare scarsamente quando si lavora con set di dati più grandi o quando è necessario eseguire la stessa analisi su più sottoinsiemi di dati.

Nel mondo dei big data, ci aspettiamo un approccio un po’ più sofisticato, poiché ci concentriamo ulteriormente sulla scalabilità per mantenere ottime prestazioni. Spark, tra gli altri linguaggi, ci consente di sfruttare l’elaborazione distribuita per aiutarci a elaborare strutture di dati più grandi e complicate.

Prima di approfondire questo esempio specifico, possiamo generalizzare alcuni casi d’uso che riassumono la necessità di concorrenza nell’elaborazione dei dati:

  • Applicare trasformazioni uniformi a più file di dati
  • Prevedere valori futuri per diversi sottoinsiemi di dati
  • Ottimizza gli iperparametri per il modello di machine learning e seleziona la configurazione più efficiente

Quando aumentiamo la nostra esigenza di eseguire carichi di lavoro come quelli suggeriti sopra e nel nostro caso, l’approccio più semplice in Python e Panda è elaborare questi dati in sequenza. Per il nostro esempio, eseguiremo il flusso precedente per un disco alla volta.

Nel nostro esempio, disponiamo di dati per migliaia di dischi che mostrano lo spazio libero registrato nel tempo e vogliamo prevedere i futuri valori di spazio libero per ciascuno dei dischi.

Per dipingere il quadro un po’ più chiaramente, ho fornito un file CSV contenente 1.000 dischi ciascuno con un mese di dati storici per lo spazio libero misurato in GB. Si tratta di dimensioni sufficienti per consentirci di vedere l’impatto dei diversi approcci alla previsione su larga scala.

Immagine dell’autore: esempio DataFrame

Per un problema di serie temporali come questo, stiamo cercando di utilizzare dati storici per prevedere le tendenze future e vogliamo capire quale algoritmo di Machine Learning (ML) sarà più appropriato per ciascun disco. Strumenti come AutoML sono ottimi per questo quando si cerca di determinare il modello appropriato per un set di dati, ma qui abbiamo a che fare con 1.000 set di dati, quindi questo è eccessivo per il nostro confronto.

In questo caso, limiteremo a due il numero di algoritmi che vogliamo confrontare e vedremo quale è il modello più appropriato da utilizzare, per ciascun disco, utilizzando il Root Mean Squared Error (RMSE) come metrica di convalida. Ulteriori informazioni su RMSE possono essere trovate Qui . Questi algoritmi sono:

  • Regressione lineare
  • Fbprophet (adattando i dati a una linea più complessa)
  • Il modello di previsione delle serie temporali di Facebook.
  • Costruito per previsioni più complesse con iperparametri per la stagionalità.

Ora abbiamo tutti i componenti pronti se volessimo prevedere il futuro spazio libero su un singolo disco. L’insieme di azioni segue il flusso seguente:

Immagine dell’autore: Ciclo di vita dei dati

Ora vogliamo estenderlo, eseguendo questo flusso per più dischi, 1.000 nel nostro esempio.

Nell’ambito della nostra revisione, confronteremo le prestazioni del calcolo dei valori RMSE per i diversi algoritmi su scale diverse. Pertanto, ho creato un sottoinsieme dei primi 100 dischi per imitarlo.

Ciò dovrebbe fornire alcuni spunti interessanti sulle prestazioni su set di dati di diverse dimensioni, eseguendo operazioni di varia complessità.

Python è notoriamente a thread singolo e di conseguenza non utilizza tutte le risorse di calcolo disponibili in un determinato momento.

Di conseguenza, ho visto tre opzioni:

  1. Implementa un ciclo for per calcolare le previsioni in sequenza, adottando l’approccio a thread singolo.
  2. Usa Python futuri modulo per eseguire più processi contemporaneamente.
  3. Utilizza le UDF di Pandas (funzioni definite dall’utente) per sfruttare l’elaborazione distribuita in PySpark mantenendo la sintassi di Pandas e i pacchetti compatibili.

Volevo fare un confronto abbastanza approfondito in diverse condizioni ambientali, quindi ho utilizzato un cluster Databricks a nodo singolo e un altro cluster Databricks con 4 nodi di lavoro per sfruttare Spark per il nostro approccio UDF di Pandas.

Seguiremo il seguente approccio per valutare l’idoneità dei modelli di regressione lineare e fbprophet per ciascun disco:

  • Suddividere i dati in set di training e test
  • Utilizzare il set di training come input e prevedere le date del set di test
  • Confrontare i valori previsti con i valori effettivi nel set di test per ottenere un punteggio RMSE (Root Mean Squared Error).

Restituiremo due cose nei nostri output: un DataFrame modificato con le previsioni, che ci offre l’ulteriore vantaggio di tracciare e confrontare i valori previsti con quelli effettivi, e un DataFrame contenente i punteggi RMSE per ciascun disco e algoritmo.

Le funzioni per farlo sono le seguenti:

Confronteremo i tre approcci sopra descritti. Abbiamo alcuni scenari diversi, quindi possiamo compilare una tabella con gli elementi rispetto ai quali stiamo raccogliendo i risultati:

Con le seguenti combinazioni:

Metodo

  • Sequenziale
  • futuri
  • UDF Panda

Algoritmo

  • Regressione lineare
  • Un profeta
  • Combinato (entrambi gli algoritmi per ciascun disco): il modo più efficiente per effettuare un confronto.

Modalità cluster

  • Cluster a nodo singolo
  • Cluster standard con 4 lavoratori

Numero di dischi

I risultati sono presentati in questo formato nell’appendice di questo blog, se desideri dare un’occhiata più approfondita.

Metodo 1: sequenziale

Metodo 2: futures.concorrenti

Ci sono due opzioni nell’utilizzo di questo modulo: parallelizzare operazioni ad uso intensivo di memoria (utilizzando ThreadPoolExecutor) o operazioni ad uso intensivo di CPU (ProcessPoolExecutor). Una spiegazione descrittiva di ciò si trova nel seguito blog. Poiché lavoreremo su un problema che richiede un uso intensivo della CPU, ProcessPoolExecutor è adatto a ciò che stiamo cercando di ottenere.

Metodo 3: UDF Pandas

Ora cambieremo marcia e utilizzeremo Spark e sfrutteremo il calcolo distribuito per aumentare la nostra efficienza. Poiché utilizziamo Databricks, la maggior parte della configurazione di Spark viene eseguita per noi, ma sono presenti alcune modifiche alla gestione generale dei dati.

Innanzitutto, importa i dati in un PySpark DataFrame:

Utilizzeremo la mappa raggruppata Panda UDF (PandasUDFType.GROUPED_MAP), poiché vogliamo passare un DataFrame e restituire un DataFrame. Da Apache Spark 3.0 non abbiamo più bisogno di dichiarare esplicitamente questo decoratore!

Dobbiamo suddividere le nostre funzioni fbprophet, regressione e RMSE per le UDF di Pandas a causa della strutturazione di DataFrame in PySpark, ma non è necessaria una massiccia revisione del codice per raggiungere questo obiettivo.

Possiamo quindi utilizzare applyInPandas per produrre i nostri risultati.

Nota: gli esempi precedenti dimostrano solo il processo per utilizzare la regressione lineare per la leggibilità. Si prega di vedere l’intero taccuino per la completa dimostrazione di ciò.

Abbiamo creato grafici per i diversi metodi e le diverse configurazioni dell’ambiente, quindi abbiamo raggruppato i dati per algoritmo e numero di dischi per un facile confronto.

Si prega di notare che i risultati tabellari si trovano nell’appendice di questo post.

Ho riassunto i punti salienti di questi risultati di seguito:

  • Come previsto, prevedere 1.000 dischi rispetto a 100 dischi è (generalmente) un processo che richiede più tempo.
  • L’approccio sequenziale è generalmente il più lento, non essendo in grado di sfruttare le risorse sottostanti in modo efficiente.
  • Le UDF di Panda sono piuttosto inefficienti nei compiti più piccoli e semplici. Il sovraccarico della trasformazione dei dati è più costoso: la parallelizzazione aiuta a compensare questo.
  • Sia l’approccio sequenziale che quello concurrent.futures ignorano il clustering disponibile in Databricks, perdendo risorse di calcolo aggiuntive.

Il contesto gioca sicuramente un ruolo importante nel definire l’approccio più efficace, ma dato che Databricks e Spark vengono spesso utilizzati per problemi di Big Data, possiamo vedere il vantaggio dell’utilizzo delle UDF di Panda con i set di dati più grandi e complessi che abbiamo visto qui oggi.

L’utilizzo di un ambiente Spark per set di dati più piccoli può essere eseguito in modo altrettanto efficiente su una configurazione di calcolo più piccola (e meno costosa!) con grande efficienza, come dimostrato dall’uso del modulo concurrent.futures, quindi tienilo a mente quando progetti la tua soluzione.

Se hai familiarità con Python e Panda, nessuno dei due approcci dovrebbe essere una curva di apprendimento faticosa per allontanarsi dall’approccio sequenziale del ciclo for visto nei tutorial per principianti.

Non lo abbiamo approfondito in questo post poiché ho riscontrato discrepanze e incompatibilità con la versione corrente, ma il recente modulo pyspark.pandas sarà sicuramente più comune in futuro e rappresenta un approccio a cui prestare attenzione. Questa API (insieme a Koalas, sviluppata dai ragazzi di Databricks, ma ora ritirata) sfrutta la familiarità dei Panda con i vantaggi sottostanti di Spark.

Per dimostrare l’effetto che stiamo cercando di ottenere, ci siamo limitati a guardare i valori RMSE prodotti per ciascun disco, piuttosto che prevedere effettivamente un futuro insieme di valori di serie temporali. Il quadro che abbiamo creato qui può essere applicato allo stesso modo per questo, con la logica per determinare se la metrica di valutazione (insieme ad altra logica, come le limitazioni fisiche di un disco) è appropriata in ciascun caso e per prevedere il futuro valori, ove possibile, utilizzando l’algoritmo determinato.

Come sempre, il taccuino lo potete trovare nel mio GitHub .

Fonte: towardsdatascience.com

Lascia un commento

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *