Configurazione dell’ambiente
In questa guida utilizzeremo JupyterLab con Docker e MinIO. Pensa a Docker come a uno strumento pratico che semplifica l’esecuzione delle applicazioni e a MinIO come a una soluzione di archiviazione flessibile perfetta per gestire molti tipi diversi di dati. Ecco come sistemeremo le cose:
Non mi immergerò in profondità in ogni passaggio qui poiché ce n’è già uno fantastico tutorial per quello. Suggerisco di provarlo prima, poi di tornare per continuare con questo.
Una volta che tutto sarà pronto, inizieremo preparando i nostri dati di esempio. Apri un nuovo taccuino Jupyter per iniziare.
Per prima cosa dobbiamo installare il file s3fs
Pacchetto Python, essenziale per lavorare con MinIO in Python.
!pip install s3fs
Successivamente importeremo le dipendenze e i moduli necessari.
import os
import s3fs
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F
from pyspark.sql import Row
import pyspark.sql.types as T
import datetime
import time
Imposteremo anche alcune variabili d’ambiente che saranno utili durante l’interazione con MinIO.
# Define environment variables
os.environ("MINIO_KEY") = "minio"
os.environ("MINIO_SECRET") = "minio123"
os.environ("MINIO_ENDPOINT") = "http://minio1:9000"
Quindi, configureremo la nostra sessione Spark con le impostazioni necessarie.
# Create Spark session
spark = SparkSession.builder \
.appName("big_data_file_formats") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.1026,org.apache.spark:spark-avro_2.12:3.5.0,io.delta:delta-spark_2.12:3.0.0") \
.config("spark.hadoop.fs.s3a.endpoint", os.environ("MINIO_ENDPOINT")) \
.config("spark.hadoop.fs.s3a.access.key", os.environ("MINIO_KEY")) \
.config("spark.hadoop.fs.s3a.secret.key", os.environ("MINIO_SECRET")) \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.enableHiveSupport() \
.getOrCreate()
Semplifichiamolo per capirlo meglio.
spark.jars.packages
: scarica i file JAR richiesti dal file Archivio Maven. Un repository Maven è un luogo centrale utilizzato per archiviare artefatti di build come file JAR, librerie e altre dipendenze utilizzate nei progetti basati su Maven.spark.hadoop.fs.s3a.endpoint
: questo è l’URL dell’endpoint per MinIO.spark.hadoop.fs.s3a.access.key
Espark.hadoop.fs.s3a.secret.key
: Questa è la chiave di accesso e la chiave segreta per MinIO. Tieni presente che è lo stesso nome utente e password utilizzati per accedere all’interfaccia web MinIO.spark.hadoop.fs.s3a.path.style.access
: è impostato su true per abilitare l’accesso in stile percorso per il bucket MinIO.spark.hadoop.fs.s3a.impl
: Questa è la classe di implementazione per il file system S3A.spark.sql.extensions
: registra i comandi e le configurazioni SQL di Delta Lake all’interno del parser Spark SQL.spark.sql.catalog.spark_catalog
: imposta il catalogo Spark sul catalogo di Delta Lake, consentendo a Delta Lake di gestire le operazioni di gestione delle tabelle e dei metadati.
Scegliere la versione JAR giusta è fondamentale per evitare errori. Utilizzando la stessa immagine Docker, la versione JAR menzionata qui dovrebbe funzionare correttamente. Se riscontri problemi di configurazione, non esitare a lasciare un commento. Farò del mio meglio per aiutarti 🙂
Il nostro prossimo passo è creare un grande dataframe Spark. Avrà 10 milioni di righe, divise in dieci colonne: metà sono testo e metà sono numeri.
# Generate sample data
num_rows = 10000000
df = spark.range(0, num_rows)# Add columns
for i in range(1, 10): # Since we already have one column
if i % 2 == 0:
# Integer column
df = df.withColumn(f"int_col_{i}", (F.randn() * 100).cast(T.IntegerType()))
else:
# String column
df = df.withColumn(f"str_col_{i}", (F.rand() * num_rows).cast(T.IntegerType()).cast("string"))
df.count()
Diamo un’occhiata alle prime voci per vedere come appaiono.
# Show rows from sample data
df.show(10,truncate = False)+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|id |str_col_1|int_col_2|str_col_3|int_col_4|str_col_5|int_col_6|str_col_7|int_col_8|str_col_9|
+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|0 |7764018 |128 |1632029 |-15 |5858297 |114 |1025493 |-88 |7376083 |
|1 |2618524 |118 |912383 |235 |6684042 |-115 |9882176 |170 |3220749 |
|2 |6351000 |75 |3515510 |26 |2605886 |89 |3217428 |87 |4045983 |
|3 |4346827 |-70 |2627979 |-23 |9543505 |69 |2421674 |-141 |7049734 |
|4 |9458796 |-106 |6374672 |-142 |5550170 |25 |4842269 |-97 |5265771 |
|5 |9203992 |23 |4818602 |42 |530044 |28 |5560538 |-75 |2307858 |
|6 |8900698 |-130 |2735238 |-135 |1308929 |22 |3279458 |-22 |3412851 |
|7 |6876605 |-35 |6690534 |-41 |273737 |-178 |8789689 |88 |4200849 |
|8 |3274838 |-42 |1270841 |-62 |4592242 |133 |4665549 |-125 |3993964 |
|9 |4904488 |206 |2176042 |58 |1388630 |-63 |9364695 |78 |2657371 |
+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
only showing top 10 rows
Per comprendere la struttura del nostro dataframe, useremo df.printSchema()
per vedere i tipi di dati che contiene. Successivamente, creeremo quattro file CSV. Questi verranno utilizzati per Parquet, Avro, ORC e Delta Lake. Lo stiamo facendo per evitare qualsiasi pregiudizio nei test delle prestazioni: l’utilizzo dello stesso CSV consente a Spark di memorizzare nella cache e ottimizzare le cose in background.
# Write 4 CSVs for comparing performance for every file type
df.write.csv("s3a://mybucket/ten_million_parquet.csv")
df.write.csv("s3a://mybucket/ten_million_avro.csv")
df.write.csv("s3a://mybucket/ten_million_orc.csv")
df.write.csv("s3a://mybucket/ten_million_delta.csv")
Ora creeremo quattro frame di dati separati da questi CSV, ciascuno per un formato di file diverso.
# Read all four CSVs to create dataframes
schema = T.StructType((
T.StructField("id", T.LongType(), nullable=False),
T.StructField("str_col_1", T.StringType(), nullable=True),
T.StructField("int_col_2", T.IntegerType(), nullable=True),
T.StructField("str_col_3", T.StringType(), nullable=True),
T.StructField("int_col_4", T.IntegerType(), nullable=True),
T.StructField("str_col_5", T.StringType(), nullable=True),
T.StructField("int_col_6", T.IntegerType(), nullable=True),
T.StructField("str_col_7", T.StringType(), nullable=True),
T.StructField("int_col_8", T.IntegerType(), nullable=True),
T.StructField("str_col_9", T.StringType(), nullable=True)
))df_csv_parquet = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_parquet.csv")
df_csv_avro = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_avro.csv")
df_csv_orc = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_orc.csv")
df_csv_delta = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_delta.csv")
E questo è tutto! Siamo pronti per esplorare questi formati di file di big data.
Lavorare con il parquet
Parquet è un formato di file orientato alle colonne che si adatta molto bene ad Apache Spark, rendendolo la scelta migliore per la gestione dei big data. Brilla negli scenari analitici, in particolare quando si esaminano i dati colonna per colonna.
Una delle sue caratteristiche interessanti è la capacità di archiviare i dati in un formato compresso, con compressione scattante essendo la scelta giusta. Ciò non solo consente di risparmiare spazio ma migliora anche le prestazioni.
Un altro aspetto interessante di Parquet è il suo approccio flessibile agli schemi di dati. Puoi iniziare con una struttura di base e poi espanderla senza problemi aggiungendo più colonne man mano che le tue esigenze crescono. Questa adattabilità lo rende estremamente facile da usare per i progetti di dati in evoluzione.
Ora che abbiamo capito il parquet, mettiamolo alla prova. Scriveremo 10 milioni di record in un file Parquet e terremo d’occhio quanto tempo ci vuole. Invece di usare il %timeit
La funzione Python, che viene eseguita più volte e può richiedere molte risorse per le attività relative ai big data, la misureremo solo una volta.
# Write data as Parquet
start_time = time.time()
df_csv_parquet.write.parquet("s3a://mybucket/ten_million_parquet2.parquet")
end_time = time.time()
print(f"Time taken to write as Parquet: {end_time - start_time} seconds")
Per me, questo compito ha richiesto 15,14 secondima ricorda, questa volta può cambiare a seconda del tuo computer. Ad esempio, su un PC meno potente, ci è voluto più tempo. Quindi, non preoccuparti se il tuo tempo è diverso. Ciò che è importante qui è confrontare le prestazioni tra diversi formati di file.
Successivamente, eseguiremo una query di aggregazione sui nostri dati Parquet.
# Perfom aggregation query using Parquet data
start_time = time.time()
df_parquet = spark.read.parquet("s3a://mybucket/ten_million_parquet2.parquet")
df_parquet \
.select("str_col_5","str_col_7","int_col_2") \
.groupBy("str_col_5","str_col_7") \
.count() \
.orderBy("count") \
.limit(1) \
.show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |6429997 |1 |
+---------+---------+-----+
Questa query è terminata in 12,33 secondi. Va bene, ora cambiamo marcia ed esploriamo il formato file ORC.
Lavorare con ORC
Il formato file ORC, un altro contendente orientato alle colonne, potrebbe non essere così noto come Parquet, ma ha i suoi vantaggi. Una caratteristica straordinaria è la sua capacità di comprimere i dati in modo ancora più efficace di Parquet, utilizzando lo stesso algoritmo di compressione scattante.
È un successo nel mondo Hive, grazie al supporto per le operazioni ACID nelle tabelle Hive. ORC è inoltre progettato su misura per gestire in modo efficiente letture di streaming di grandi dimensioni.
Inoltre, è flessibile quanto Parquet quando si tratta di schemi: puoi iniziare con una struttura di base e quindi aggiungere più colonne man mano che il tuo progetto cresce. Ciò rende ORC una scelta solida per l’evoluzione delle esigenze dei big data.
Immergiamoci nel testare le prestazioni di scrittura di ORC.
# Write data as ORC
start_time = time.time()
df_csv_orc.write.orc("s3a://mybucket/ten_million_orc2.orc")
end_time = time.time()
print(f"Time taken to write as ORC: {end_time - start_time} seconds")
Mi ha preso 12,94 secondi per completare l’attività. Un altro punto di interesse è la dimensione dei dati scritti nel bucket MinIO. Nel ten_million_orc2.orc
cartella, troverai diversi file di partizione, ciascuno di dimensioni coerenti. Ogni file ORC della partizione riguarda 22,3 MiBe ci sono 16 file in totale.
Confrontandolo con Parquet, ogni file di partizione Parquet è in giro 26,8 MiBanche per un totale di 16 file. Ciò dimostra che l’ORC offre effettivamente una compressione migliore rispetto al Parquet.
Successivamente, testeremo il modo in cui ORC gestisce una query di aggregazione. Utilizziamo la stessa query per tutti i formati di file per mantenere equi i nostri benchmark.
# Perform aggregation using ORC data
df_orc = spark.read.orc("s3a://mybucket/ten_million_orc2.orc")
start_time = time.time()
df_orc \
.select("str_col_5","str_col_7","int_col_2") \
.groupBy("str_col_5","str_col_7") \
.count() \
.orderBy("count") \
.limit(1) \
.show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |2906292 |1 |
+---------+---------+-----+
La query ORC è terminata 13,44 secondiun po’ più a lungo dei tempi di Parquet. Con ORC spuntato dalla nostra lista, passiamo a sperimentare con Avro.
Lavorare con Avro
Avro è un formato di file basato su righe con i suoi punti di forza unici. Anche se non comprime i dati in modo efficiente come Parquet o ORC, compensa con una velocità di scrittura più elevata.
Ciò che distingue davvero Avro sono le sue eccellenti capacità di evoluzione dello schema. Gestisce con facilità modifiche come campi aggiunti, rimossi o modificati, rendendolo la scelta ideale per scenari in cui le strutture dei dati si evolvono nel tempo.
Avro è particolarmente adatto per carichi di lavoro che comportano la scrittura di molti dati.
Ora diamo un’occhiata a come si comporta Avro con la scrittura dei dati.
# Write data as Avro
start_time = time.time()
df_csv_avro.write.format("avro").save("s3a://mybucket/ten_million_avro2.avro")
end_time = time.time()
print(f"Time taken to write as Avro: {end_time - start_time} seconds")
Mi ha preso 12,81 secondiche in realtà è più veloce sia di Parquet che di ORC. Successivamente, esamineremo le prestazioni di Avro con una query di aggregazione.
# Perform aggregation using Avro data
df_avro = spark.read.format("avro").load("s3a://mybucket/ten_million_avro2.avro")
start_time = time.time()
df_avro \
.select("str_col_5","str_col_7","int_col_2") \
.groupBy("str_col_5","str_col_7") \
.count() \
.orderBy("count") \
.limit(1) \
.show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |6429997 |1 |
+---------+---------+-----+
Questa query ha richiesto circa 15,42 secondi. Quindi, quando si tratta di interrogare, Parquet e ORC sono avanti in termini di velocità. Bene, è il momento di esplorare il nostro formato file finale e più recente: Delta Lake.
Lavorare con Delta Lake
Delta Lake è una nuova stella nell’universo dei formati di file Big Data, strettamente correlato a Parquet in termini di dimensioni di archiviazione: è come Parquet ma con alcune funzionalità extra.
Durante la scrittura dei dati, Delta Lake impiega un po’ più tempo di Parquet, soprattutto a causa della sua _delta_log
cartella, che è fondamentale per le sue funzionalità avanzate. Queste funzionalità includono la conformità ACID per transazioni affidabili, il viaggio nel tempo per l’accesso ai dati storici e la compattazione di piccoli file per mantenere le cose in ordine.
Sebbene sia un nuovo arrivato nella scena dei big data, Delta Lake è rapidamente diventato uno dei preferiti sulle piattaforme cloud che eseguono Spark, superando il suo utilizzo nei sistemi locali.
Passiamo a testare le prestazioni di Delta Lake, iniziando con un test di scrittura dei dati.
# Write data as Delta
start_time = time.time()
df_csv_delta.write.format("delta").save("s3a://mybucket/ten_million_delta2.delta")
end_time = time.time()
print(f"Time taken to write as Delta Lake: {end_time - start_time} seconds")
L’operazione di scrittura è stata eseguita 17,78 secondiche è un po’ più lungo degli altri formati di file che abbiamo esaminato. Una cosa interessante da notare è che in the ten_million_delta2.delta
cartella, ogni file di partizione è in realtà un file Parquet, di dimensioni simili a quanto osservato con Parquet. Inoltre, c’è il _delta_log
cartella.
IL _delta_log
La cartella nel formato file Delta Lake svolge un ruolo fondamentale nel modo in cui Delta Lake gestisce e mantiene l’integrità dei dati e il controllo delle versioni. È un componente chiave che distingue Delta Lake dagli altri formati di file di big data. Ecco una semplice ripartizione della sua funzione:
- Registro delle transazioni: IL
_delta_log
La cartella contiene un registro delle transazioni che registra ogni modifica apportata ai dati nella tabella Delta. Questo registro è una serie di file JSON che descrivono in dettaglio le aggiunte, le eliminazioni e le modifiche ai dati. Funziona come un diario completo di tutte le transazioni di dati. - Conformità ACIDO: questo registro consente la conformità ACID (Atomicità, Coerenza, Isolamento, Durabilità). Ogni transazione in Delta Lake, come la scrittura di nuovi dati o la modifica dei dati esistenti, è atomica e coerente, garantendo l’integrità e l’affidabilità dei dati.
- Viaggio nel tempo e auditing: Il registro delle transazioni consente il “viaggio nel tempo”, il che significa che puoi facilmente visualizzare e ripristinare versioni precedenti dei dati. Ciò è estremamente utile per il recupero dei dati, il controllo e la comprensione di come i dati si sono evoluti nel tempo.
- Applicazione ed evoluzione dello schema: IL
_delta_log
tiene traccia anche dello schema (struttura) dei dati. Applica lo schema durante le scritture dei dati e consente un’evoluzione sicura dello schema nel tempo senza danneggiare i dati. - Concorrenza e operazioni di fusione: Gestisce letture e scritture simultanee, garantendo che più utenti possano accedere e modificare i dati contemporaneamente senza conflitti. Ciò lo rende ideale per operazioni complesse come l’unione, l’aggiornamento e l’eliminazione.
In sintesi, il _delta_log
La cartella è il cervello dietro le funzionalità avanzate di gestione dei dati di Delta Lake, offrendo una solida registrazione delle transazioni, controllo della versione e miglioramenti dell’affidabilità che in genere non sono disponibili in formati di file più semplici come Parquet o ORC.
Ora è il momento di vedere come si comporta Delta Lake con una query di aggregazione.
# Perform aggregation using Delta data
df_delta = spark.read.format("delta").load("s3a://mybucket/ten_million_delta2.delta")
start_time = time.time()
df_delta \
.select("str_col_5","str_col_7","int_col_2") \
.groupBy("str_col_5","str_col_7") \
.count() \
.orderBy("count") \
.limit(1) \
.show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |2906292 |1 |
+---------+---------+-----+
Questa query è terminata tra circa 15,51 secondi. Sebbene sia un po’ più lento rispetto a Parquet e ORC, è abbastanza vicino. Ciò suggerisce che le prestazioni di Delta Lake negli scenari del mondo reale sono abbastanza simili a quelle di Parquet.
Eccezionale! Abbiamo concluso tutti i nostri esperimenti. Ricapitoliamo i nostri risultati nella prossima sezione.
Quando utilizzare quale formato di file?
Abbiamo concluso i nostri test, quindi riuniamo tutti i nostri risultati. Per la scrittura dei dati, Avro è al primo posto. Questo è davvero ciò che riesce meglio negli scenari pratici.
Quando si tratta di leggere ed eseguire query di aggregazione, Parquet è in testa al gruppo. Tuttavia, ciò non significa che ORC e Delta Lake non siano all’altezza. Essendo formati di file a colonne, funzionano egregiamente nella maggior parte delle situazioni.
Ecco un breve riassunto:
- Scegli ORC per la migliore compressione, soprattutto se utilizzi Hive e Pig per attività analitiche.
- Lavori con Spark? Parquet e Delta Lake sono le tue scelte preferite.
- Per scenari con molta scrittura di dati, come le aree della zona di atterraggio, Avro è la soluzione migliore.
E questo è tutto con questo tutorial!
Fonte: towardsdatascience.com