Semplificare le pipeline di dati: come utilizzare WhyLogs con PySpark per un’efficace profilazione e convalida dei dati |  di Sarthak Sarbahi |  Gennaio 2024

 | Intelligenza-Artificiale

Componenti dei whylog

Cominciamo comprendendo le caratteristiche importanti dei whylog.

  • Dati di registrazione: Il nucleo di whylogs è la sua capacità di registrare dati. Immagina di tenere un diario dettagliato delle caratteristiche dei tuoi dati. Registra vari aspetti dei tuoi dati, come il numero di righe, l’intervallo di valori in ciascuna colonna e altri dettagli statistici.
  • Profili Whylogs: Una volta registrati i dati, whylogs crea “profili”. Questi profili sono come istantanee che riassumono i tuoi dati. Includono statistiche come medie, conteggi e distribuzioni. Questo è utile per comprendere i tuoi dati a colpo d’occhio e monitorare come cambiano nel tempo.
  • Tracciamento dei dati: Con whylogs puoi tenere traccia delle modifiche apportate ai tuoi dati nel tempo. Questo è importante perché i dati spesso si evolvono e ciò che era vero il mese scorso potrebbe non essere vero oggi. Il monitoraggio ti aiuta a cogliere questi cambiamenti e a comprenderne l’impatto.
  • Convalida dei dati: Whylogs ti consente di impostare regole o vincoli per garantire che i tuoi dati siano come previsto. Ad esempio, se sai che una determinata colonna dovrebbe contenere solo numeri positivi, puoi impostare una regola a riguardo. Se qualcosa non corrisponde alle tue regole, saprai che potrebbe esserci un problema.
  • Visualizzazione: è più facile comprendere i dati attraverso le immagini. Whylogs può creare grafici e tabelle per aiutarti a vedere cosa succede nei tuoi dati, rendendoli più accessibili, soprattutto per coloro che non sono esperti di dati.
  • Integrazioni: Whylogs supporta integrazioni con una varietà di strumenti, framework e linguaggi: Spark, Kafka, Pandas, MLFlow, azioni GitHub, RAPIDS, Java, Docker, AWS S3 e altro ancora.

Questo è tutto ciò che dobbiamo sapere sui whylogs. Se sei curioso di saperne di più, ti incoraggio a controllare il documentazione. Successivamente, lavoriamo per impostare le cose per il tutorial.

Configurazione dell’ambiente

Utilizzeremo un notebook Jupyter per questo tutorial. Per far funzionare il nostro codice ovunque, utilizzeremo JupyterLab in Docker. Questa configurazione installa tutte le librerie necessarie e prepara i dati di esempio. Se non conosci Docker e vuoi sapere come configurarlo, dai un’occhiata a questo collegamento.

Inizia scaricando i dati di esempio (CSV) da Qui. Questi dati sono ciò che utilizzeremo per la profilazione e la convalida. Creare un data cartella nella directory principale del progetto e salva lì il file CSV. Successivamente, crea un file Dockerfile nella stessa directory principale.

Dockerfile per questo tutorial (Immagine dell’autore)

Questo Dockerfile è un insieme di istruzioni per creare un ambiente specifico per il tutorial. Analizziamolo:

  • La prima riga FROM quay.io/jupyter/pyspark-notebook dice a Docker di utilizzare un’immagine esistente come punto di partenza. Questa immagine è un notebook Jupyter su cui è già configurato PySpark.
  • IL RUN pip install whylogs whylogs(viz) whylogs(spark) line riguarda l’aggiunta delle librerie necessarie a questo ambiente. Utilizza pip aggiungere whylogs e le sue funzionalità aggiuntive per la visualizzazione (viz) e per lavorare con Spark (spark).
  • L’ultima riga, COPY data/patient_data.csv /home/patient_data.csvriguarda lo spostamento del file di dati in questo ambiente. Prende il file CSV patient_data.csv dal data cartella nella directory del progetto e la inserisce nel file /home/ directory all’interno dell’ambiente Docker.

A questo punto la directory del tuo progetto dovrebbe assomigliare a questa.

Directory del progetto in VS Code (immagine dell’autore)

Eccezionale! Ora creiamo un’immagine Docker. Per fare ciò, digita il seguente comando nel tuo terminale, assicurandoti di essere nella cartella principale del tuo progetto.

docker build -t pyspark-whylogs .

Questo comando crea un’immagine Docker denominata pyspark-whylogs. Puoi vederlo nella scheda “Immagini” del tuo Desktop Docker app.

Immagine Docker creata (Immagine dell’autore)

Passaggio successivo: eseguiamo questa immagine per avviare JupyterLab. Digita un altro comando nel tuo terminale.

docker run -p 8888:8888 pyspark-whylogs

Questo comando avvia un contenitore da pyspark-whylogs Immagine. Ti assicura di poter accedere a JupyterLab tramite la porta 8888 sul tuo computer.

Dopo aver eseguito questo comando, vedrai un URL nei log simile al seguente: http://127.0.0.1:8888/lab?token=your_token. Fare clic su di esso per aprire l’interfaccia web di JupyterLab.

Log del contenitore Docker (immagine dell’autore)

Grande! Tutto è impostato per l’utilizzo dei whylogs. Ora conosciamo il set di dati con cui lavoreremo.

Comprendere il set di dati

Utilizzeremo un set di dati sui pazienti ospedalieri. Il file, denominato patient_data.csvinclude 100.000 righe con queste colonne:

  • patient_id: ID univoco di ciascun paziente. Ricorda, potresti vedere lo stesso ID paziente più di una volta nel set di dati.
  • patient_name: Il nome del paziente. Pazienti diversi possono avere lo stesso nome.
  • height: Altezza del paziente in centimetri. Ogni paziente ha la stessa altezza indicata per ogni visita in ospedale.
  • weight: Il peso del paziente è indicato chilogrammi. È sempre più di zero.
  • visit_date: la data in cui il paziente ha visitato l’ospedale, nel formato YYYY-MM-DD.

Per quanto riguarda la provenienza di questo set di dati, non preoccuparti. È stato creato da ChatGPT. Successivamente, iniziamo a scrivere del codice.

Iniziare con PySpark

Innanzitutto, apri un nuovo notebook in JupyterLab. Ricordati di salvarlo prima di iniziare a lavorare.

Inizieremo importando le librerie necessarie.

# Import libraries
from typing import Any
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from whylogs.api.pyspark.experimental import collect_column_profile_views
from whylogs.api.pyspark.experimental import collect_dataset_profile_view
from whylogs.core.metrics.condition_count_metric import Condition
from whylogs.core.relations import Predicate
from whylogs.core.schema import DeclarativeSchema
from whylogs.core.resolvers import STANDARD_RESOLVER
from whylogs.core.specialized_resolvers import ConditionCountMetricSpec
from whylogs.core.constraints.factories import condition_meets
from whylogs.core.constraints import ConstraintsBuilder
from whylogs.core.constraints.factories import no_missing_values
from whylogs.core.constraints.factories import greater_than_number
from whylogs.viz import NotebookProfileVisualizer
import pandas as pd
import datetime

Quindi, configureremo una SparkSession. Questo ci consente di eseguire il codice PySpark.

# Initialize a SparkSession
spark = SparkSession.builder.appName('whylogs').getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")

Successivamente, creeremo un dataframe Spark leggendo il file CSV. Verificheremo anche il suo schema.

# Create a dataframe from CSV file
df = spark.read.option("header",True).option("inferSchema",True).csv("/home/patient_data.csv")
df.printSchema()

Successivamente, diamo un’occhiata ai dati. Visualizzeremo la prima riga nel dataframe.

# First row from dataframe
df.show(n=1, vertical=True)

Ora che abbiamo visto i dati, è il momento di iniziare la profilazione dei dati con whylogs.

Profilazione dei dati con whylogs

Per profilare i nostri dati, utilizzeremo due funzioni. Innanzitutto c’è collect_column_profile_views. Questa funzione raccoglie profili dettagliati per ciascuna colonna nel dataframe. Questi profili ci forniscono statistiche come conteggi, distribuzioni e altro, a seconda di come impostiamo i whylog.

# Profile the data with whylogs
df_profile = collect_column_profile_views(df)
print(df_profile)

Ogni colonna nel set di dati ha il proprio ColumnProfileView oggetto in un dizionario. Possiamo esaminare varie metriche per ciascuna colonna, come i loro valori medi.

whylogs esaminerà ogni punto dati e deciderà statisticamente se quel punto dati è rilevante o meno per il calcolo finale

Consideriamo ad esempio la media height.

df_profile("height").get_metric("distribution").mean.value

Successivamente, calcoleremo anche la media direttamente dal dataframe per il confronto.

# Compare with mean from dataframe
df.select(F.mean(F.col("height"))).show()

Tuttavia, la profilazione delle colonne una per una non è sempre sufficiente. Quindi, usiamo un’altra funzione, collect_dataset_profile_view. Questa funzione profila l’intero set di dati, non solo le singole colonne. Possiamo combinarlo con Panda per analizzare tutte le metriche del profilo.

# Putting everything together
df_profile_view = collect_dataset_profile_view(input_df=df)
df_profile_view.to_pandas().head()

Possiamo anche salvare questo profilo come file CSV per un uso successivo.

# Persist profile as a file
df_profile_view.to_pandas().reset_index().to_csv("/home/jovyan/patint_profile.csv",header = True,index = False)

La cartella /home/jovyan nel nostro contenitore Docker proviene Stack Docker di Jupyter (immagini Docker pronte all’uso contenenti applicazioni Jupyter). In queste configurazioni Docker, “jovyan” è l’utente predefinito per l’esecuzione di Jupyter. IL /home/jovyan La cartella è il punto in cui solitamente iniziano i notebook Jupyter e dove dovresti inserire i file per accedervi in ​​Jupyter.

Ed è così che profiliamo i dati con whylogs. Successivamente, esploreremo la convalida dei dati.

Convalida dei dati con whylogs

Per la convalida dei dati, eseguiremo questi controlli:

  • patient_id: Assicurati che non ci siano valori mancanti.
  • weight: assicurarsi che ogni valore sia maggiore di zero.
  • visit_date: controlla se le date sono presenti in YYYY-MM-DD formato.

Ora cominciamo. La validazione dei dati in whylogs inizia dalla profilazione dei dati. Possiamo usare il collect_dataset_profile_view funzione per creare un profilo, come abbiamo visto prima.

Tuttavia, questa funzione solitamente crea un profilo con metriche standard come media e conteggio. Ma cosa succede se dobbiamo controllare? valori individuali in una colonna rispetto agli altri vincoli, che possono essere verificati rispetto a parametri aggregati? È qui che entrano in gioco le metriche di conteggio delle condizioni. È come aggiungere una metrica personalizzata al nostro profilo.

Creiamone uno per visit_date colonna per convalidare ogni riga.

def check_date_format(date_value: Any) -> bool:
date_format = '%Y-%m-%d'
try:
datetime.datetime.strptime(date_value, date_format)
return True
except ValueError:
return False

visit_date_condition = {"is_date_format": Condition(Predicate().is_(check_date_format))}

Una volta ottenuta la nostra condizione, la aggiungiamo al profilo. Usiamo a Schema standard e aggiungi il nostro controllo personalizzato.

# Create condition count metric
schema = DeclarativeSchema(STANDARD_RESOLVER)
schema.add_resolver_spec(column_name="visit_date", metrics=(ConditionCountMetricSpec(visit_date_condition)))

Quindi ricreiamo il profilo sia con le metriche standard che con la nostra nuova metrica personalizzata per visit_date colonna.

# Use the schema to pass to logger with collect_dataset_profile_view
# This creates profile with standard metrics as well as condition count metrics
df_profile_view_v2 = collect_dataset_profile_view(input_df=df, schema=schema)

Con il nostro profilo pronto, ora possiamo impostare i nostri controlli di convalida per ciascuna colonna.

builder = ConstraintsBuilder(dataset_profile_view=df_profile_view_v2)
builder.add_constraint(no_missing_values(column_name="patient_id"))
builder.add_constraint(condition_meets(column_name="visit_date", condition_name="is_date_format"))
builder.add_constraint(greater_than_number(column_name="weight",number=0))

constraints = builder.build()
constraints.generate_constraints_report()

Possiamo anche usare whylogs per mostrare un resoconto di questi controlli.

# Visualize constraints report using Notebook Profile Visualizer
visualization = NotebookProfileVisualizer()
visualization.constraints_report(constraints, cell_height=300)

Sarà un report HTML che mostra quali controlli hanno superato o fallito.

Rapporto sui vincoli di whylogs (Immagine dell’autore)

Ecco cosa troviamo:

  • IL patient_id la colonna non ha valori mancanti. Bene!
  • Alcuni visit_date i valori non corrispondono a YYYY-MM-DD formato.
  • Alcuni weight i valori sono zero.

Ricontrolliamo questi risultati nel nostro dataframe. Per prima cosa controlliamo il visit_date formato con codice PySpark.

# Validate visit_date column
df \
.withColumn("check_visit_date",F.to_date(F.col("visit_date"),"yyyy-MM-dd")) \
.withColumn("null_check",F.when(F.col("check_visit_date").isNull(),"null").otherwise("not_null")) \
.groupBy("null_check") \
.count() \
.show(truncate = False)

+----------+-----+
|null_check|count|
+----------+-----+
|not_null |98977|
|null |1023 |
+----------+-----+

Mostra che 1023 righe su 100.000 non corrispondono al nostro formato di data. Successivamente, il weight colonna.

# Validate weight column
df \
.select("weight") \
.groupBy("weight") \
.count() \
.orderBy(F.col("weight")) \
.limit(1) \
.show(truncate = False)

+------+-----+
|weight|count|
+------+-----+
|0 |2039 |
+------+-----+

Ancora una volta, i nostri risultati corrispondono ai whylog. Quasi 2.000 righe hanno un peso pari a zero. E questo conclude il nostro tutorial. È possibile trovare il taccuino per questo tutorial Qui.

Fonte: towardsdatascience.com

Lascia un commento

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *