Una guida pratica per ottimizzare i join non equi in Spark

Foto di John Lee su Unsplash

Arricchire gli eventi di rete con informazioni di geolocalizzazione IP è un compito cruciale, soprattutto per organizzazioni come Centro canadese per la sicurezza informaticail CSIRT nazionale del Canada. In questo articolo dimostreremo come ottimizzare i join Spark SQL, concentrandoci in particolare su scenari che coinvolgono condizioni di non uguaglianza, una sfida comune quando si lavora con dati di geolocalizzazione IP.

Come professionisti della sicurezza informatica, il nostro affidamento sull’arricchimento degli eventi di rete con database di geolocalizzazione IP richiede strategie efficienti per gestire join non equi. Sebbene numerosi articoli facciano luce sulle varie strategie di unione supportate da Spark, l’applicazione pratica di queste strategie rimane una preoccupazione prevalente per i professionisti del settore.

L’articolo penetrante di David Vrba, “Informazioni sui join in Spark 3.0”pubblicato su Towards Data Science, costituisce una risorsa preziosa. Spiega le condizioni che guidano la selezione di strategie di unione specifiche da parte di Spark. Nel suo articolo, David suggerisce brevemente che l’ottimizzazione dei non-equi join implica trasformarli in equi-join.

Questo articolo mira a fornire una guida pratica per ottimizzare le prestazioni di un JOIN non equi, con un focus specifico sull’unione con intervalli IP in una tabella di geolocalizzazione.

Per esemplificare queste ottimizzazioni, rivisiteremo la tabella di geolocalizzazione introdotta nel nostro articolo precedente.

+----------+--------+---------+-----------+-----------+
| start_ip | end_ip | country | city | owner |
+----------+--------+---------+-----------+-----------+
| 1 | 2 | ca | Toronto | Telus |
| 3 | 4 | ca | Quebec | Rogers |
| 5 | 8 | ca | Vancouver | Bell |
| 10 | 14 | ca | Montreal | Telus |
| 19 | 22 | ca | Ottawa | Rogers |
| 23 | 29 | ca | Calgary | Videotron |
+----------+--------+---------+-----------+-----------+

Equi-Join

Per illustrare l’esecuzione di un equi-join da parte di Spark, inizieremo la nostra esplorazione considerando uno scenario ipotetico. Supponiamo di avere una tabella di eventi, in cui ogni evento è associato a uno specifico ownerindicato con il event_owner colonna.

+------------+--------------+
| event_time | event_owner |
+------------+--------------+
| 2024-01-01 | Telus |
| 2024-01-02 | Bell |
| 2024-01-03 | Rogers |
| 2024-01-04 | Videotron |
| 2024-01-05 | Telus |
| 2024-01-06 | Videotron |
| 2024-01-07 | Rogers |
| 2024-01-08 | Bell |
+------------+--------------+

Diamo uno sguardo più da vicino a come Spark gestisce questo equi-join:

SELECT
*
FROM
events
JOIN geolocation
ON (event_owner = owner)

In questo esempio, l’equi-join viene stabilito tra i events tavolo e il geolocation tavolo. Il criterio di collegamento si basa sull’uguaglianza dei event_owner colonna nel events tavolo e il owner colonna nel geolocation tavolo.

Come spiegato da David Vrba nel suo post sul blog:

Spark pianificherà l’unione con SMJ se esiste un’equicondizione e le chiavi di unione sono ordinabili

Spark eseguirà un Sort Merge Join, distribuendo le righe delle due tabelle tramite hashing del file event_owner sul lato sinistro e il owner dal lato giusto. Le righe di entrambe le tabelle con hash sulla stessa partizione Spark verranno elaborate dalla stessa attività Spark, un’unità di lavoro. Ad esempio, l’Attività-1 potrebbe ricevere:

+----------+-------+---------+-----------+-----------+
| start_ip | end_ip| country | city | owner |
+----------+-------+---------+-----------+-----------+
| 1 | 2 | ca | Toronto | Telus |
| 10 | 14 | ca | Montreal | Telus |
+----------+-------+---------+-----------+-----------+

+------------+--------------+
| event_time | event_owner |
+------------+--------------+
| 2024-01-01 | Telus |
| 2024-01-05 | Telus |
+------------+--------------+

Nota come l’Attività 1 gestisce solo un sottoinsieme dei dati. Il problema di unione è suddiviso in più attività più piccole, in cui è richiesto solo un sottoinsieme di righe sia dal lato sinistro che da quello destro. Inoltre, le righe del lato sinistro e destro elaborate dall’Attività-1 devono corrispondere. Questo è vero perché ogni occorrenza di “Telus” avrà l’hash sulla stessa partizione, indipendentemente dal fatto che provenga da events O geolocation tavoli. Possiamo essere certi che nessun altro Task-X avrà righe con un owner di “Telus”.

Una volta divisi i dati come mostrato sopra, Spark ordinerà entrambi i lati, da qui il nome della strategia di join, Sort Merge Join. L’unione viene eseguita prendendo la prima riga a sinistra e verificando se corrisponde a destra. Una volta che le righe a destra non corrispondono più, Spark tirerà le righe da sinistra. Continuerà a rimuovere dalla coda ciascun lato finché non rimarranno più righe su entrambi i lati.

Partecipazione non equi

Ora che abbiamo una migliore comprensione di come vengono eseguiti gli equi-join, confrontiamolo con un non-equi join. Supponiamo di avere eventi con an event_ipe vogliamo aggiungere informazioni di geolocalizzazione a questa tabella.

+------------+----------+
| event_time | event_ip |
+------------+----------+
| 2024-01-01 | 6 |
| 2024-01-02 | 14 |
| 2024-01-03 | 18 |
| 2024-01-04 | 27 |
| 2024-01-05 | 9 |
| 2024-01-06 | 23 |
| 2024-01-07 | 15 |
| 2024-01-08 | 1 |
+------------+----------+

Per eseguire questo join, dobbiamo determinare l’intervallo IP all’interno del quale event_ip cascate. Otteniamo ciò con la seguente condizione:

SELECT
*
FROM
events
JOIN geolocation
ON (event_ip >= start_ip and event_ip <= end_ip)

Consideriamo ora come Spark eseguirà questo join. Sul lato destro (la tabella di geolocalizzazione), non esiste una chiave con cui Spark possa eseguire l’hashing e distribuire le righe. È impossibile dividere questo problema in attività più piccole che possono essere distribuite nel cluster di calcolo ed eseguite in parallelo.

In una situazione come questa, Spark è costretto a impiegare strategie di join che richiedono più risorse. Come affermato da David Vrba:

Se non è presente alcuna equi-condizione, Spark deve utilizzare BroadcastNestedLoopJoin (BNLJ) o il prodotto cartesiano (CPJ).

Entrambe queste strategie implicano la forzatura bruta del problema; per ogni riga sul lato sinistro, Spark testerà la condizione “tra” su ogni singola riga del lato destro. Non ha altra scelta. Se la tabella a destra è sufficientemente piccola, Spark può ottimizzare copiando la tabella a destra in ogni attività che legge il lato sinistro, uno scenario noto come caso BNLJ. Tuttavia, se il lato sinistro è troppo grande, ogni attività dovrà leggere sia il lato destro che quello sinistro della tabella, nel caso CPJ. In entrambi i casi, entrambe le strategie sono altamente costose.

Quindi, come possiamo migliorare questa situazione? Il trucco sta nell’introdurre un’uguaglianza nella condizione di unione. Ad esempio, potremmo semplicemente srotolare tutti gli intervalli IP nella tabella di geolocalizzazione, producendo una riga per ogni IP trovato negli intervalli IP.

Questo è facilmente realizzabile in Spark; possiamo eseguire il seguente SQL per srotolare tutti gli intervalli IP:

SELECT
country,
city,
owner,
explode(sequence(start_ip, end_ip)) AS ip
FROM
geolocation

IL sequenza la funzione crea un array con i valori IP da start_ip A end_ip. IL esplodere La funzione svolge questa matrice in singole righe.

+---------+---------+---------+-----------+
| country | city | owner | ip |
+---------+---------+---------+-----------+
| ca | Toronto | Telus | 1 |
| ca | Toronto | Telus | 2 |
| ca | Quebec | Rogers | 3 |
| ca | Quebec | Rogers | 4 |
| ca | Vancouver | Bell | 5 |
| ca | Vancouver | Bell | 6 |
| ca | Vancouver | Bell | 7 |
| ca | Vancouver | Bell | 8 |
| ca | Montreal | Telus | 10 |
| ca | Montreal | Telus | 11 |
| ca | Montreal | Telus | 12 |
| ca | Montreal | Telus | 13 |
| ca | Montreal | Telus | 14 |
| ca | Ottawa | Rogers | 19 |
| ca | Ottawa | Rogers | 20 |
| ca | Ottawa | Rogers | 21 |
| ca | Ottawa | Rogers | 22 |
| ca | Calgary | Videotron | 23 |
| ca | Calgary | Videotron | 24 |
| ca | Calgary | Videotron | 25 |
| ca | Calgary | Videotron | 26 |
| ca | Calgary | Videotron | 27 |
| ca | Calgary | Videotron | 28 |
| ca | Calgary | Videotron | 29 |
+---------+---------+---------+-----------+

Con una chiave su entrambi i lati, ora possiamo eseguire un equi-join e Spark può distribuire in modo efficiente il problema, ottenendo prestazioni ottimali. Tuttavia, in pratica, questo scenario non è realistico, poiché una vera tabella di geolocalizzazione contiene spesso miliardi di righe.

Per risolvere questo problema, possiamo migliorare l’efficienza aumentando la grossolanità di questa mappatura. Invece di mappare gli intervalli IP su ogni singolo IP, possiamo mappare gli intervalli IP su segmenti all’interno dello spazio IP. Supponiamo di dividere lo spazio IP in segmenti di 5. Lo spazio segmentato sarebbe simile a questo:

+---------------+-------------+-----------+
| segment_start | segment_end | bucket_id |
+---------------+-------------+-----------+
| 1 | 5 | 0 |
| 6 | 10 | 1 |
| 11 | 15 | 2 |
| 16 | 20 | 3 |
| 21 | 25 | 4 |
| 26 | 30 | 5 |
+---------------+-------------+-----------+

Ora, il nostro obiettivo è mappare gli intervalli IP sui segmenti con cui si sovrappongono. Similmente a quanto fatto in precedenza, possiamo srotolare gli intervalli IP, ma questa volta lo faremo in segmenti di 5.

SELECT
country,
city,
owner,
explode(sequence(start_ip / 5, end_ip / 5)) AS bucket_id
FROM
geolocations

Osserviamo che alcuni intervalli IP condividono a bucket_id. Gli intervalli 1–2 e 3–4 rientrano entrambi nel segmento 1–5.

+----------+--------+---------+-----------+-----------+-----------+
| start_ip | end_ip | country | city | owner | bucket_id |
+----------+--------+---------+-----------+-----------+-----------+
| 1 | 2 | ca | Toronto | Telus | 0 |
| 3 | 4 | ca | Quebec | Rogers | 0 |
| 5 | 8 | ca | Vancouver | Bell | 1 |
| 10 | 14 | ca | Montreal | Telus | 2 |
| 19 | 22 | ca | Ottawa | Rogers | 3 |
| 19 | 22 | ca | Ottawa | Rogers | 4 |
| 23 | 29 | ca | Calgary | Videotron | 4 |
| 23 | 29 | ca | Calgary | Videotron | 5 |
+----------+--------+---------+-----------+-----------+-----------+

Inoltre, notiamo che alcuni intervalli IP sono duplicati. Le ultime due righe per l’intervallo IP 23–29 si sovrappongono ai segmenti 20–25 e 26–30. Similmente allo scenario in cui abbiamo srotolato i singoli IP, stiamo ancora duplicando le righe, ma in misura molto minore.

Ora possiamo utilizzare questa tabella con bucket per eseguire la nostra unione.

SELECT
*
FROM
events
JOIN geolocation
ON (
event_ip / 5 = bucket_id
AND event_ip >= start_ip
AND event_ip <= end_ip
)

L’uguaglianza nel join consente a Spark di eseguire una strategia Sort Merge Join (SMJ). La condizione “tra” elimina i casi in cui gli intervalli IP condividono lo stesso bucket_id.

In questa illustrazione abbiamo utilizzato segmenti da 5; tuttavia, in realtà, segmenteremmo lo spazio IP in segmenti di 256. Questo perché lo spazio degli indirizzi IP globale è supervisionato dalla Internet Assigned Numbers Authority (IANA) e, tradizionalmente, la IANA alloca lo spazio degli indirizzi in blocchi di 256 IP.

Analizzare gli intervalli IP in una vera tabella di geolocalizzazione utilizzando Spark approx_percentile La funzione rivela che la maggior parte dei record ha intervalli inferiori a 256, mentre pochissimi sono superiori a 256.

SELECT 
approx_percentile(
end_ip - start_ip,
array(0.800, 0.900, 0.950, 0.990, 0.999, 0.9999),
10000)
FROM
geolocation

Ciò implica che alla maggior parte degli intervalli IP viene assegnato un bucket_idmentre i pochi più grandi vengono srotolati, risultando nella tabella srotolata contenente circa il 10% in più di righe.

Una query eseguita con una tabella di geolocalizzazione autentica potrebbe essere simile alla seguente:

WITH
b_geo AS (
SELECT
explode(
sequence(
CAST(start_ip / 256 AS INT),
CAST(end_ip / 256 AS INT))) AS bucket_id,
*
FROM
geolocation
),
b_events AS (
SELECT
CAST(event_ip / 256 AS INT) AS bucket_id,
*
FROM
events
)

SELECT
*
FROM
b_events
JOIN b_geo
ON (
b_events.bucket_id = b_geo.bucket_id
AND b_events.event_ip >= b_geo.start_ip
AND b_events.event_ip <= b_geo.end_ip
);

Conclusione

In conclusione, questo articolo ha presentato una dimostrazione pratica di conversione di un non-equi join in un equi-join attraverso l’implementazione di una tecnica di mappatura che prevede la segmentazione degli intervalli IP. È fondamentale notare che questo approccio si estende oltre gli indirizzi IP e può essere applicato a qualsiasi set di dati caratterizzato da bande o intervalli.

La capacità di mappare e segmentare in modo efficace i dati è uno strumento prezioso nell’arsenale di ingegneri e analisti di dati, poiché fornisce una soluzione pragmatica alle sfide poste dalle condizioni di non uguaglianza nei join Spark SQL.

Fonte: towardsdatascience.com

Lascia un commento

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *