Introduction
L’omniprésence des objets connectés et des médias sociaux a généré une grande volumétrie de données. En général, le volume des données au sein des entreprises double tous les deux ans. Cela a donc engendré trois grandes problématiques Big data que l’on appelle les 3V : Volume (la quantité de données générées), Variété (les données sont très différentes les unes des autres) et Vitesse (la fréquence à laquelle les données sont générées).
Nombreux sont les outils dits Big Data qui permettent de mettre en valeur et traiter efficacement ces données. Le traitement des données peut être réalisé de trois manières différentes :
Batch : les données disponibles vont être traitées à un instant T.
Micro-Batch : les données disponibles vont être traitées toutes les n secondes.
Real time : les données vont être traitées au fur et à mesure de leur disponibilité.
En particulier, le traitement du Big data en temps réel (real-time) est un défi majeur pour tout type d’entreprise. Il n’est pas difficile de penser à des « uses cases » qui traitent la problématique de l’analyse des données en temps réel. On peut citer entre autres la supervision des trafics, les systèmes de recommandation, le trading sur les marchés financiers, etc.
L’entreprise, pour augmenter l’efficacité et la rapidité du traitement des données, utilise les architectures modernes pour les données en temps réel qui font appel aux principals composants de l’écosystème Hadoop à savoir Spark Streaming.
Concepts Spark streaming
Spark streaming est une extension de l’API Spark qui permet le traitement rapide, scalable et tolérant à la panne des données provenant de sources diverses telles que Twitter, Kafka, HDFS, etc. Ensuite, d’autres extensions de Spark comme Spark GraphX ou Spark ML peuvent être appliquées à ces données et qui peuvent être finalement, après le traitement, envoyées à des systèmes de fichiers, à des Dashboards, ou encore à des bases de données. Mais, comment Spark streaming peut-il être appliqué à des problématiques « real-time data » ?
Concrètement, Spark streaming utilise des micro-batchs. Cela signifie que, Spark streaming reçoit les données et les divise en plusieurs mini batch RDDs qui sont à leur tour traités par « Spark Engine » pour générer le flux des résultats en batch.
Spark streaming utilise « discretized streams (DStreams) », qui est exprimé par une séquence de RDDs et représente un flux continu de données.
Dans ce blog, vous trouverez un exemple de Spark Streaming pour l’analyse en temps réel des tweets provenant de Twitter.
Cas pratique : analyser des tweets sur la paire de devises EUR/USD
Twitter a montré qu’il était un moyen de faire circuler l’information à une vitesse formidable.
Grâce à cette puissance, certains professionnels tirent profit efficacement de ce service pour échanger et exprimer leurs opinions. C’est ainsi le cas de certains journaux d’informations, comme CNN ou BBC, et du président Donald Trump qui est également connu pour faire un usage important de Twitter. Et enfin, pour les traders, consulter Twitter dans le cadre de leur activité fait partie de leur routine, comme regarder le terminal Bloomberg.
Contrairement à Facebook, Twitter permet la récupération des données en streaming gratuitement,tirer profit de cet avantage à l’aide d’un outil de streaming comme « Spark Streaming » s’avère incontournable pour le traitement de ces données en temps réel.
Dans cet exemple, on va lire et analyser des messages de Twitter en temps réel avec « Spark streaming ».
Afin de récupérer les tweets, il faut tout d’abord, créer une nouvelle application (FXP-2 par exemple) dans le site Twitter API . Cela va vous fournir les « keys » dont vous aurez besoin pour utiliser les APIs Twitter.
Ensuite, on crée notre application qui récupère les tweets de « Twitter API » en utilisant « scala » et Spark Streaming. La plupart des programmeurs Spark utilisent des IDEs comme « IntelliJ » ou « Eclipse ». Nous avons utilisé « Scala IDE » pour « Eclipse » afin de profiter du support fourni par « Eclipse » pour les applications en « Scala ».
On commence par la configuration standard des applications « Spark » et Twitter en utilisant les « keys » précédemment crées.
package com.Ingeniance.spark
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import Utilities._
object PrintTweets {
def main(args: Array[String]) {
System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", "accessToken")
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
val ssc = new StreamingContext("local[*]", "PrintTweets", Seconds(1))
Dans cette configuration, nous avons paramétré les informations d’authentification de Twitter en utilisant la bibliothèque Twitter4J. Une fois cette étape terminée, la première chose à faire est de créer une instance de « SteamingContext » afin d’utiliser toutes les fonctionnalités de « Spark streaming ». L’instanciation est faite en utilisant les détails (master et appName) pour la création d’un nouveau « SparkContext » et un objet « Duration » qui indique l’intervalle du temps à laquelle « Spark Streaming » doit diviser le flux des données et créer les « mini batches RDDs ». Nous fixons par exemple la durée à une seconde.
Afin de créer un « Dstream », on utilise la méthode « createStream » de « TwitterUtils ». « TwitterUtils » utilise « Twitter4J » pour la récupération du flux des données.
val filters = Array("eurusd")
val tweets = TwitterUtils.createStream(ssc, None, filters)
A l’aide du paramètre filtre, « Spark Streaming » peut récupérer les messages qui ne contiennent que des mots spécifiques (« eurusd » par exemple).
Ensuite, à l’aide de la fonction prédéfinie « map », on peut extraire le texte des tweets récupérés par « Spark streaming » et l’afficher dans la console d’Eclipse.
val statuses = tweets.map(status => status.getText())
statuses.print()
On peut aussi rajouter un autre filtre sur le « Dstream » afin de ne récupérer que les messages qui contiennent les mots souhaités et qui sont en anglais, en utilisant la fonction « filter ». On enregistre les tweets dans des fichiers que l’on appelle « tweets » sous le format « json ».
val englishTweets = tweets.filter(_.getLang() == "en")
englishTweets.saveAsTextFiles("tweets", "json")
Il faut noter que si on exécute ces lignes de code, « Spark streaming » enregistre uniquement le traitement à suivre lors du lancement du traitement. Pour bien lancer le traitement et obtenir les résultats de « Spark Streaming », il faut rajouter ces deux lignes à votre code :
ssc.start()
ssc.awaitTermination()
Enfin, l’exécution de ce code permet d’afficher un flux des tweets dans la console d’Eclipse :
Les tweets dans la console n’étaient pas filtrés par langue. On a donc obtenu un tweet en arabe qui s’articule autour de la devise EURUSD.
Par contre, la plupart des tweets sont vides étant donné que les tweets autour de EURUSD ne sont pas publiés chaque seconde. En effet, on se retrouve avec un grand nombre de fichiers vides qui nous empêchent de repérer les fichiers informatifs contenant les tweets qui nous intéressent.
Pour résoudre ce problème, on rajoute les lignes de code ci-dessous :
englishTweets.foreachRDD{ (x, time) =>
if(!x.isEmpty){
x.saveAsTextFile("results/tweets/file" + time )
}
}
Si un batch ne contient pas de données, Spark génère des « RDDs » que l’on appelle « emptyRDD ». Il suffit, donc, de vérifier si les RDDs, que chaque « Dstream » contient, sont des « emptyRDD » ou pas.
Enfin, on obtient l’exemple d’un tweet au format JSON. Ce fichier JSON contient toutes les informations concernant le tweet : l’auteur, la date, le texte, les « hashtags », etc. On ne garde que les intéressants.
Ce tweet a été publié le 24/06/2018 à 20h30 par « DailyForex », « DailyForex » est un magasin unique pour tous les traders, il fournit des analyses de brokers Forex, des analyses techniques quotidiennes et des signaux gratuitement pour les traders Forex internationaux. Son compte twitter a été créé en 2009 et compte actuellement 39195 abonnés.
Voici l’exemple d’un de leurs tweets :
« DailyForex », dans ce tweet, exprime son opinion sur la tendance de la paire de devises « EURUSD » qui va rester autour du niveau 1.15.
Conclusion
Pour conclure, Spark Streaming permet de répondre à plusieurs problématiques Big data en temps réel. Cela est dû au fait que Spark Streaming supporte des sources variées de données à savoir Kafka, HDFS, Twitter etc. Il permet aux développeurs d’utiliser un seul Framework pour des besoins diversifiés. Dans cet article, nous avons utilisé l’API de « Spark streaming » pour la récupération des tweets de Twitter.
Même si Apache Storm est très connu pour le traitement des données en temps réel, Spark streaming reste une solution abordable et moins complexe pour le traitement des flux des données, et il s’intègre bien avec l’écosystème Hadoop.
2 commentaires
Mourad · 1 avril 2020 à 16 h 33 min
Lorsque j’implémente ce code là il s’exécute mais il ne donne pas aucun résultat
M2 · 31 mars 2020 à 17 h 51 min
Merci pour le code, mais ça n’a pas donné des résultats pour moi, même si je change le mot de filtrage…