I notebook non sono sufficienti per il machine learning su larga scala
Tutte le immagini, se non diversamente specificato, sono dell'autore
C’è un malinteso (per non dire fantasia) che continua a ripresentarsi nelle aziende quando si parla di AI e Machine Learning. Le persone spesso valutano erroneamente la complessità e le competenze necessarie per portare in produzione progetti di machine learning, o perché non capiscono il lavoro o (ancora peggio) perché pensano di capirlo, mentre non è così.
La loro prima reazione quando scoprono l'intelligenza artificiale potrebbe essere qualcosa del tipo: “L'intelligenza artificiale in realtà è piuttosto semplice, mi basta un Jupyter Notebook, copiare e incollare il codice da qua e là – o chiedere a Copilot – e boom. Dopotutto non c'è bisogno di assumere data scientist…” E la storia finisce sempre male, con amarezza, delusione e la sensazione che l'intelligenza artificiale sia una truffa: difficoltà nel passare alla produzione, deriva dei dati, bug, comportamenti indesiderati.
Quindi scriviamolo una volta per tutte: l'intelligenza artificiale/il machine learning/qualsiasi lavoro relativo ai dati è un lavoro vero e proprio, non un hobby. Richiede abilità, maestria e strumenti. Se pensi di poter fare ML in produzione con i notebook, ti sbagli.
Questo articolo mira a mostrare, con un semplice esempio, tutto l'impegno, le competenze e gli strumenti necessari per passare da un notebook a una vera pipeline in produzione. Perché il machine learning in produzione significa soprattutto essere in grado di automatizzare l'esecuzione del codice su base regolare, con automazione e monitoraggio.
E per coloro che sono alla ricerca di un tutorial end-to-end “condutture da notebook a vertice”, potresti trovarlo utile.
Immaginiamo che tu sia un Data Scientist che lavora presso un'azienda di e-commerce. La tua azienda vende vestiti online e il team di marketing chiede il tuo aiuto: stanno preparando un'offerta speciale per prodotti specifici e vorrebbero rivolgersi in modo efficiente ai clienti personalizzando il contenuto delle email che verrà loro inviato per massimizzare la conversione. Il tuo compito è quindi semplice: a ogni cliente dovrebbe essere assegnato un punteggio che rappresenta la probabilità che acquisti un prodotto dell'offerta speciale.
L'offerta speciale si rivolgerà specificamente a questi marchi, il che significa che il team di marketing vuole sapere quali clienti acquisteranno il loro prossimo prodotto dai seguenti marchi:
Allegra K, Calvin Klein, Carhartt, Hanes, Volcom, Nautica, Quiksilver, Diesel, Dockers, Hurley
Per questo articolo utilizzeremo un set di dati disponibile pubblicamente di Google, il `thelook_ecommerce` set di dati. Contiene dati falsi con transazioni, dati dei clienti, dati dei prodotti, tutto ciò di cui avremmo a nostra disposizione lavorando presso un rivenditore di moda online.
Per seguire questo notebook, avrai bisogno dell'accesso a Google Cloud Platform, ma la logica può essere replicata su altri provider Cloud o terze parti come Neptune, MLFlow, ecc.
In qualità di rispettabile Data Scientist, inizi creando un taccuino che ci aiuterà nell'esplorazione dei dati.
Per prima cosa importiamo le librerie che utilizzeremo durante questo articolo:
import catboost as cb
import pandas as pd
import sklearn as sk
import numpy as np
import datetime as dtfrom dataclasses import dataclass
from sklearn.model_selection import train_test_split
from google.cloud import bigquery
%load_ext watermark
%watermark --packages catboost,pandas,sklearn,numpy,google.cloud.bigquery
catboost : 1.0.4
pandas : 1.4.2
numpy : 1.22.4
google.cloud.bigquery: 3.2.0
Ottenere e preparare i dati
Caricheremo quindi i dati da BigQuery utilizzando il client Python. Assicurati di utilizzare il tuo ID progetto:
query = """
SELECT
transactions.user_id,
products.brand,
products.category,
products.department,
products.retail_price,
users.gender,
users.age,
users.created_at,
users.country,
users.city,
transactions.created_at
FROM `bigquery-public-data.thelook_ecommerce.order_items` as transactions
LEFT JOIN `bigquery-public-data.thelook_ecommerce.users` as users
ON transactions.user_id = users.id
LEFT JOIN `bigquery-public-data.thelook_ecommerce.products` as products
ON transactions.product_id = products.id
WHERE status <> 'Cancelled'
"""client = bigquery.Client()
df = client.query(query).to_dataframe()
Dovresti vedere qualcosa del genere quando guardi il dataframe:
Questi rappresentano le transazioni/acquisti effettuati dai clienti, arricchiti con informazioni sui clienti e sui prodotti.
Dato che il nostro obiettivo è prevedere quale marca i clienti acquisteranno nel loro prossimo acquisto, procederemo come segue:
- Raggruppa gli acquisti in ordine cronologico per ciascun cliente
- Se un cliente effettua N acquisti, consideriamo l'ennesimo acquisto come obiettivo e N-1 come nostre caratteristiche.
- Escludiamo quindi i clienti con 1 solo acquisto
Mettiamolo nel codice:
# Compute recurrent customers
recurrent_customers = df.groupby('user_id')('created_at').count().to_frame("n_purchases")# Merge with dataset and filter those with more than 1 purchase
df = df.merge(recurrent_customers, left_on='user_id', right_index=True, how='inner')
df = df.query('n_purchases > 1')
# Fill missing values
df.fillna('NA', inplace=True)
target_brands = (
'Allegra K',
'Calvin Klein',
'Carhartt',
'Hanes',
'Volcom',
'Nautica',
'Quiksilver',
'Diesel',
'Dockers',
'Hurley'
)
aggregation_columns = ('brand', 'department', 'category')
# Group purchases by user chronologically
df_agg = (df.sort_values('created_at')
.groupby(('user_id', 'gender', 'country', 'city', 'age'), as_index=False)(('brand', 'department', 'category'))
.agg({k: ";".join for k in ('brand', 'department', 'category')})
)
# Create the target
df_agg('last_purchase_brand') = df_agg('brand').apply(lambda x: x.split(";")(-1))
df_agg('target') = df_agg('last_purchase_brand').isin(target_brands)*1
df_agg('age') = df_agg('age').astype(float)
# Remove last item of sequence features to avoid target leakage :
for col in aggregation_columns:
df_agg(col) = df_agg(col).apply(lambda x: ";".join(x.split(";")(:-1)))
Notate come abbiamo rimosso l'ultimo elemento nella sequenza features: questo è molto importante perché altrimenti otteniamo quello che chiamiamo “data leakeage”: il target fa parte delle features, al modello viene data la risposta durante l'apprendimento.
Ora otteniamo questo nuovo df_agg
frame dati:
Confrontando con il dataframe originale, vediamo che user_id 2 ha effettivamente acquistato IZOD, Parke & Ronen e infine Orvis che non è tra i marchi target.
Suddivisione in formazione, validazione e test
In qualità di data scientist esperto, ora dividerai i tuoi dati in set diversi, poiché ovviamente sai che tutti e tre sono tenuti a eseguire un rigoroso machine learning. (La validazione incrociata è fuori portata per la gente di oggi, manteniamola semplice.)
Una cosa fondamentale quando si suddividono i dati è utilizzare quelli non così conosciuti stratify
parametro da scikit-learn train_test_split()
metodo. Il motivo è lo squilibrio delle classi: se la distribuzione target (% di 0 e 1 nel nostro caso) differisce tra training e test, potremmo sentirci frustrati dagli scarsi risultati durante la distribuzione del modello. ML 101 kids: mantieni le distribuzioni dei dati il più simili possibile tra i dati di addestramento e i dati di test.
# Remove unecessary featuresdf_agg.drop('last_purchase_category', axis=1, inplace=True)
df_agg.drop('last_purchase_brand', axis=1, inplace=True)
df_agg.drop('user_id', axis=1, inplace=True)
# Split the data into train and eval
df_train, df_val = train_test_split(df_agg, stratify=df_agg('target'), test_size=0.2)
print(f"{len(df_train)} samples in train")
df_train, df_val = train_test_split(df_agg, stratify=df_agg('target'), test_size=0.2)
print(f"{len(df_train)} samples in train")
# 30950 samples in train
df_val, df_test = train_test_split(df_val, stratify=df_val('target'), test_size=0.5)
print(f"{len(df_val)} samples in val")
print(f"{len(df_test)} samples in test")
# 3869 samples in train
# 3869 samples in test
Fatto questo, divideremo con garbo il nostro set di dati tra funzionalità e obiettivi:
X_train, y_train = df_train.iloc(:, :-1), df_train('target')
X_val, y_val = df_val.iloc(:, :-1), df_val('target')
X_test, y_test = df_test.iloc(:, :-1), df_test('target')
Tra le funzionalità ci sono diversi tipi. Di solito li separiamo tra:
- caratteristiche numeriche: sono continue e riflettono una quantità misurabile, o ordinata.
- caratteristiche categoriche: solitamente sono discrete e sono spesso rappresentate come stringhe (es: un paese, un colore, ecc…)
- caratteristiche del testo: solitamente sono sequenze di parole.
Naturalmente possono esserci di più come immagini, video, audio, ecc.
Il modello: introduzione di CatBoost
Per il nostro problema di classificazione (sapevi già che eravamo in un framework di classificazione, vero?), utilizzeremo una libreria semplice ma molto potente: CatBoost. È costruito e gestito da Yandex e fornisce un'API di alto livello per giocare facilmente con alberi potenziati. È vicino a XGBoost, anche se non funziona esattamente allo stesso modo sotto il cofano.
CatBoost offre un simpatico wrapper per gestire funzionalità di diverso tipo. Nel nostro caso, alcune caratteristiche possono essere considerate “testo” in quanto sono la concatenazione di parole, come ad esempio “Calvin Klein;BCBGeneration;Hanes”. Gestire questo tipo di funzionalità a volte può essere doloroso poiché è necessario gestirle con divisori di testo, tokenizzatori, lemmatizzatori, ecc. Speriamo che CatBoost possa gestire tutto per noi!
# Define features
features = {
'numerical': ('retail_price', 'age'),
'static': ('gender', 'country', 'city'),
'dynamic': ('brand', 'department', 'category')
}# Build CatBoost "pools", which are datasets
train_pool = cb.Pool(
X_train,
y_train,
cat_features=features.get("static"),
text_features=features.get("dynamic"),
)
validation_pool = cb.Pool(
X_val,
y_val,
cat_features=features.get("static"),
text_features=features.get("dynamic"),
)
# Specify text processing options to handle our text features
text_processing_options = {
"tokenizers": (
{"tokenizer_id": "SemiColon", "delimiter": ";", "lowercasing": "false"}
),
"dictionaries": ({"dictionary_id": "Word", "gram_order": "1"}),
"feature_processing": {
"default": (
{
"dictionaries_names": ("Word"),
"feature_calcers": ("BoW"),
"tokenizers_names": ("SemiColon"),
}
),
},
}
Ora siamo pronti per definire e addestrare il nostro modello. Analizzare ogni singolo parametro non rientra nell'ambito odierno poiché il numero di parametri è piuttosto impressionante, ma sentiti libero di controllare tu stesso l'API.
E per brevità, oggi non eseguiremo la messa a punto degli iperparametri, ma questa è ovviamente una parte importante del lavoro del Data Scientist!
# Train the model
model = cb.CatBoostClassifier(
iterations=200,
loss_function="Logloss",
random_state=42,
verbose=1,
auto_class_weights="SqrtBalanced",
use_best_model=True,
text_processing=text_processing_options,
eval_metric='AUC'
)model.fit(
train_pool,
eval_set=validation_pool,
verbose=10
)
E voilà, il nostro modello è addestrato. Abbiamo finito?
No. Dobbiamo verificare che le prestazioni del nostro modello tra formazione e test siano coerenti. Un enorme divario tra addestramento e test significa che il nostro modello è eccessivo (ovvero “impara i dati di addestramento a memoria e non è bravo a prevedere dati invisibili”).
Per la valutazione del nostro modello, utilizzeremo il punteggio ROC-AUC. Non approfondisco nemmeno questo, ma dalla mia esperienza si tratta di una metrica generalmente abbastanza solida e molto migliore della precisione.
Una breve nota a margine sulla precisione: di solito non consiglio di utilizzare questo come parametro di valutazione. Pensa a un set di dati sbilanciato in cui hai l'1% di positivi e il 99% di negativi. Quale sarebbe la precisione di un modello molto stupido che prevedesse sempre 0? 99%. Quindi la precisione non è utile qui.
from sklearn.metrics import roc_auc_scoreprint(f"ROC-AUC for train set : {roc_auc_score(y_true=y_train, y_score=model.predict(X_train)):.2f}")
print(f"ROC-AUC for validation set : {roc_auc_score(y_true=y_val, y_score=model.predict(X_val)):.2f}")
print(f"ROC-AUC for test set : {roc_auc_score(y_true=y_test, y_score=model.predict(X_test)):.2f}")
ROC-AUC for train set : 0.612
ROC-AUC for validation set : 0.586
ROC-AUC for test set : 0.622
Ad essere onesti, 0,62 AUC non è affatto eccezionale e un po’ deludente per l’esperto Data Scientist che sei. Il nostro modello ha sicuramente bisogno di un po' di regolazione dei parametri qui, e forse dovremmo anche eseguire l'ingegneria delle funzionalità più seriamente.
Ma è già meglio delle previsioni casuali (uff):
# random predictionsprint(f"ROC-AUC for train set : {roc_auc_score(y_true=y_train, y_score=np.random.rand(len(y_train))):.3f}")
print(f"ROC-AUC for validation set : {roc_auc_score(y_true=y_val, y_score=np.random.rand(len(y_val))):.3f}")
print(f"ROC-AUC for test set : {roc_auc_score(y_true=y_test, y_score=np.random.rand(len(y_test))):.3f}")
ROC-AUC for train set : 0.501
ROC-AUC for validation set : 0.499
ROC-AUC for test set : 0.501
Supponiamo che per ora siamo soddisfatti del nostro modello e del nostro notebook. È qui che i data scientist dilettanti si fermerebbero. Allora come possiamo fare il passo successivo e diventare pronti per la produzione?
Incontra Docker
Docker è un insieme di prodotti di piattaforma come servizio che utilizzano la virtualizzazione a livello di sistema operativo per fornire software in pacchetti chiamati contenitori. Detto questo, pensa a Docker come a un codice che può essere eseguito ovunque e che ti consente di evitare la situazione “funziona sulla tua macchina ma non sulla mia”.
Perché usare Docker? Perché tra le cose interessanti come la possibilità di condividere il codice, conservarne le versioni e garantirne una facile distribuzione ovunque, può anche essere utilizzato per creare pipeline. Abbi pazienza e capirai mentre andiamo.
Il primo passo per creare un'applicazione containerizzata è eseguire il refactoring e ripulire il nostro notebook disordinato. Definiremo 2 file, preprocess.py
E train.py
per il nostro esempio molto semplice e inserirli in a src
directory. Includeremo anche il ns requirements.txt
file con tutto quello che c'è dentro.
# src/preprocess.pyfrom sklearn.model_selection import train_test_split
from google.cloud import bigquery
def create_dataset_from_bq():
query = """
SELECT
transactions.user_id,
products.brand,
products.category,
products.department,
products.retail_price,
users.gender,
users.age,
users.created_at,
users.country,
users.city,
transactions.created_at
FROM `bigquery-public-data.thelook_ecommerce.order_items` as transactions
LEFT JOIN `bigquery-public-data.thelook_ecommerce.users` as users
ON transactions.user_id = users.id
LEFT JOIN `bigquery-public-data.thelook_ecommerce.products` as products
ON transactions.product_id = products.id
WHERE status <> 'Cancelled'
"""
client = bigquery.Client(project='<replace_with_your_project_id>')
df = client.query(query).to_dataframe()
print(f"{len(df)} rows loaded.")
# Compute recurrent customers
recurrent_customers = df.groupby('user_id')('created_at').count().to_frame("n_purchases")
# Merge with dataset and filter those with more than 1 purchase
df = df.merge(recurrent_customers, left_on='user_id', right_index=True, how='inner')
df = df.query('n_purchases > 1')
# Fill missing value
df.fillna('NA', inplace=True)
target_brands = (
'Allegra K',
'Calvin Klein',
'Carhartt',
'Hanes',
'Volcom',
'Nautica',
'Quiksilver',
'Diesel',
'Dockers',
'Hurley'
)
aggregation_columns = ('brand', 'department', 'category')
# Group purchases by user chronologically
df_agg = (df.sort_values('created_at')
.groupby(('user_id', 'gender', 'country', 'city', 'age'), as_index=False)(('brand', 'department', 'category'))
.agg({k: ";".join for k in ('brand', 'department', 'category')})
)
# Create the target
df_agg('last_purchase_brand') = df_agg('brand').apply(lambda x: x.split(";")(-1))
df_agg('target') = df_agg('last_purchase_brand').isin(target_brands)*1
df_agg('age') = df_agg('age').astype(float)
# Remove last item of sequence features to avoid target leakage :
for col in aggregation_columns:
df_agg(col) = df_agg(col).apply(lambda x: ";".join(x.split(";")(:-1)))
df_agg.drop('last_purchase_category', axis=1, inplace=True)
df_agg.drop('last_purchase_brand', axis=1, inplace=True)
df_agg.drop('user_id', axis=1, inplace=True)
return df_agg
def make_data_splits(df_agg):
df_train, df_val = train_test_split(df_agg, stratify=df_agg('target'), test_size=0.2)
print(f"{len(df_train)} samples in train")
df_val, df_test = train_test_split(df_val, stratify=df_val('target'), test_size=0.5)
print(f"{len(df_val)} samples in val")
print(f"{len(df_test)} samples in test")
return df_train, df_val, df_test
# src/train.pyimport catboost as cb
import pandas as pd
import sklearn as sk
import numpy as np
import argparse
from sklearn.metrics import roc_auc_score
def train_and_evaluate(
train_path: str,
validation_path: str,
test_path: str
):
df_train = pd.read_csv(train_path)
df_val = pd.read_csv(validation_path)
df_test = pd.read_csv(test_path)
df_train.fillna('NA', inplace=True)
df_val.fillna('NA', inplace=True)
df_test.fillna('NA', inplace=True)
X_train, y_train = df_train.iloc(:, :-1), df_train('target')
X_val, y_val = df_val.iloc(:, :-1), df_val('target')
X_test, y_test = df_test.iloc(:, :-1), df_test('target')
features = {
'numerical': ('retail_price', 'age'),
'static': ('gender', 'country', 'city'),
'dynamic': ('brand', 'department', 'category')
}
train_pool = cb.Pool(
X_train,
y_train,
cat_features=features.get("static"),
text_features=features.get("dynamic"),
)
validation_pool = cb.Pool(
X_val,
y_val,
cat_features=features.get("static"),
text_features=features.get("dynamic"),
)
test_pool = cb.Pool(
X_test,
y_test,
cat_features=features.get("static"),
text_features=features.get("dynamic"),
)
params = CatBoostParams()
text_processing_options = {
"tokenizers": (
{"tokenizer_id": "SemiColon", "delimiter": ";", "lowercasing": "false"}
),
"dictionaries": ({"dictionary_id": "Word", "gram_order": "1"}),
"feature_processing": {
"default": (
{
"dictionaries_names": ("Word"),
"feature_calcers": ("BoW"),
"tokenizers_names": ("SemiColon"),
}
),
},
}
# Train the model
model = cb.CatBoostClassifier(
iterations=200,
loss_function="Logloss",
random_state=42,
verbose=1,
auto_class_weights="SqrtBalanced",
use_best_model=True,
text_processing=text_processing_options,
eval_metric='AUC'
)
model.fit(
train_pool,
eval_set=validation_pool,
verbose=10
)
roc_train = roc_auc_score(y_true=y_train, y_score=model.predict(X_train))
roc_eval = roc_auc_score(y_true=y_val, y_score=model.predict(X_val))
roc_test = roc_auc_score(y_true=y_test, y_score=model.predict(X_test))
print(f"ROC-AUC for train set : {roc_train:.2f}")
print(f"ROC-AUC for validation set : {roc_eval:.2f}")
print(f"ROC-AUC for test. set : {roc_test:.2f}")
return {"model": model, "scores": {"train": roc_train, "eval": roc_eval, "test": roc_test}}
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--train-path", type=str)
parser.add_argument("--validation-path", type=str)
parser.add_argument("--test-path", type=str)
parser.add_argument("--output-dir", type=str)
args, _ = parser.parse_known_args()
_ = train_and_evaluate(
args.train_path,
args.validation_path,
args.test_path)
Molto più pulito adesso. Ora puoi effettivamente avviare il tuo script dalla riga di comando!
$ python train.py --train-path xxx --validation-path yyy etc.
Ora siamo pronti per creare la nostra immagine Docker. Per questo dobbiamo scrivere un Dockerfile nella root del progetto:
# DockerfileFROM python:3.8-slim
WORKDIR /
COPY requirements.txt /requirements.txt
COPY src /src
RUN pip install --upgrade pip && pip install -r requirements.txt
ENTRYPOINT ( "bash" )
Questo richiederà i nostri requisiti, copia il file src
cartella e il suo contenuto e installa i requisiti con pip quando l'immagine verrà creata.
Per creare e distribuire questa immagine in un registro contenitori, possiamo utilizzare Google Cloud SDK e gcloud
comandi:
PROJECT_ID = ...
IMAGE_NAME=f'thelook_training_demo'
IMAGE_TAG='latest'
IMAGE_URI='eu.gcr.io/{}/{}:{}'.format(PROJECT_ID, IMAGE_NAME, IMAGE_TAG)!gcloud builds submit --tag $IMAGE_URI .
Se tutto va bene, dovresti vedere qualcosa del genere:
Vertex Pipelines, il passaggio alla produzione
Le immagini Docker sono il primo passo per implementare seriamente il Machine Learning in produzione. Il prossimo passo è costruire quelle che chiamiamo “pipeline”. Le pipeline sono una serie di operazioni orchestrate da un framework chiamato Kubeflow. Kubeflow può essere eseguito su Vertex AI su Google Cloud.
Le ragioni per preferire le pipeline rispetto ai notebook in produzione possono essere discutibili, ma te ne darò tre in base alla mia esperienza:
- Monitoraggio e riproducibilità: ogni pipeline viene archiviata con i suoi artefatti (set di dati, modelli, metriche), il che significa che puoi confrontare le esecuzioni, rieseguirle e controllarle. Ogni volta che riesegui un notebook, perdi la cronologia (o devi gestire tu stesso gli artefatti e i registri. Buona fortuna.)
- Costi: Eseguire un notebook implica avere una macchina su cui viene eseguito. — Questa macchina ha un costo e per modelli di grandi dimensioni o set di dati enormi avrai bisogno di macchine virtuali con specifiche pesanti.
— Devi ricordarti di spegnerlo quando non lo usi.
— Oppure potresti semplicemente mandare in crash la tua macchina locale se scegli di non utilizzare una macchina virtuale e avere altre applicazioni in esecuzione.
— Le pipeline Vertex AI sono a senza server servizio, nel senso che non devi gestire l'infrastruttura sottostante e paghi solo quello che utilizzi, ovvero il tempo di esecuzione. - Scalabilità: Buona fortuna quando esegui dozzine di esperimenti contemporaneamente sul tuo laptop locale. Tornerai all'utilizzo di una VM, ridimensionerai quella VM e rileggerai il punto elenco sopra.
L'ultimo motivo per preferire le pipeline rispetto ai notebook è soggettivo e anch'esso altamente discutibile, ma a mio parere i notebook semplicemente non sono progettati per eseguire carichi di lavoro secondo una pianificazione. Sono ottimi però per l'esplorazione.
Utilizza almeno un processo cron con un'immagine Docker o pipeline se vuoi fare le cose nel modo giusto, ma non eseguire mai e poi mai un notebook in produzione.
Senza ulteriori indugi, scriviamo i componenti della nostra pipeline:
# IMPORT REQUIRED LIBRARIES
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
Dataset,
Input,
Model,
Output,
Metrics,
Markdown,
HTML,
component,
OutputPath,
InputPath)
from kfp.v2 import compiler
from google.cloud.aiplatform import pipeline_jobs%watermark --packages kfp,google.cloud.aiplatform
kfp : 2.7.0
google.cloud.aiplatform: 1.50.0
Il primo componente scaricherà i dati da Bigquery e li memorizzerà come file CSV.
La BASE_IMAGE che usiamo è l'immagine che abbiamo costruito in precedenza! Possiamo usarlo per importare moduli e funzioni che abbiamo definito nella nostra immagine Docker src
cartella:
@component(
base_image=BASE_IMAGE,
output_component_file="get_data.yaml"
)
def create_dataset_from_bq(
output_dir: Output(Dataset),
):from src.preprocess import create_dataset_from_bq
df = create_dataset_from_bq()
df.to_csv(output_dir.path, index=False)
Passaggio successivo: dividere i dati
@component(
base_image=BASE_IMAGE,
output_component_file="train_test_split.yaml",
)
def make_data_splits(
dataset_full: Input(Dataset),
dataset_train: Output(Dataset),
dataset_val: Output(Dataset),
dataset_test: Output(Dataset)):import pandas as pd
from src.preprocess import make_data_splits
df_agg = pd.read_csv(dataset_full.path)
df_agg.fillna('NA', inplace=True)
df_train, df_val, df_test = make_data_splits(df_agg)
print(f"{len(df_train)} samples in train")
print(f"{len(df_val)} samples in train")
print(f"{len(df_test)} samples in test")
df_train.to_csv(dataset_train.path, index=False)
df_val.to_csv(dataset_val.path, index=False)
df_test.to_csv(dataset_test.path, index=False)
Passaggio successivo: formazione del modello. Salveremo i punteggi del modello per visualizzarli nel passaggio successivo:
@component(
base_image=BASE_IMAGE,
output_component_file="train_model.yaml",
)
def train_model(
dataset_train: Input(Dataset),
dataset_val: Input(Dataset),
dataset_test: Input(Dataset),
model: Output(Model)
):import json
from src.train import train_and_evaluate
outputs = train_and_evaluate(
dataset_train.path,
dataset_val.path,
dataset_test.path
)
cb_model = outputs('model')
scores = outputs('scores')
model.metadata("framework") = "catboost"
# Save the model as an artifact
with open(model.path, 'w') as f:
json.dump(scores, f)
L'ultimo passaggio è il calcolo delle metriche (che vengono effettivamente calcolate durante l'addestramento del modello). È semplicemente necessario ma è bello mostrarti quanto sia facile costruire componenti leggeri. Nota come in questo caso non costruiamo il componente da BASE_IMAGE (che a volte può essere piuttosto grande), ma costruiamo solo un'immagine leggera con i componenti necessari:
@component(
base_image="python:3.9",
output_component_file="compute_metrics.yaml",
)
def compute_metrics(
model: Input(Model),
train_metric: Output(Metrics),
val_metric: Output(Metrics),
test_metric: Output(Metrics)
):import json
file_name = model.path
with open(file_name, 'r') as file:
model_metrics = json.load(file)
train_metric.log_metric('train_auc', model_metrics('train'))
val_metric.log_metric('val_auc', model_metrics('eval'))
test_metric.log_metric('test_auc', model_metrics('test'))
Di solito ci sono altri passaggi che possiamo includere, ad esempio se vogliamo distribuire il nostro modello come endpoint API, ma questo è di livello più avanzato e richiede la creazione di un'altra immagine Docker per il servizio del modello. Da coprire la prossima volta.
Ora incolliamo insieme i componenti:
# USE TIMESTAMP TO DEFINE UNIQUE PIPELINE NAMES
TIMESTAMP = dt.datetime.now().strftime("%Y%m%d%H%M%S")
DISPLAY_NAME = 'pipeline-thelook-demo-{}'.format(TIMESTAMP)
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"# Define the pipeline. Notice how steps reuse outputs from previous steps
@dsl.pipeline(
pipeline_root=PIPELINE_ROOT,
# A name for the pipeline. Use to determine the pipeline Context.
name="pipeline-demo"
)
def pipeline(
project: str = PROJECT_ID,
region: str = REGION,
display_name: str = DISPLAY_NAME
):
load_data_op = create_dataset_from_bq()
train_test_split_op = make_data_splits(
dataset_full=load_data_op.outputs("output_dir")
)
train_model_op = train_model(
dataset_train=train_test_split_op.outputs("dataset_train"),
dataset_val=train_test_split_op.outputs("dataset_val"),
dataset_test=train_test_split_op.outputs("dataset_test"),
)
model_evaluation_op = compute_metrics(
model=train_model_op.outputs("model")
)
# Compile the pipeline as JSON
compiler.Compiler().compile(
pipeline_func=pipeline,
package_path='thelook_pipeline.json'
)
# Start the pipeline
start_pipeline = pipeline_jobs.PipelineJob(
display_name="thelook-demo-pipeline",
template_path="thelook_pipeline.json",
enable_caching=False,
location=REGION,
project=PROJECT_ID
)
# Run the pipeline
start_pipeline.run(service_account=<your_service_account_here>)
Se tutto funziona bene, ora vedrai la tua pipeline nell'interfaccia utente di Vertex:
Puoi fare clic su di esso e vedere i diversi passaggi:
La scienza dei dati, nonostante tutti gli appassionati di no-code/low-code che ti dicono che non è necessario essere uno sviluppatore per fare machine learning, è un vero lavoro. Come ogni lavoro, richiede competenze, concetti e strumenti che vanno oltre i quaderni.
E per coloro che aspirano a diventare Data Scientist, ecco la realtà del lavoro.
Buona programmazione.
Fonte: towardsdatascience.com