1,5 anni di conoscenza di Spark in 8 consigli |  di Michael Berk |  Dicembre 2023

 | Intelligenza-Artificiale

I miei insegnamenti dal coinvolgimento dei clienti di Databricks

distorsione dei dati della partizione spark ottimizza l'ottimizzazione della partizione dell'interfaccia utente pyspark sql python
Figura 1: uno schema tecnico di come scrivere apache spark. Immagine dell’autore.

In Databricks aiuto le grandi organizzazioni di vendita al dettaglio a distribuire e ridimensionare pipeline di dati e machine learning. Ecco gli 8 più importanti scintilla consigli/trucchi che ho imparato sul campo.

In questo post presupponiamo una conoscenza generale di Spark e della sua struttura, ma questo post dovrebbe essere accessibile a tutti i livelli.

Immergiamoci!

Rivediamo rapidamente cosa fa Spark…

Spark è un motore di elaborazione di big data. Richiede Python/Java/scala/R/SQL e converte il codice in un insieme di trasformazioni altamente ottimizzato.

distorsione dei dati della partizione spark ottimizza l'ottimizzazione della partizione dell'interfaccia utente pyspark sql python
Figura 2: configurazione spark driver e lavoratore. Immagine dell’autore.

Al suo livello più basso, spark crea attività, che sono trasformazioni parallelizzabili su partizioni dati. Queste attività vengono quindi distribuite da un nodo driver ai nodi di lavoro, che sono responsabili di sfruttare i core della CPU per completare le trasformazioni. Distribuendo le attività a un numero potenziale di lavoratori, Spark ci consente di scalare orizzontalmente e quindi supportare pipeline di dati complesse che sarebbero impossibili su una singola macchina.

Ok, spero che non tutte queste fossero informazioni nuove. In ogni caso, nelle sezioni seguenti rallenteremo un po’. Questi suggerimenti dovrebbero aiutare sia i principianti che gli intermedi a innescare la scintilla.

Spark è complesso. Per aiutare te e potenzialmente altri a comprenderne la struttura, sfruttiamo un’analogia straordinariamente valida presa in prestito da teoria delle code: Spark è un negozio di alimentari.

Quando si pensa alla componente di calcolo distribuito di Spark, ci sono tre componenti principali….

  • Partizioni dati: sottoinsiemi di righe dei nostri dati. Nel nostro negozio di alimentari, lo sono drogheria.
  • Attività Spark: trasformazioni di basso livello eseguite su una partizione dati. Nel nostro negozio di alimentari, lo sono clienti.
  • Nuclei: la parte dei tuoi processori che funzionano in parallelo. Nel nostro negozio di alimentari, lo sono cassieri.

Questo è tutto!

Ora, sfruttiamo questi concetti per parlare di alcuni fondamenti di Spark.

distorsione dei dati della partizione spark ottimizza l'ottimizzazione della partizione dell'interfaccia utente pyspark sql python
Figura 3: illustrazione dell’analogo del cassiere, in particolare per la distorsione dei dati. Immagine dell’autore.

Come mostrato nella figura 3, i nostri cassieri (core) possono elaborare solo un cliente (attività) alla volta. Inoltre, alcuni clienti hanno molti generi alimentari (conteggio delle righe di partizione), come dimostrato dal primo cliente alla cassa 2. Da queste semplici osservazioni…

  • Più cassieri (core), più clienti (attività) puoi elaborare in parallelo. Questo è scala orizzontale/verticale.
  • Se non hai abbastanza clienti (attività) per saturare i tuoi cassieri (core), pagherai affinché il cassiere si sieda lì. Ciò riguarda scalabilità automaticadimensionamento del cluster e dimensionamento delle partizioni.
  • Se i clienti (attività) hanno quantità di generi alimentari molto diverse (conteggio delle righe di partizione), noterai un utilizzo non uniforme dei tuoi cassieri. Questo è distorsione dei dati.
  • Migliori sono i tuoi cassieri (core), più velocemente potranno elaborare un singolo cliente (attività). Ciò riguarda l’aggiornamento del processore.
  • eccetera.

Dato che l’analogia deriva dalla teoria delle code, un campo direttamente correlato al calcolo distribuito, è piuttosto potente!

Usa questa analogia per eseguire il debug, comunicare e sviluppare spark.

L’errore più comune per i principianti di Spark è fraintendere la valutazione pigra.

Valutazione pigra significa che non verrà eseguita alcuna trasformazione dei dati finché non si richiama una raccolta in memoria. Esempi di metodi che richiamano una raccolta includono ma non sono limitati a…

  • .raccogliere(): porta DataFrame in memoria come elenco Python.
  • .spettacolo(): stampa il primo n righe del tuo DataFrame.
  • .contare(): ottieni il numero di righe del tuo DataFrame.
  • .Primo(): ottieni la prima riga del tuo DataFrame.

Il metodo di raccolta errato più comune è il ricorso alla leva finanziaria .count() durante un programma. Ogni volta che invochi una raccolta, tutte le trasformazioni upstream verranno ricalcolate da zero, quindi se hai 5 invocazioni di .count()il tuo programma verrà eseguito in modo asintotico 5 volte più a lungo.

Spark viene valutato pigramente! Le pipeline dovrebbero avere un unico flusso dalla(e) sorgente(i) alla(e) destinazione(i).

Un problema sorprendentemente comune che si presenta quando si lavora con grandi organizzazioni è che perdono di vista il quadro generale e quindi ottimizzano le pipeline in modo inefficiente.

Ecco come le pipeline dovrebbero essere ottimizzate per la maggior parte dei casi d’uso…

  1. Chiedi se dobbiamo realizzare il progetto. In parole povere, pensa a cosa ottieni effettivamente dall’ottimizzazione di una pipeline. Se prevedi di migliorare l’autonomia del 20% e la pipeline costa $ 100 per l’esecuzione, dovresti investire il tuo costosissimo stipendio da ingegnere dati per risparmiare $ 20 per esecuzione? Forse. Forse no.
  2. Cerca i frutti a bassa pendenza nel codice. Dopo aver accettato di realizzare il progetto, controlla se il codice presenta difetti evidenti. Esempi sono l’uso improprio della valutazione pigra, trasformazioni non necessarie e ordinamento errato delle trasformazioni.
  3. Ottieni il lavoro in esecuzione secondo lo SLA sfruttando l’elaborazione. Dopo aver verificato che il codice sia relativamente efficiente, è sufficiente eseguire il calcolo per risolvere il problema in modo da poter 1) rispettare lo SLA e 2) raccogliere statistiche dall’interfaccia utente di Spark.
  4. Fermare. Se stai saturando adeguatamente il tuo calcolo e i costi non sono eccessivi, apporta alcuni miglioramenti di calcolo dell’ultimo minuto e poi interrompi. Il tuo tempo è prezioso. Non sprecarlo risparmiando dollari quando potresti creare migliaia di dollari altrove.
  5. Profonda immersione. Infine, se hai davvero bisogno di approfondire perché i costi sono inaccettabili, rimboccati le maniche e ottimizza dati, codice ed elaborazione.

La bellezza di questo framework è che 1–4 richiedono solo una conoscenza superficiale di spark e sono molto veloci da eseguire; a volte è possibile raccogliere informazioni sui passaggi da 1 a 4 durante una chiamata di 30 minuti. Il quadro garantisce inoltre che ci fermeremo non appena lo saremo abbastanza buono. Infine, se è necessario il passaggio 5, possiamo delegarlo ai membri del team che sono più forti nella scintilla.

Trovando tutti i modi per evitare di ottimizzare eccessivamente una pipeline, risparmi ore preziose per gli sviluppatori.

La fuoriuscita del disco è il motivo più comune per cui i processi Spark vengono eseguiti lentamente.

È un concetto molto semplice. Spark è progettato per sfruttare l’elaborazione in memoria. Se non disponi di memoria sufficiente, Spark proverà a scrivere i dati aggiuntivi su disco per evitare l’arresto anomalo del processo. Questo si chiama fuoriuscita del disco.

distorsione dei dati della partizione spark ottimizza l'ottimizzazione della partizione dell'interfaccia utente pyspark sql python
Figura 4: schermata dell’interfaccia utente di Spark che evidenzia la fuoriuscita del disco. Immagine dell’autore.

La scrittura e la lettura dal disco sono lente, quindi dovrebbero essere evitate. Se vuoi imparare come identificare e mitigare le fuoriuscite, segui questo tutorial. Tuttavia, alcuni metodi molto comuni e semplici per mitigare le fuoriuscite sono…

  1. Elabora meno dati per attività, cosa che può essere ottenuta modificando il conteggio delle partizioni tramite partizioni.spark.shuffle O distribuzione.
  2. Aumenta il rapporto RAM/core nel tuo computer.

Se vuoi che il tuo lavoro venga eseguito in modo ottimale, evita le fuoriuscite.

Che tu stia utilizzando scala, Java, Python, SQL o R, Spark sfrutterà sempre le stesse trasformazioni dietro le quinte. Quindi, usa la lingua giusta per il tuo compito.

SQL è il “linguaggio” meno dettagliato tra tutti i linguaggi Spark supportati per molte operazioni! In modo più tangibile:

Ecco due rapidi esempi…

# Column rename and cast with SQL
df = df.selectExpr((f"{c}::int as {c}_abc" for c in df.columns))

# Column rename and cast with native spark
for c in df.columns:
df = df.withColumn(f"{c}_abc", F.col(c).cast("int")).drop(c)

# Window functions with SQL
df.withColumn("running_total", expr(
"sum(value) over (order by id rows between unbounded preceding and current row)"
))

# Window functions with native spark
windowSpec = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_with_running_total_native = df.withColumn("running_total", F.sum("value").over(windowSpec))

Usa SQL.

Hai bisogno di leggere un mucchio di file di dati archiviati in una directory complessa? Se è così, usa la scintilla estremamente potente leggere le opzioni.

La prima volta che ho riscontrato questo problema, ho riscritto os.walk per lavorare con il mio fornitore di servizi cloud dove sono stati archiviati i dati. Ho mostrato con molto orgoglio questo metodo al mio partner di progetto che ha semplicemente detto “permettimi di condividere il mio schermo” e ha continuato a presentarmi i filtri glob.

# Read all parquet files in the directory (and subdirectories)
df = spark.read.load(
"examples/src/main/resources/dir1",
format="parquet",
pathGlobFilter="*.parquet"
)

Quando ho applicato il filtro glob mostrato sopra invece del mio os.walk personalizzato, l’operazione di acquisizione è stata 10 volte più veloce.

Spark ha parametri potenti. Controlla se esiste la funzionalità desiderata prima di creare implementazioni su misura.

I loop sono quasi sempre dannosi per stimolare le prestazioni. Ecco perché…

Spark prevede due fasi principali: pianificazione ed esecuzione. Nella fase di pianificazione, Spark crea un grafico aciclico diretto (DAG) che indica come verranno eseguite le trasformazioni specificate. La fase di pianificazione è relativamente costosa e talvolta può richiedere diversi secondi, quindi è consigliabile eseguirla il meno frequentemente possibile.

Parliamo di un caso d’uso in cui è necessario scorrere molti DataFrame, eseguire trasformazioni costose e quindi aggiungerli a una tabella.

Innanzitutto, esiste il supporto nativo per quasi tutti i casi d’uso iterativi, in particolare UDF dei pandafunzioni finestra e join. Ma, se hai veramente bisogno di un ciclo, ecco come richiamare un’unica fase di pianificazione e quindi ottenere tutte le trasformazioni in un singolo DAG.

import functools
from pyspark.sql import DataFrame

paths = get_file_paths()

# BAD: For loop
for path in paths:
df = spark.read.load(path)
df = fancy_transformations(df)
df.write.mode("append").saveAsTable("xyz")

# GOOD: functools.reduce
lazily_evaluated_reads = (spark.read.load(path) for path in paths)
lazily_evaluted_transforms = (fancy_transformations(df) for df in lazily_evaluated_reads)
unioned_df = functools.reduce(DataFrame.union, lazily_evaluted_transforms)
unioned_df.write.mode("append").saveAsTable("xyz")

La prima soluzione utilizza un ciclo for per scorrere i percorsi, eseguire trasformazioni fantasiose, quindi aggiungere alla nostra tabella delta di interesse. Nel secondo, memorizziamo un elenco di DataFrames valutati pigramente, applichiamo trasformazioni su di essi, quindi li riduciamo tramite un’unione, eseguendo un singolo piano spark e scriviamo.

Possiamo effettivamente vedere la differenza nell’architettura sul backend tramite l’interfaccia utente Spark…

distorsione dei dati della partizione spark ottimizza l'ottimizzazione della partizione dell'interfaccia utente pyspark sql python
Figura 5: DAG spark per ciclo for rispetto a functools.reduce. Immagine dell’autore.

Nella figura 5, il DAG a sinistra corrispondente al ciclo for avrà 10 stadi. Tuttavia, il DAG a destra corrispondente a functools.reduce avrà un’unica fase e quindi potrà essere elaborato più facilmente in parallelo.

Per un semplice caso d’uso di lettura di 400 tabelle delta univoche e quindi di aggiunta a una tabella delta, questo metodo era 6 volte più veloce di un ciclo for.

Diventa creativo per creare un singolo DAG spark.

Non si tratta di pubblicità.

Spark è un software consolidato e quindi ben documentato. Gli LLM, in particolare GPT-4, sono davvero bravi a distillare informazioni complesse in spiegazioni comprensibili e concise. Dal rilascio di GPT-4, non ho realizzato un progetto Spark complesso in cui non facessi molto affidamento su GPT-4.

distorsione dei dati della partizione spark ottimizza l'ottimizzazione della partizione dell'interfaccia utente pyspark sql python
Figura 6: esempio di output GPT-4 sull’impatto sulla dimensione della partizione dei dati in Spark. Immagine dell’autore.

Tuttavia, affermando ciò che (si spera) è ovvio, fai attenzione ai LLM. Tutto ciò che invii a un modello closed source può diventare dati di addestramento per l’organizzazione madre: assicurati di non inviare nulla di sensibile. Inoltre, verifica che l’output di GPT sia legittimo.

Se utilizzati correttamente, gli LLM stanno cambiando il gioco per stimolare l’apprendimento e lo sviluppo. Vale $ 20 al mese.

Fonte: towardsdatascience.com

Lascia un commento

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *