Approfondimenti sulle prestazioni dai rilevamenti delle regole Sigma in Spark Streaming |  di Jean-Claude Cote |  Giugno 2024

 | Intelligenza-Artificiale

Utilizzo delle regole Sigma per il rilevamento di anomalie nei registri di sicurezza informatica: uno studio sull'ottimizzazione delle prestazioni

Foto di Ed Vazquez su Unsplash

Uno dei ruoli del Centro canadese per la sicurezza informatica (CCCS) è quello di rilevare le anomalie ed emettere soluzioni di mitigazione il più rapidamente possibile.

Durante la messa in produzione dei rilevamenti delle regole Sigma, abbiamo fatto un'osservazione interessante nella nostra applicazione di streaming Spark. L'esecuzione di una singola istruzione SQL di grandi dimensioni che esprime 1000 regole di rilevamento Sigma era più lenta rispetto all'esecuzione di cinque query separate, ciascuna delle quali applicava 200 regole Sigma. Ciò è stato sorprendente, poiché l'esecuzione di cinque query costringe Spark a leggere i dati di origine cinque volte anziché una. Per ulteriori dettagli vi invitiamo a consultare la nostra serie di articoli:

Data la grande quantità di dati di telemetria e regole di rilevamento che dobbiamo eseguire, ogni miglioramento in termini di prestazioni comporta un notevole risparmio sui costi. Pertanto, abbiamo deciso di indagare su questa peculiare osservazione, con l’obiettivo di spiegarla e potenzialmente scoprire ulteriori opportunità per migliorare le prestazioni. Abbiamo imparato alcune cose lungo il percorso e volevamo condividerle con la comunità più ampia.

introduzione

La nostra impressione era che stessimo raggiungendo un limite nella generazione del codice di Spark. Quindi, è necessaria una piccola conoscenza di base su questo argomento. Nel 2014 Spark ha introdotto la generazione di codice per valutare le espressioni del modulo (id > 1 and id > 2) and (id < 1000 or (id + id) = 12). Questo articolo di Databricks lo spiega molto bene: Emozionanti miglioramenti delle prestazioni all'orizzonte per Spark SQL

Due anni dopo, Spark ha introdotto la generazione di codice a stadio intero. Questa ottimizzazione unisce più operatori in un'unica funzione Java. Come la generazione del codice di espressione, la generazione del codice a stadio intero elimina le chiamate di funzioni virtuali e sfrutta i registri della CPU per i dati intermedi. Tuttavia, anziché essere a livello di espressione, viene applicato a livello di operatore. Gli operatori sono i nodi di un piano di esecuzione. Per saperne di più, leggi Apache Spark come compilatore: unire un miliardo di righe al secondo su un laptop

Per riassumere questi articoli, generiamo il piano per questa semplice query:

explain codegen
select
id,
(id > 1 and id > 2) and (id < 1000 or (id + id) = 12) as test
from
range(0, 10000, 1, 32)

In questa semplice query utilizziamo due operatori: Range per generare righe e Select per eseguire una proiezione. Vediamo questi operatori nel piano fisico della query. Notare l'asterisco (codegen id : 1)accanto ai nodi e ai loro associati

|== Physical Plan ==
* Project (2)
+- * Range (1)

(1) Range (codegen id : 1)
Output (1): (id#36167L)
Arguments: Range (0, 10000, step=1, splits=Some(32))

(2) Project (codegen id : 1)
Output (2): (id#36167L, (((id#36167L > 1) AND (id#36167L > 2)) AND ((id#36167L < 1000) OR ((id#36167L + id#36167L) = 12))) AS test#36161)
Input (1): (id#36167L)

. Ciò indica che questi due operatori sono stati uniti in un'unica funzione Java utilizzando la generazione di codice a fase intera.

Generated code:
/* 001 */ public Object generate(Object() references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object() references;
/* 008 */ private scala.collection.Iterator() inputs;
/* 009 */ private boolean range_initRange_0;
/* 010 */ private long range_nextIndex_0;
/* 011 */ private TaskContext range_taskContext_0;
/* 012 */ private InputMetrics range_inputMetrics_0;
/* 013 */ private long range_batchEnd_0;
/* 014 */ private long range_numElementsTodo_0;
/* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter() range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3);
/* 016 */
/* 017 */ public GeneratedIteratorForCodegenStage1(Object() references) {
/* 018 */ this.references = references;
/* 019 */ }
/* 020 */
/* 021 */ public void init(int index, scala.collection.Iterator() inputs) {
/* 022 */ partitionIndex = index;
/* 023 */ this.inputs = inputs;
/* 024 */
/* 025 */ range_taskContext_0 = TaskContext.get();
/* 026 */ range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 027 */ range_mutableStateArray_0(0) = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 028 */ range_mutableStateArray_0(1) = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */ range_mutableStateArray_0(2) = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 030 */
/* 031 */ }
/* 032 */
/* 033 */ private void project_doConsume_0(long project_expr_0_0) throws java.io.IOException {
/* 034 */ // common sub-expressions
/* 035 */
/* 036 */ boolean project_value_4 = false;
/* 037 */ project_value_4 = project_expr_0_0 > 1L;
/* 038 */ boolean project_value_3 = false;
/* 039 */
/* 040 */ if (project_value_4) {
/* 041 */ boolean project_value_7 = false;
/* 042 */ project_value_7 = project_expr_0_0 > 2L;
/* 043 */ project_value_3 = project_value_7;
/* 044 */ }
/* 045 */ boolean project_value_2 = false;
/* 046 */
/* 047 */ if (project_value_3) {
/* 048 */ boolean project_value_11 = false;
/* 049 */ project_value_11 = project_expr_0_0 < 1000L;
/* 050 */ boolean project_value_10 = true;
/* 051 */
/* 052 */ if (!project_value_11) {
/* 053 */ long project_value_15 = -1L;
/* 054 */
/* 055 */ project_value_15 = project_expr_0_0 + project_expr_0_0;
/* 056 */
/* 057 */ boolean project_value_14 = false;
/* 058 */ project_value_14 = project_value_15 == 12L;
/* 059 */ project_value_10 = project_value_14;
/* 060 */ }
/* 061 */ project_value_2 = project_value_10;
/* 062 */ }
/* 063 */ range_mutableStateArray_0(2).reset();
/* 064 */
/* 065 */ range_mutableStateArray_0(2).write(0, project_expr_0_0);
/* 066 */
/* 067 */ range_mutableStateArray_0(2).write(1, project_value_2);
/* 068 */ append((range_mutableStateArray_0(2).getRow()));
/* 069 */
/* 070 */ }
/* 071 */
/* 072 */ private void initRange(int idx) {
/* 073 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 074 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(32L);
/* 075 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L);
/* 076 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 077 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 078 */ long partitionEnd;
/* 079 */
/* 080 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 081 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 082 */ range_nextIndex_0 = Long.MAX_VALUE;
/* 083 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 084 */ range_nextIndex_0 = Long.MIN_VALUE;
/* 085 */ } else {
/* 086 */ range_nextIndex_0 = st.longValue();
/* 087 */ }
/* 088 */ range_batchEnd_0 = range_nextIndex_0;
/* 089 */
/* 090 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 091 */ .multiply(step).add(start);
/* 092 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 093 */ partitionEnd = Long.MAX_VALUE;
/* 094 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 095 */ partitionEnd = Long.MIN_VALUE;
/* 096 */ } else {
/* 097 */ partitionEnd = end.longValue();
/* 098 */ }
/* 099 */
/* 100 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 101 */ java.math.BigInteger.valueOf(range_nextIndex_0));
/* 102 */ range_numElementsTodo_0 = startToEnd.divide(step).longValue();
/* 103 */ if (range_numElementsTodo_0 < 0) {
/* 104 */ range_numElementsTodo_0 = 0;
/* 105 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 106 */ range_numElementsTodo_0++;
/* 107 */ }
/* 108 */ }
/* 109 */
/* 110 */ protected void processNext() throws java.io.IOException {
/* 111 */ // initialize Range
/* 112 */ if (!range_initRange_0) {
/* 113 */ range_initRange_0 = true;
/* 114 */ initRange(partitionIndex);
/* 115 */ }
/* 116 */
/* 117 */ while (true) {
/* 118 */ if (range_nextIndex_0 == range_batchEnd_0) {
/* 119 */ long range_nextBatchTodo_0;
/* 120 */ if (range_numElementsTodo_0 > 1000L) {
/* 121 */ range_nextBatchTodo_0 = 1000L;
/* 122 */ range_numElementsTodo_0 -= 1000L;
/* 123 */ } else {
/* 124 */ range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 125 */ range_numElementsTodo_0 = 0;
/* 126 */ if (range_nextBatchTodo_0 == 0) break;
/* 127 */ }
/* 128 */ range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 129 */ }
/* 130 */
/* 131 */ int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 132 */ for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 133 */ long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 134 */
/* 135 */ project_doConsume_0(range_value_0);
/* 136 */
/* 137 */ if (shouldStop()) {
/* 138 */ range_nextIndex_0 = range_value_0 + 1L;
/* 139 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references(0) /* numOutputRows */).add(range_localIdx_0 + 1);
/* 140 */ range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
/* 141 */ return;
/* 142 */ }
/* 143 */
/* 144 */ }
/* 145 */ range_nextIndex_0 = range_batchEnd_0;
/* 146 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references(0) /* numOutputRows */).add(range_localEnd_0);
/* 147 */ range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 148 */ range_taskContext_0.killTaskIfInterrupted();
/* 149 */ }
/* 150 */ }
/* 151 */
/* 152 */ }

Il codice generato mostra chiaramente la fusione dei due operatori. project_doConsume_0 IL (id > 1 and id > 2) and (id < 1000 or (id + id) = 12)la funzione contiene il codice da valutare

. Nota come viene generato questo codice per valutare questa espressione specifica. Questa è un'illustrazione della generazione del codice di espressione. processNext L'intera classe è un operatore con a project_doConsume_0metodo. Questo operatore generato esegue sia le operazioni di proiezione che quelle di intervallo. All'interno del ciclo while alla riga 117, vediamo il codice per produrre righe e una chiamata specifica (non una funzione virtuale) a

. Questo illustra cosa fa la generazione del codice a tutto stadio.

Abbattere la performance Imagepath Ora che abbiamo una migliore comprensione della generazione del codice di Spark, proviamo a spiegare perché suddividere una query che esegue regole 1000 Sigma in regole più piccole offre risultati migliori. Consideriamo un'istruzione SQL che valuta due regole Sigma. Queste regole sono semplici: la Regola1 abbina gli eventi con un Imagepath che termina con “schtask.exe” e Rule2 corrisponde a an


select /* #3 */
Imagepath,
CommandLine,
PID,
map_keys(map_filter(results_map, (k,v) -> v = TRUE)) as matching_rules
from (
select /* #2 */
*,
map('rule1', rule1, 'rule2', rule2) as results_map
from (
select /* #1 */
*,
(lower_Imagepath like '%schtasks.exe') as rule1,
(lower_Imagepath like 'd:%') as rule2
from (
select
lower(PID) as lower_PID,
lower(CommandLine) as lower_CommandLine,
lower(Imagepath) as lower_Imagepath,
*
from (
select
uuid() as PID,
uuid() as CommandLine,
uuid() as Imagepath,
id
from
range(0, 10000, 1, 32)
)
)
)
)

iniziando con 'd:'. results_mapLa selezione etichettata #1 esegue i rilevamenti e memorizza i risultati in nuove colonne denominate regola1 e regola2. Seleziona #2 raggruppa queste colonne sotto una singola map_filter e infine selezionare #3 trasforma la mappa in una serie di regole corrispondenti. Utilizza map_keys per conservare solo le voci delle regole che effettivamente corrispondevano, e poi

viene utilizzato per convertire le voci della mappa in un elenco di nomi di regole corrispondenti.


== Physical Plan ==
Project (4)
+- * Project (3)
+- * Project (2)
+- * Range (1)

...

(4) Project
Output (4): (Imagepath#2, CommandLine#1, PID#0, map_keys(map_filter(map(rule1, EndsWith(lower_Imagepath#5, schtasks.exe), rule2, StartsWith(lower_Imagepath#5, d:)), lambdafunction(lambda v#12, lambda k#11, lambda v#12, false))) AS matching_rules#9)
Input (4): (lower_Imagepath#5, PID#0, CommandLine#1, Imagepath#2)

Stampiamo il piano di esecuzione di Spark per questa query:

Si noti che il nodo Progetto (4) non è un codice generato. Il nodo 4 ha una funzione lambda, impedisce la generazione del codice dell'intera fase? Ne parleremo più avanti.

+--------------------+--------------------+--------------------+--------------+
| Imagepath| CommandLine| PID| matched_rule|
+--------------------+--------------------+--------------------+--------------+
|09401675-dc09-4d0...|6b8759ee-b55a-486...|44dbd1ec-b4e0-488...| rule1|
|e2b4a0fd-7b88-417...|46dd084d-f5b0-4d7...|60111cf8-069e-4b8...| rule1|
|1843ee7a-a908-400...|d1105cec-05ef-4ee...|6046509a-191d-432...| rule2|
+--------------------+--------------------+--------------------+--------------+

Questa query non è proprio ciò che vogliamo. Vorremmo produrre una tabella di eventi con una colonna che indica la regola che è stata soddisfatta. Qualcosa come questo: matching_rules È abbastanza facile. Dobbiamo solo far esplodere il


select
Imagepath,
CommandLine,
PID,
matched_rule
from (
select
*,
explode(matching_rules) as matched_rule
from (
/* original statement */
)
)

colonna.

== Physical Plan ==
* Project (7)
+- * Generate (6)
+- Project (5)
+- * Project (4)
+- Filter (3)
+- * Project (2)
+- * Range (1)

...

(3) Filter
Input (3): (PID#34, CommandLine#35, Imagepath#36)
Condition : (size(map_keys(map_filter(map(rule1, EndsWith(lower(Imagepath#36),
schtasks.exe), rule2, StartsWith(lower(Imagepath#36), d:)),
lambdafunction(lambda v#47, lambda k#46, lambda v#47, false))), true) > 0)
...

(6) Generate (codegen id : 3)
Input (4): (PID#34, CommandLine#35, Imagepath#36, matching_rules#43)
Arguments: explode(matching_rules#43), (PID#34, CommandLine#35, Imagepath#36), false, (matched_rule#48)

(7) Project (codegen id : 3)
Output (4): (Imagepath#36, CommandLine#35, PID#34, matched_rule#48)
Input (4): (PID#34, CommandLine#35, Imagepath#36, matched_rule#48)

Ciò produce due operatori aggiuntivi: Genera (6) e Progetto (7). Tuttavia, c'è anche un nuovo Filtro (3). explode IL explode la funzione genera righe per ogni elemento dell'array. Quando l'array è vuoto,

non produce alcuna riga, filtrando efficacemente le righe in cui l'array è vuoto. org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerateSpark dispone di una regola di ottimizzazione che rileva la funzione di esplosione e produce questa condizione aggiuntiva. Il filtro è un tentativo di Spark di cortocircuitare il più possibile l'elaborazione. Il codice sorgente per questa regola, denominato

lo spiega così:

Deduce i filtri da Generate, in modo che le righe che sarebbero state rimosse da questo Generate possano essere rimosse in precedenza, prima dei join e nelle origini dati. Per maggiori dettagli su come Spark ottimizza i piani di esecuzione, fare riferimento all'articolo di David Vrba

Padroneggiare i piani di query in Spark 3.0

Sorge un’altra domanda: traiamo vantaggio da questo filtro aggiuntivo? Si noti che anche questo filtro aggiuntivo non è generato dal codice dell'intera fase, presumibilmente a causa della funzione lambda. Proviamo a esprimere la stessa query ma senza utilizzare una funzione lambda. map_filterPossiamo invece inserire i risultati della regola in una mappa, esplodere la mappa e filtrare le righe, evitando così la necessità di


select
Imagepath,
CommandLine,
PID,
matched_rule
from (
select
*
from (
select
*,
explode(results_map) as (matched_rule, matched_result)
from (
/* original statement */
)
)
where
matched_result = TRUE
)

. matched_rule L'operazione select #3 esplode la mappa in due nuove colonne. IL matched_result la colonna conterrà la chiave, che rappresenta il nome della regola, mentre la colonna matched_resultla colonna conterrà il risultato del test di rilevamento. Per filtrare le righe, manteniamo semplicemente solo quelle con un positivo

.


== Physical Plan ==
* Project (8)
+- * Filter (7)
+- * Generate (6)
+- * Project (5)
+- * Project (4)
+- * Filter (3)
+- * Project (2)
+- * Range (1)

Il piano fisico indica che tutti i nodi sono codice a fase intera generato in un'unica funzione Java, il che è promettente. map_filter Conduciamo alcuni test per confrontare le prestazioni della query utilizzando

e quello che usa esplode quindi filtra.

Abbiamo eseguito questi test su una macchina con 4 CPU. Abbiamo generato 1 milione di righe, ciascuna con 100 regole e ciascuna regola valuta 5 espressioni. Questi test sono stati eseguiti 5 volte.

  • In media
  • map_filter ha impiegato 42,6 secondi

burst_then_filter ha impiegato 51,2 secondi

Pertanto, map_filter è leggermente più veloce anche se non utilizza la generazione del codice in tutta la fase.

Caused by: org.codehaus.commons.compiler.InternalCompilerException: Code grows beyond 64 KB

Tuttavia, nella nostra query di produzione, eseguiamo molte più regole Sigma, per un totale di 1000 regole. Ciò include 29 espressioni regex, 529 uguali, 115 iniziano con, 2352 finiscono con e 5838 contengono espressioni. Testiamo nuovamente la nostra query, ma questa volta con un leggero aumento del numero di espressioni, utilizzando 7 invece di 5 per regola. In questo modo, abbiamo riscontrato un errore nei nostri registri: spark.sql.codegen.maxFields Abbiamo provato ad aumentare spark.sql.codegen.hugeMethodLimitE

ma fondamentalmente, le classi Java hanno un limite di dimensione della funzione di 64 KB. Inoltre, il compilatore JVM JIT si limita a compilare funzioni inferiori a 8 KB.

Tuttavia, la query funziona ancora correttamente perché Spark ricorre al modello di esecuzione Volcano per alcune parti del piano. Dopotutto, WholeStageCodeGen è solo un'ottimizzazione.

  • Eseguendo lo stesso test di prima ma con 7 espressioni per regola anziché 5, exploit_then_filter è molto più veloce di map_filter.
  • map_filter ha impiegato 68,3 secondi

burst_then_filter ha impiegato 15,8 secondi org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate L'aumento del numero di espressioni fa sì che parti di exploit_then_filter non siano più generate come codice dell'intera fase. In particolare, l'operatore Filter introdotto dalla regola

spark.sql("SET spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate")

è troppo grande per essere incorporato nella generazione del codice in tutta la fase. Vediamo cosa succede se escludiamo la regola InferFiltersFromGenerate:


== Physical Plan ==
* Project (6)
+- * Generate (5)
+- Project (4)
+- * Project (3)
+- * Project (2)
+- * Range (1)

== Physical Plan ==
* Project (7)
+- * Filter (6)
+- * Generate (5)
+- * Project (4)
+- * Project (3)
+- * Project (2)
+- * Range (1)

Come previsto, il piano fisico di entrambe le query non dispone più di un operatore Filter aggiuntivo.

  • La rimozione della regola ha infatti avuto un impatto significativo sulle prestazioni:
  • map_filter ha impiegato 22,49 secondi

burst_then_filter ha impiegato 4,08 secondi

Entrambe le query hanno tratto grandi benefici dalla rimozione della regola. Visto il miglioramento delle prestazioni, abbiamo deciso di aumentare il numero di regole Sigma a 500 e la complessità a 21 espressioni:

  • Risultati:
  • map_filter ha impiegato 195,0 secondi

burst_then_filter ha impiegato 25,09 secondi

Nonostante la maggiore complessità, entrambe le query offrono comunque prestazioni piuttosto buone, con exploit_then_filter che supera significativamente map_filter.

È interessante esplorare i diversi aspetti della generazione del codice utilizzata da Spark. Anche se al momento potremmo non trarre vantaggio dalla generazione del codice in tutta la fase, possiamo comunque ottenere vantaggi dalla generazione di espressioni. spark.sql.codegen.methodSplitThreshold La generazione di espressioni non presenta le stesse limitazioni della generazione di codice a fase intera. Alberi di espressione molto grandi possono essere suddivisi in alberi più piccoli e in Spark

controlla come questi vengono suddivisi. Sebbene abbiamo sperimentato questa proprietà, non abbiamo osservato miglioramenti significativi. L'impostazione predefinita sembra soddisfacente. spark.sql.codegen.factoryModeSpark fornisce una proprietà di debug denominata spark.sql.codegen.factoryMode=NO_CODEGENche può essere impostato su FALLBACK, CODEGEN_ONLY o NO_CODEGEN. Possiamo disattivare la generazione del codice di espressione impostando

che si traduce in un drastico degrado delle prestazioni:

  • Con 500 regole e 21 espressioni:
  • map_filter ha impiegato 1581 secondi

burst_then_filter ha impiegato 122,31 secondi.

Anche se non tutti gli operatori partecipano alla generazione del codice in tutta la fase, osserviamo comunque vantaggi significativi dalla generazione del codice di espressione.

I risultati

Immagine dell'autore

Con il nostro caso migliore di 25,1 secondi per valutare 10.500 espressioni su 1 milione di righe, otteniamo una velocità di tutto rispetto di 104 milioni di espressioni al secondo per CPU. map_filter Il risultato di questo studio è che quando valutiamo un gran numero di espressioni, possiamo trarre vantaggio dalla conversione delle nostre query in use org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate a quelli che utilizzano un approccio esploso e poi filtrato. Inoltre, il

regola non sembra vantaggiosa nel nostro caso d'uso, quindi dovremmo escludere tale regola dalle nostre query.

Spiega le nostre osservazioni iniziali?

L’implementazione di queste lezioni apprese nei nostri lavori di produzione ha prodotto vantaggi significativi. Tuttavia, anche dopo queste ottimizzazioni, la suddivisione della query di grandi dimensioni in più query più piccole ha continuato a fornire vantaggi. Dopo ulteriori indagini, abbiamo scoperto che ciò non era dovuto esclusivamente alla generazione del codice, ma piuttosto a una spiegazione più semplice.

Lo streaming Spark funziona eseguendo un micro-batch fino al completamento e quindi controllandone l'avanzamento prima di avviare un nuovo micro-batch.

Durante ogni micro-lotto, Spark deve completare tutte le sue attività, in genere 200. Tuttavia, non tutte le attività sono uguali. Spark utilizza una strategia round-robin per distribuire le righe tra queste attività. Pertanto, a volte, alcune attività possono contenere eventi con attributi di grandi dimensioni, ad esempio una riga di comando molto grande, causando il completamento rapido di alcune attività mentre altre richiedono molto più tempo. Ad esempio qui la distribuzione del tempo di esecuzione di un'attività in micro-batch. Il tempo medio dell'attività è di 14 secondi. Tuttavia, il ritardo peggiore è di 1,6 minuti!

Immagine dell'autore

Questo in effetti fa luce su un fenomeno diverso. Il fatto che Spark attenda alcune attività ritardatarie durante ogni micro-batch lascia molte CPU inattive, il che spiega perché la suddivisione della query di grandi dimensioni in più query più piccole ha comportato prestazioni complessive più veloci.

Questa immagine mostra 5 query più piccole in esecuzione in parallelo all'interno della stessa applicazione Spark. Batch3 è in attesa di un'attività in ritardo mentre le altre query continuano a progredire.

Immagine dell'autore

Durante questi periodi di attesa, Spark può utilizzare le CPU inattive per affrontare altre query, massimizzando così l'utilizzo delle risorse e il throughput complessivo.

Conclusione

In questo articolo abbiamo fornito una panoramica del processo di generazione del codice di Spark e abbiamo discusso di come le ottimizzazioni integrate potrebbero non sempre produrre risultati desiderabili. Inoltre, abbiamo dimostrato che il refactoring di una query dall'utilizzo delle funzioni lambda a una che utilizza una semplice operazione di esplosione ha comportato miglioramenti delle prestazioni. Infine, abbiamo concluso che, sebbene la suddivisione di un'istruzione di grandi dimensioni abbia portato a miglioramenti delle prestazioni, il fattore principale che ha portato a questi miglioramenti è stata la topologia di esecuzione piuttosto che le query stesse.

Fonte: towardsdatascience.com

Lascia un commento

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *