Dopo aver configurato il nostro database Postgres, approfondiamo i dettagli del lavoro Spark. L’obiettivo è trasmettere in streaming i dati dall’argomento Kafka promemoria_conso alla tabella Postgres promemoria_conso_tabella.
from pyspark.sql import SparkSession
from pyspark.sql.types import (
StructType,
StructField,
StringType,
)
from pyspark.sql.functions import from_json, col
from src.constants import POSTGRES_URL, POSTGRES_PROPERTIES, DB_FIELDS
import logginglogging.basicConfig(
level=logging.INFO, format="%(asctime)s:%(funcName)s:%(levelname)s:%(message)s"
)
def create_spark_session() -> SparkSession:
spark = (
SparkSession.builder.appName("PostgreSQL Connection with PySpark")
.config(
"spark.jars.packages",
"org.postgresql:postgresql:42.5.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
)
.getOrCreate()
)
logging.info("Spark session created successfully")
return spark
def create_initial_dataframe(spark_session):
"""
Reads the streaming data and creates the initial dataframe accordingly.
"""
try:
# Gets the streaming data from topic random_names
df = (
spark_session.readStream.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "rappel_conso")
.option("startingOffsets", "earliest")
.load()
)
logging.info("Initial dataframe created successfully")
except Exception as e:
logging.warning(f"Initial dataframe couldn't be created due to exception: {e}")
raise
return df
def create_final_dataframe(df):
"""
Modifies the initial dataframe, and creates the final dataframe.
"""
schema = StructType(
(StructField(field_name, StringType(), True) for field_name in DB_FIELDS)
)
df_out = (
df.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), schema).alias("data"))
.select("data.*")
)
return df_out
def start_streaming(df_parsed, spark):
"""
Starts the streaming to table spark_streaming.rappel_conso in postgres
"""
# Read existing data from PostgreSQL
existing_data_df = spark.read.jdbc(
POSTGRES_URL, "rappel_conso", properties=POSTGRES_PROPERTIES
)
unique_column = "reference_fiche"
logging.info("Start streaming ...")
query = df_parsed.writeStream.foreachBatch(
lambda batch_df, _: (
batch_df.join(
existing_data_df, batch_df(unique_column) == existing_data_df(unique_column), "leftanti"
)
.write.jdbc(
POSTGRES_URL, "rappel_conso", "append", properties=POSTGRES_PROPERTIES
)
)
).trigger(once=True) \
.start()
return query.awaitTermination()
def write_to_postgres():
spark = create_spark_session()
df = create_initial_dataframe(spark)
df_final = create_final_dataframe(df)
start_streaming(df_final, spark=spark)
if __name__ == "__main__":
write_to_postgres()
Analizziamo i punti salienti e le funzionalità principali del lavoro Spark:
- Per prima cosa creiamo la sessione Spark
def create_spark_session() -> SparkSession:
spark = (
SparkSession.builder.appName("PostgreSQL Connection with PySpark")
.config(
"spark.jars.packages",
"org.postgresql:postgresql:42.5.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",)
.getOrCreate()
)
logging.info("Spark session created successfully")
return spark
2. Il create_initial_dataframe
la funzione acquisisce i dati in streaming dall’argomento Kafka utilizzando lo streaming strutturato di Spark.
def create_initial_dataframe(spark_session):
"""
Reads the streaming data and creates the initial dataframe accordingly.
"""
try:
# Gets the streaming data from topic random_names
df = (
spark_session.readStream.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "rappel_conso")
.option("startingOffsets", "earliest")
.load()
)
logging.info("Initial dataframe created successfully")
except Exception as e:
logging.warning(f"Initial dataframe couldn't be created due to exception: {e}")
raisereturn df
3. Una volta acquisiti i dati, create_final_dataframe
lo trasforma. Applica uno schema (definito dalle colonne CAMPI_DB) ai dati JSON in entrata, garantendo che i dati siano strutturati e pronti per l’ulteriore elaborazione.
def create_final_dataframe(df):
"""
Modifies the initial dataframe, and creates the final dataframe.
"""
schema = StructType(
(StructField(field_name, StringType(), True) for field_name in DB_FIELDS)
)
df_out = (
df.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), schema).alias("data"))
.select("data.*")
)
return df_out
4. Il start_streaming
La funzione legge i dati esistenti dal database, li confronta con il flusso in entrata e aggiunge nuovi record.
def start_streaming(df_parsed, spark):
"""
Starts the streaming to table spark_streaming.rappel_conso in postgres
"""
# Read existing data from PostgreSQL
existing_data_df = spark.read.jdbc(
POSTGRES_URL, "rappel_conso", properties=POSTGRES_PROPERTIES
)unique_column = "reference_fiche"
logging.info("Start streaming ...")
query = df_parsed.writeStream.foreachBatch(
lambda batch_df, _: (
batch_df.join(
existing_data_df, batch_df(unique_column) == existing_data_df(unique_column), "leftanti"
)
.write.jdbc(
POSTGRES_URL, "rappel_conso", "append", properties=POSTGRES_PROPERTIES
)
)
).trigger(once=True) \
.start()
return query.awaitTermination()
Il codice completo per il processo Spark si trova nel file src/spark_pgsql/spark_streaming.py
. Utilizzeremo Airflow DockerOperator per eseguire questo lavoro, come spiegato nella prossima sezione.
Esaminiamo il processo di creazione dell’immagine Docker necessaria per eseguire il nostro lavoro Spark. Ecco il Dockerfile come riferimento:
FROM bitnami/spark:latestWORKDIR /opt/bitnami/spark
RUN pip install py4j
COPY ./src/spark_pgsql/spark_streaming.py ./spark_streaming.py
COPY ./src/constants.py ./src/constants.py
ENV POSTGRES_DOCKER_USER=host.docker.internal
ARG POSTGRES_PASSWORD
ENV POSTGRES_PASSWORD=$POSTGRES_PASSWORD
In questo Dockerfile iniziamo con il file bitnami/spark
immagine come nostra base. È un’immagine Spark pronta per l’uso. Quindi installiamo py4j
uno strumento necessario affinché Spark funzioni con Python.
Le variabili d’ambiente POSTGRES_DOCKER_USER
E POSTGRES_PASSWORD
sono configurati per la connessione a un database PostgreSQL. Poiché il nostro database si trova sul computer host, utilizziamo host.docker.internal
come utente. Ciò consente al nostro contenitore Docker di accedere ai servizi sull’host, in questo caso il database PostgreSQL. La password per PostgreSQL viene passata come argomento di compilazione, quindi non è codificata nell’immagine.
È importante notare che questo approccio, in particolare il passaggio della password del database in fase di compilazione, potrebbe non essere sicuro per gli ambienti di produzione. Potrebbe potenzialmente esporre informazioni sensibili. In questi casi, dovrebbero essere presi in considerazione metodi più sicuri come Docker BuildKit.
Ora creiamo l’immagine Docker per Spark:
docker build -f spark/Dockerfile -t rappel-conso/spark:latest --build-arg POSTGRES_PASSWORD=$POSTGRES_PASSWORD .
Questo comando costruirà l’immagine rappel-conso/spark:latest
. Questa immagine include tutto il necessario per eseguire il nostro lavoro Spark e verrà utilizzata dal DockerOperator di Airflow per eseguire il lavoro. Ricordarsi di sostituire $POSTGRES_PASSWORD
con la tua password PostgreSQL effettiva quando esegui questo comando.
Fonte: towardsdatascience.com