C'è ancora una cosa prima di rivolgere la nostra attenzione alla parte divertente. L'interfaccia utente Web di Flink è un'interfaccia intuitiva che consente agli sviluppatori e agli amministratori di monitorare e gestire le proprie applicazioni Apache Flink. Fornisce una panoramica in tempo reale dei lavori in esecuzione o completati, visualizza parametri quali velocità effettiva e latenza e offre informazioni dettagliate sul piano di esecuzione del lavoro. Essenzialmente, è una comoda dashboard in cui puoi visualizzare le prestazioni e lo stato delle tue applicazioni Flink, rendendo il processo di debug, ottimizzazione e gestione dei processi di streaming o di elaborazione batch molto più semplice e intuitivo.
Quando esegui un'applicazione Flink localmente come in questo esempio, di solito non hai l'interfaccia utente Web Flink abilitata. Tuttavia, esiste un modo per ottenere anche l'interfaccia utente Web di Flink in un ambiente di esecuzione locale. Lo trovo utile, soprattutto per avere un'idea del piano di esecuzione prima di eseguire applicazioni di streaming in produzione.
Iniziamo aggiungendo una dipendenza al file pom.xml
:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
E cambia leggermente il codice nella nostra classe principale App.java
:
package de.vojay.flitch;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class App {
public static void main(String() args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(new Configuration());
env.fromSequence(1, Long.MAX_VALUE).print();
env.execute("Flitch");
env.close();
}
}
L'applicazione di streaming elaborerà ora una sequenza di numeri, in modo che non termini immediatamente. Anche con createLocalEnvironmentWithWebUI
avremo l'interfaccia utente Web di Flink disponibile localmente sul porto 8081
mentre l'applicazione è in esecuzione.
Ricominciare e aprire http://localhost:8081/ nel tuo browser. Oltre a vari parametri, puoi anche vedere il piano di esecuzione della tua applicazione Flink.
Ora disponiamo di una configurazione locale adeguata e possiamo iniziare a connettere la nostra applicazione a Twitch ed eseguire l'analisi del sentiment sui messaggi di chat.
Contrazionela principale piattaforma di live streaming per giocatori, offre un'API completa e una funzionalità di chat profondamente integrata con il protocollo Internet Relay Chat (IRC).
Fondamentalmente, l'API di Twitch consente alle applicazioni di interagire con i dati di Twitch. Ciò include il recupero di informazioni su streaming live, VOD (Video on Demand), utenti e dettagli del gioco. L'API è RESTful, ovvero segue lo stile architettonico del Web, rendendola semplice da utilizzare con le comuni richieste HTTP. Gli sviluppatori possono utilizzare questa API per creare esperienze personalizzate, come la visualizzazione delle statistiche del live streaming, la ricerca di canali o persino l'automazione delle configurazioni dello streaming.
La chat di Twitch è un aspetto vitale dell'esperienza Twitch, poiché consente agli spettatori di interagire con streamer e altri spettatori in tempo reale. Sotto la moderna interfaccia di Twitch Chat si trova il protocollo Internet Relay Chat (IRC), un punto fermo della comunicazione online dalla fine degli anni '80. Questa dipendenza da IRC consente un'ampia gamma di possibilità quando si tratta di leggere e interagire con la chat tramite applicazioni personalizzate.
Per il nostro scopo, vogliamo semplicemente leggere la chat, senza scrivere noi stessi i messaggi. Fortunatamente, Twitch consente connessioni anonime alla chat per casi d'uso di applicazioni di sola lettura.
Per ridurre lo sforzo di implementazione, utilizzeremo una libreria esistente per interagire con Twitch: Twitch4J. Twitch4J è una moderna libreria Java progettata per semplificare l'integrazione con le funzionalità di Twitch, tra cui API, Chat (tramite IRC), PubSub (per notifiche in tempo reale) e Webhook. Essenzialmente, è un potente toolkit per gli sviluppatori Java che desiderano interagire con i servizi Twitch senza dover gestire direttamente dettagli di basso livello come le richieste HTTP o la gestione del protocollo IRC.
Il primo passo è aggiungere Twitch4J come dipendenza al file pom.xml
:
<dependency>
<groupId>com.github.twitch4j</groupId>
<artifactId>twitch4j</artifactId>
<version>1.19.0</version>
</dependency>
Vorremmo avere un Plain Old Java Object (POJO) leggero e serializzabile per rappresentare i messaggi di chat di Twitch all'interno della nostra applicazione. A noi interessa il canale in cui è stato scritto il messaggio, l'utente e il contenuto stesso.
Crea una nuova classe TwitchMessage
con la seguente implementazione:
package de.vojay.flitch;public class TwitchMessage {
private final String channel;
private final String user;
private final String message;
public TwitchMessage(String channel, String user, String message) {
this.channel = channel;
this.user = user;
this.message = message;
}
public String getChannel() {
return channel;
}
public String getUser() {
return user;
}
public String getMessage() {
return message;
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer("TwitchMessage{");
sb.append("channel='").append(channel).append('\'');
sb.append(", user='").append(user).append('\'');
sb.append(", message='").append(message).append('\'');
sb.append('}');
return sb.toString();
}
}
Come nota a margine: non è necessario scrivere funzioni di base come toString()
da solo, puoi utilizzare IntelliJ per generarlo per te. Basta fare clic su Codice → Creare… → toString()
per ottenere il risultato sopra.
Ora utilizzeremo Twitch4J per implementare una funzione sorgente Twitch personalizzata per Flink. La funzione sorgente genererà un flusso illimitato di dati, in questo caso i messaggi della chat di Twitch. Ciò significa anche che l'applicazione non verrà terminata finché non la interrompiamo esplicitamente.
Il client Twitch può essere costruito in questo modo:
TwitchClientBuilder clientBuilder = TwitchClientBuilder.builder();
client = clientBuilder
.withEnableChat(true)
.build();client.getChat().joinChannel("vojay");
Con questo esempio otteniamo a client
che si unisce al canale Twitch chiamato Vojay. Sì, una volta anch'io ero uno streamer attivo. Fatto divertente: ho insegnato alle persone lo sviluppo di giochi e lo sviluppo di software in generale nei miei stream. Mi è piaciuto anche giocare ai giochi retrò dal vivo in streaming 🎮. Ma questo è un altro discorso, concentriamoci sul progetto 😉.
Dovresti anche notare che non esiste alcuna autenticazione nell'esempio sopra. Come detto prima, poiché vogliamo solo leggere la chat, non è necessaria alcuna autenticazione. Infatti, ci uniamo semplicemente a una chat IRC in modo anonimo e leggiamo i messaggi.
Poiché vogliamo stabilire la connessione alla chat di Twitch solo una volta per istanza sorgente, dobbiamo estendere l'abstract RichSourceFunction
classe, per poter sovrascrivere il file open
funzione, che consente di aggiungere codice per l'inizializzazione.
public class TwitchSource extends RichSourceFunction<TwitchMessage> {
@Override
public void open(Configuration configuration) {
// ...
}// ...
}
Usiamo anche il nostro TwitchMessage
POJO per il parametro generico per indicare a Flink che questa fonte genera elementi di tipo TwitchMessage
.
Inoltre, vogliamo essere in grado di passare una serie di canali Twitch su cui vogliamo ascoltare nel costruttore della funzione sorgente.
Per controllare lo stato della nostra funzione sorgente, usiamo a boolean
variabile chiamata running
che abbiamo impostato true
nel open
funzione.
Sulla base di ciò, il costruttore e open
la funzione è simile alla seguente:
public class TwitchSource extends RichSourceFunction<TwitchMessage> {private final String() twitchChannels;
private TwitchClient client;
private SimpleEventHandler eventHandler;
private boolean running = true;
public TwitchSource(String() twitchChannels) {
this.twitchChannels = twitchChannels;
}
@Override
public void open(Configuration configuration) {
client = TwitchClientBuilder
.builder()
.withEnableChat(true)
.build();
for(String channel : twitchChannels) {
client.getChat().joinChannel(channel);
}
eventHandler = client
.getEventManager()
.getEventHandler(SimpleEventHandler.class);
running = true;
}
// ...
Con ciò, abbiamo tutto ciò di cui abbiamo bisogno per consumare messaggi ed emetterli per un'ulteriore elaborazione come flusso di dati.
IL run
La funzione di una funzione sorgente è dove avviene la magia. Qui generiamo i dati e con un dato SourceContext
possiamo emettere dati.
IL SimpleEventHandler
fornito da Twitch4J può essere utilizzato per reagire a messaggi specifici.
Ogni volta che riceviamo un evento del tipo IRCMessageEvent
che è un messaggio nella chat di Twitch, generiamo un'istanza del nostro POJO e la inviamo allo stream tramite il contesto.
Per garantire che la nostra funzione sorgente non termini, aggiungeremo un ciclo con un ritardo artificiale, che verrà eseguito fino al nostro boolean
variabile running
è impostato per false
. Ciò verrà fatto nel cancel
funzione, che viene chiamata dall'ambiente Flink allo spegnimento.
@Override
public void run(SourceContext<TwitchMessage> ctx) throws InterruptedException {
eventHandler.onEvent(IRCMessageEvent.class, event -> {
String channel = event.getChannel().getName();
EventUser eventUser = event.getUser();
String user = eventUser == null ? "" : eventUser.getName();
String message = event.getMessage().orElseGet(String::new);ctx.collect(new TwitchMessage(channel, user, message));
});
while(running) {
Thread.sleep(100);
}
}
@Override
public void cancel() {
client.close();
running = false;
}
Mettendo tutto insieme, questa è l'implementazione completa della nostra funzione sorgente Twitch personalizzata per Flink TwitchSource.java
:
package de.vojay.flitch;import com.github.philippheuer.events4j.simple.SimpleEventHandler;
import com.github.twitch4j.TwitchClient;
import com.github.twitch4j.TwitchClientBuilder;
import com.github.twitch4j.chat.events.channel.IRCMessageEvent;
import com.github.twitch4j.common.events.domain.EventUser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
public class TwitchSource extends RichSourceFunction<TwitchMessage> {
private final String() twitchChannels;
private TwitchClient client;
private SimpleEventHandler eventHandler;
private boolean running = true;
public TwitchSource(String() twitchChannels) {
this.twitchChannels = twitchChannels;
}
@Override
public void open(Configuration configuration) {
client = TwitchClientBuilder
.builder()
.withEnableChat(true)
.build();
for(String channel : twitchChannels) {
client.getChat().joinChannel(channel);
}
eventHandler = client
.getEventManager()
.getEventHandler(SimpleEventHandler.class);
running = true;
}
@Override
public void run(SourceContext<TwitchMessage> ctx) throws InterruptedException {
eventHandler.onEvent(IRCMessageEvent.class, event -> {
String channel = event.getChannel().getName();
EventUser eventUser = event.getUser();
String user = eventUser == null ? "" : eventUser.getName();
String message = event.getMessage().orElseGet(String::new);
ctx.collect(new TwitchMessage(channel, user, message));
});
while(running) {
Thread.sleep(100);
}
}
@Override
public void cancel() {
client.close();
running = false;
}
}
Con questa funzione di origine personalizzata, possiamo già estendere la nostra pipeline di streaming App.java
per stampare semplicemente ogni messaggio di chat scritto nella chat:
package de.vojay.flitch;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class App {
public static void main(String() args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(new Configuration());
TwitchSource twitchSource = new TwitchSource(new String(){"vojay"});
env.addSource(twitchSource)
.print();
env.execute("Flitch");
env.close();
}
}
Con addSource
possiamo aggiungere la nostra funzione sorgente. Gli elementi vengono quindi elaborati dal passaggio successivo dello stream, ovvero print()
. Con questo sink, pubblicheremo nuovamente ogni elemento su STDOUT.
Quando si esegue l'applicazione adesso e si scrive nella chat all'indirizzo https://twitch.tv/vojayi messaggi verranno elaborati e stampati dalla nostra applicazione di streaming 🎉.
Ora che possiamo leggere la chat di Twitch come un flusso di dati, è il momento di elaborare ciascun messaggio. L'idea di base è: per ogni messaggio Twitch, rileviamo le singole frasi del messaggio e calcoliamo il sentiment per ciascuna frase. L'output sarà una struttura come questa:
Tuple2<TwitchMessage, Tuple2<List<Integer>, List<String>>>
Scomponiamolo: il risultato contiene il POJO originale del messaggio della chat di Twitch insieme ad un'altra tupla con 2 elementi:
- Una lista di punteggi del sentimento (
List<Integer>
) contenente il punteggio per ogni frase del messaggio, da 0 (molto negativo) a 4 (molto positivo) e - una lista di classi di sentimento (
List<String>
) contenente la classe leggibile per ogni frase del messaggio, ad esempio: Neutro o Negativo.
Fonte: towardsdatascience.com