Dask DataFrame è veloce adesso |  di Patrick Hoefler |  Maggio 2024

 | Intelligenza-Artificiale

In che modo Dask consente l'elaborazione efficiente dei dati su scala di terabyte

Performance Improvements for Dask DataFrames — All Images created by the Author

introduzione

Dask DataFrame scala i DataFrame panda per operare su scala 100 GB-100 TB.

Storicamente, Dask era piuttosto lento rispetto ad altri strumenti in questo ambito (come Spark). Grazie a una serie di miglioramenti incentrati sulle prestazioni, ora è piuttosto veloce (circa 20 volte più veloce di prima). La nuova implementazione ha portato Dask dall'essere distrutto da Spark su ogni benchmark al superare regolarmente Spark sulle query TPC-H con un margine significativo.

I carichi di lavoro Dask DataFrame hanno avuto molte difficoltà. Le prestazioni e l'utilizzo della memoria erano comunemente riscontrati come punti critici, lo spostamento era instabile per set di dati più grandi, rendendo difficile la scalabilità orizzontale. Scrivere codice efficiente richiedeva una comprensione eccessiva delle parti interne di Dask.

La nuova implementazione ha cambiato tutto questo. Le cose che non funzionavano sono state completamente riscritte da zero e le implementazioni esistenti sono state migliorate. Ciò pone Dask DataFrames su una solida base che consentirà cicli di iterazione più rapidi in futuro.

Esamineremo le tre modifiche più importanti, illustrando il modo in cui influiscono sulle prestazioni e semplificano l'utilizzo efficiente di Dask, anche per gli utenti che non conoscono l'elaborazione distribuita. Discuteremo anche i piani per miglioramenti futuri.

Faccio parte del core team di Dask. Sono un ingegnere open source per Arrotolato ed è stato coinvolto nell'implementazione di alcuni dei miglioramenti discussi in questo post.

1. Supporto per le frecce Apache: tipo di dati stringa efficiente

Un Dask DataFrame è costituito da molti DataFrame panda. Storicamente, i panda utilizzavano NumPy per i dati numerici, ma gli oggetti Python per i dati di testo, che sono inefficienti e aumentano l’utilizzo della memoria. Anche le operazioni sui dati degli oggetti detengono il GIL, il che non ha molta importanza per i panda, ma è una catastrofe per le prestazioni con un sistema parallelo come Dask.

La versione 2.0 di Panda ha introdotto il supporto per i tipi di dati Arrow di uso generale, quindi Dask ora utilizza le stringhe supportate da PyArrow per impostazione predefinita. Questi sono tanto Meglio. Le stringhe PyArrow riducono l'utilizzo della memoria fino all'80% e sbloccano il multi-threading per le operazioni sulle stringhe. I carichi di lavoro che in precedenza avevano difficoltà con la memoria disponibile ora si adattano comodamente a molto meno spazio e sono molto più veloci perché non riversano più costantemente i dati in eccesso sul disco.

Memory Usage of the Legacy DataFrames compared with Arrow Strings

Ho scritto un post a riguardo indaga le integrazioni di Arrow più in dettaglio se vuoi saperne di più.

2. Join più veloci con un nuovo algoritmo Shuffle

Il mescolamento è un componente essenziale dei sistemi distribuiti per consentire l'ordinamento, l'unione e il raggruppamento complesso per operazioni. Si tratta di un'operazione completa che richiede un uso intensivo della rete e che spesso rappresenta il componente più costoso di un flusso di lavoro. Abbiamo riscritto il sistema di mescolamento di Dask, che ha un notevole impatto sulle prestazioni complessive, in particolare su carichi di lavoro complessi e ad alta intensità di dati.

Un'operazione di riproduzione casuale è intrinsecamente un'operazione di comunicazione tutto a tutti in cui ogni partizione di input deve fornire una piccola porzione di dati a ogni partizione di output. Dask stava già utilizzando il proprio algoritmo basato su attività che riusciva a ridurre il O(n * n) complessità del compito a O(log(n) * n) Dove n è il numero di partizioni. Si è trattato di una drastica riduzione del numero di attività, ma il ridimensionamento non lineare alla fine non ha consentito a Dask di elaborare set di dati arbitrariamente grandi.

Dask ha introdotto un nuovo metodo shuffle P2P (peer-to-peer) che ha ridotto la complessità delle attività a O(n) che si adatta linearmente alla dimensione del set di dati e alla dimensione del cluster. Incorpora inoltre un'efficiente integrazione del disco che consente di mescolare facilmente set di dati che sono molto più grandi della memoria. Il nuovo sistema è estremamente stabile e “funziona” su qualsiasi scala di dati.

Memory Usage of the Legacy Shuffle compared with P2P

Ha scritto uno dei miei colleghi un post a riguardo che include una spiegazione più ampia e molti dettagli tecnici.

3. Ottimizzatore

Dask stesso è pigro, il che significa che registra l'intera query prima di eseguire qualsiasi lavoro effettivo. Questo è un concetto potente che consente molte ottimizzazioni, ma storicamente Dask non traeva vantaggio da questa conoscenza in passato. Dask ha anche fatto un pessimo lavoro nel nascondere le complessità interne e ha lasciato gli utenti da soli mentre affrontavano le difficoltà del calcolo distribuito ed eseguivano query su larga scala. Ha reso difficile la scrittura di codice efficiente per i non esperti.

L'uscita di Dask è prevista per marzo include una reimplementazione completa dell'API DataFrame per supportare l'ottimizzazione delle query. Questo è un grosso problema. Il nuovo motore è incentrato su un ottimizzatore di query che riscrive il nostro codice per renderlo più efficiente e meglio adattato ai punti di forza di Dask. Immergiamoci in alcune strategie di ottimizzazione, nel modo in cui rendono il nostro codice più veloce e scalabile meglio.

Inizieremo con un paio di ottimizzazioni per scopi generali utili per ogni strumento simile a DataFrame prima di immergerci in tecniche più specifiche adattate ai sistemi distribuiti in generale e a Dask in modo più specifico.

3.1 Proiezione della colonna

La maggior parte dei set di dati ha più colonne di quelle effettivamente necessarie. Eliminarli richiede lungimiranza (“Di quali colonne avrò bisogno per questa query? 🤔”), quindi la maggior parte delle persone non ci pensa quando carica i dati. Ciò è negativo per le prestazioni perché portiamo con noi molti dati di cui non abbiamo bisogno, rallentando tutto. La proiezione delle colonne elimina le colonne non appena non sono più necessarie. È un'ottimizzazione semplice, ma estremamente vantaggiosa.

L'implementazione legacy legge sempre tutte le colonne dall'archivio e le elimina solo se lo chiediamo attivamente. Operare semplicemente con meno dati è un grande vantaggio in termini di prestazioni e utilizzo della memoria.

L'ottimizzatore esamina la query e determina quali colonne sono necessarie per ciascuna operazione. Possiamo immaginarlo come guardare il passaggio finale della nostra query e poi lavorare all'indietro passo dopo passo fino all'origine dati e inserire operazioni di rilascio per eliminare le colonne non necessarie.

We only require a subset of columns in the end. Replace doesn't need access to all columns, so we can drop unnecessary columns directly in the IO step.

3.2 Filtro Pushdown

Il pushdown del filtro è un'altra ottimizzazione di uso generale con lo stesso obiettivo della proiezione delle colonne: operare su meno dati. L'implementazione legacy mantiene semplicemente i filtri dove li abbiamo inseriti. La nuova implementazione esegue le operazioni di filtro il prima possibile mantenendo gli stessi risultati.

L'ottimizzatore identifica ogni filtro nella nostra query ed esamina l'operazione precedente per vedere se possiamo spostare il filtro più vicino all'origine dati. Lo ripeterà finché non troverà un'operazione che non può essere cambiata con un filtro. Questo è un po' più difficile delle proiezioni di colonna, perché dobbiamo assicurarci che le operazioni non modifichino i valori del nostro DataFrame. Ad esempio, cambiare un filtro e un'operazione di unione va bene (i valori non cambiano), ma cambiare un filtro e un'operazione di sostituzione non è valido, perché i nostri valori potrebbero cambiare e le righe che in precedenza sarebbero state filtrate ora non lo saranno , o vice versa.

Initially, the filter happens after the Dropna, but we can execute the filter before Dropna without changing the result. This allows us to push the filter into the IO step.

Inoltre, se il nostro filtro è sufficientemente potente, possiamo potenzialmente eliminare file completi nella fase IO. Questo è lo scenario migliore, in cui un filtro precedente apporta un enorme miglioramento delle prestazioni e richiede persino la lettura di meno dati dall'archiviazione remota.

3.3 Ridimensionamento automatico delle partizioni

Oltre a implementare le tecniche di ottimizzazione comuni descritte sopra, abbiamo anche migliorato un punto dolente comune specifico dei sistemi distribuiti in generale e degli utenti Dask in particolare: le dimensioni ottimali delle partizioni.

I Dask DataFrames sono costituiti da tanti piccoli panda DataFrames chiamati partizioni. Spesso, il numero di partizioni viene deciso per te e agli utenti Dask viene consigliato di “ripartizionarli” manualmente dopo aver ridotto o espanso i propri dati (ad esempio eliminando colonne, filtrando dati o espandendo con join) (vedere la sezione Documenti scuri). Senza questo passaggio aggiuntivo, il sovraccarico (solitamente piccolo) di Dask può diventare un collo di bottiglia se i DataFrames dei panda diventano troppo piccoli, rendendo i flussi di lavoro di Dask dolorosamente lenti.

Controllare manualmente la dimensione della partizione è un compito difficile di cui noi, come utenti Dask, non dovremmo preoccuparci. È anche lento perché richiede il trasferimento in rete di alcune partizioni. Dask DataFrame ora fa automaticamente due cose per aiutare quando le partizioni diventano troppo piccole:

  • Mantiene costante la dimensione di ciascuna partizione, in base al rapporto tra i dati che desideri calcolare e la dimensione del file originale. Se, ad esempio, filtri l'80% del set di dati originale, Dask combinerà automaticamente le partizioni più piccole risultanti in meno partizioni più grandi.
  • Combina partizioni troppo piccole in partizioni più grandi, in base a un minimo assoluto (il valore predefinito è 75 MB). Se, ad esempio, il tuo set di dati originale è suddiviso in tanti piccoli file, Dask li combinerà automaticamente.
We select two columns that take up 40 MB of memory out of the 200 MB from the whole file.

L'ottimizzatore esaminerà il numero di colonne e la dimensione dei dati al loro interno. Calcola un rapporto utilizzato per combinare più file in un'unica partizione.

The ratio of 40/200 results in combining five files into a single partition.

Questo passaggio è attualmente limitato alle operazioni di I/O (come la lettura di un set di dati Parquet), ma prevediamo di estenderlo ad altre operazioni che consentano di combinare in modo economico le partizioni.

3.4 Operazioni banali di unione e unione

Le operazioni di unione e unione sono in genere economiche su una singola macchina con panda ma costose in un ambiente distribuito. L'unione dei dati nella memoria condivisa è economica, mentre l'unione dei dati attraverso una rete è piuttosto lenta, a causa delle operazioni di riproduzione casuale spiegate in precedenza.

Questa è una delle operazioni più costose in un sistema distribuito. L'implementazione legacy ha attivato un trasferimento di rete di entrambi i DataFram di input per ogni operazione di unione. Questo a volte è necessario, ma molto costoso.

Both joins are performed on the same column. The left DataFrame is already properly partitioned after the first join, so we can avoid shuffling again with the new implementation.

L'ottimizzatore determinerà quando è necessario lo spostamento rispetto a quando è sufficiente un'unione banale perché i dati sono già allineati correttamente. Ciò può rendere le singole fusioni un ordine di grandezza più veloci. Questo vale anche per altre operazioni che normalmente richiedono una riproduzione casuale groupby().apply().

Le unioni Dask erano inefficienti, il che causava tempi di esecuzione lunghi. L'ottimizzatore risolve questo problema nel caso banale in cui queste operazioni si verificano una dopo l'altra, ma la tecnica non è ancora molto avanzata. C'è ancora molto potenziale di miglioramento.

The current implementation shuffles both branches that originate from the same table. Injecting a shuffle node further up avoids one of the expensive operations.

L'ottimizzatore esaminerà l'espressione e inserirà i nodi shuffle dove necessario per evitare mescolamenti non necessari.

Come si accumulano i miglioramenti rispetto all'implementazione legacy?

Dask ora è 20 volte più veloce di prima. Questo miglioramento si applica all'intera API DataFrame (non solo ai componenti isolati), senza regressioni note delle prestazioni. Dask ora esegue carichi di lavoro che prima era impossibile completare in un periodo di tempo accettabile. Questo aumento delle prestazioni è dovuto a numerosi miglioramenti sovrapposti uno sopra l'altro. Non si tratta di fare una cosa particolarmente bene, ma di non fare nulla particolarmente male.

Performance improvements on Query 3 of the TPC-H Benchmarks from https://github.com/coiled/benchmarks/tree/main/tests/tpch

Le prestazioni, pur essendo il miglioramento più allettante, non sono l'unica cosa che è migliorata. L'ottimizzatore nasconde molta complessità all'utente e rende molto più semplice la transizione da Panda a Dask perché ora è molto più difficile scrivere codice con prestazioni scadenti. L'intero sistema è più robusto.

Anche la nuova architettura dell'API è molto più semplice da utilizzare. L'implementazione legacy ha fatto trapelare molte complessità interne nelle implementazioni API di alto livello, rendendo le modifiche macchinose. I miglioramenti sono quasi banali da aggiungere ora.

Cosa succederà?

Dask DataFrame è cambiato molto negli ultimi 18 mesi. L'API legacy era spesso difficile da utilizzare e presentava difficoltà con la scalabilità orizzontale. La nuova implementazione ha eliminato gli aspetti che non funzionavano e ha migliorato le implementazioni esistenti. Il lavoro pesante è ora terminato, il che consente cicli di iterazione più rapidi per migliorare lo status quo. Ora è banale aggiungere miglioramenti incrementali.

Alcune cose che sono sulla tabella di marcia immediata:

  • Divisione automatica: questo è parzialmente implementato, ma c'è più possibilità di scegliere una dimensione della partizione più efficiente durante l'ottimizzazione.
  • Join più veloci: c'è ancora molta messa a punto da fare qui. Ad esempio, abbiamo un PR in volo con un miglioramento del 30-40%.
  • Partecipa al riordino: non lo facciamo ancora, ma è sulla tabella di marcia immediata

Questo articolo si concentra su una serie di miglioramenti apportati a Dask DataFrame e su quanto sia più veloce e affidabile di conseguenza. Se scegli tra Dask e altri popolari strumenti DataFrame, potresti anche considerare:

Grazie per aver letto. Sentiti libero di contattarci per condividere i tuoi pensieri e feedback.

Fonte: towardsdatascience.com

Lascia un commento

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *