Introduction
Dans certains projets big data, j’ai pu observer que plusieurs Data Ingénieurs utilisent des langages tels que python
, java
, scala
ou même spark
pour réaliser la phase de préparation de données
(et évite pour l’occasion l’usage des outils ETL/ELT qui peuvent paraître lourds dans certains cas).
Le cas d’usage type peut être le suivant :
- Récupération des données depuis des fichiers csv ou depuis une base de données
- Opérer quelques transformations sur les données récupérées et sauvegarde des résultats dans HDFS
- Utiliser Cron pour orchestrer des Jobs Spark.
Cette approche, même si elle apporte une solution immédiate, a son lot d’inconvénients. En voici quelques-uns :
- Gestion de l’échec d’un Job: Pas de moyen possible pour relancer un Job en cas d’échec (quand le relancer, combien de fois faut-il le relancer avant d’abandonner)
- Gestion de la succession des Job : Comment ordonner que le Job 3 ne peut démarrer que lorsque le Job 2 soit terminé (sans utiliser l’horodatage qui est très limitatif).
- Evolutivité : Il n’y a pas de planificateur centralisé entre différentes machines cron.
- Déploiement : Apporter constamment de nouvelles modifications.
Apache Airflow
C’est une boite à outils open source permettant d’écrire, d’orchestrer (scheduler) et de monitorer des workflows (ou pipelines de données).
- Écrit en
Python
. - Workflows définis en
Python
. - UI présentant une vue des jobs actuels et passés; les logs des jobs.
- Extensible via des plugins.
- Développement du projet actif.
- Propose une belle UI et une interface REST
Les cas d’usage d’Airflow
Airflow peut traiter une pléthore de cas d’utilisations différents, mais il convient particulièrement à tous les ETL/ELT qu’on peut imaginer. Chaque étape d’un pipeline étant exprimée en code, il est facile d’adapter ces pipelines à nos besoins. Qu’il s’agisse d’un ping sur des points de terminaison d’API spécifiques ou de transformations personnalisées qui nettoient les données en fonction de nos spécifications personnalisées, on peut réellement adapter les éléments à nos cas d’utilisation.
Voici quelques cas d’usage intéressants avec Airflow:
- Regrouper les mises à jour quotidiennes des équipes de vente depuis Salesforce (ou un autre CRM) pour envoyer un rapport quotidien aux dirigeants de l’entreprise.
- Utiliser Airflow pour organiser et lancer des tâches de Machine Learning s’exécutant sur un cluster Spark externe.
- Charger et analyser, heure par heure, les données d’analyse de site Web / d’applications dans un entrepôt de données.
Vous voulez en savoir plus sur les cas d’usage d’Airflow ? C’est par ici: best-practices-lessons
Workflow
- suite de tâches (successives ou en parallèles) formant un graphe orienté acyclique
- déclenchées par des événements ou à via un horodatage
- fréquemment utilisées pour manipuler de gros pipelines de traitement de données
Exemple de workflow
- Job de récupération des données de la source
- Envoyer les données récupérées au processus de traitement
- Néttoyer, croiser et mettre en forme les données
- Générer les modèles de données (appliquer des algorithmes de ML)
- Envoyer le rapport par email
Airflow vs Cron
Un cron ne peut ordonnancer que des tâches successives en fonction du temps. En effet, supposons qu’une tâche B ait besoin de données créées par une tâche A
. B
a donc besoin de s’exécuter après A
.
En cron, le seul moyen d’y parvenir est de laisser un temps raisonnable entre le lancement des deux tâches pour que celles-ci s’exécutent dans le bon ordre. L’inconvénient de ce mécanisme d’horodatage fixe, est que si la tâche A
échoue pour une raison quelconque, la tâche B
s’exécutera avec potentiellement des données corrompues.
C’est là qu’on trouve tout l’avantage d’Airflow, qui est capable de dire que B
ne démarre que lorsque A
se termine.
On parle alors de workflow ou encore de pipeline.
Note: Dans les projets Data, Airflow est très utilisé dans les phases ELT.
Les principaux composants Airflow sont :
- Web server:
Ce composant permet d’avoir une interface web (UI) dans laquelle on peut:
* Monitorer nos data pipelines,
* Vérifier que les jobs se terminent correctement,
* Ajouter de nouvelle connexion vers des systèmes externes. - Scheduler: Permet de planifier les tâches des pipelines en fonction de leurs dépendances.
- Executor: Lié étroitement au scheduler, l’Executor détermine le worker (processus) qui exécute chaque tâche à orchestrer.
- Worker: Processus qui exécute les tasks que l’executor lui indique.
- Metastore: Une base de données qui stocke toutes les données liées à nos Jobs, DAGs et l’interface Admin.
Concepts de base
DAG (Pipeline)
- Airflow est principalement basé sur le concept de DAG.
- Un DAG est un graphe orienté acyclique où les nœuds sont les Tasks (tâches).
- Les tâches s’exécuteront donc dans un ordre précis, en parallèle ou à la suite sans risque de boucle infinie.
- Cette succussion de tâches forme un pipeline.
- Un DAG est configuré à l’aide du code Python.
- Backfill: Possibilité de rejouer un DAG à partir d’un point donné dans le passé.
DagInstance
- Un DAG est exécuté de manière régulière ou via un trigger.
- Un DAG exécuté ou en cours d’exécution est appelé une DagInstance.
TaskInstance
Chaque tâche d’un DAG en cours d’exécution s’appelle TaskInstance.
Task
- C’est l’instance d’un operator.
- La création d’une tâche passe par un opérator.
- Un operator est tout simplement une classe Python héritant de BaseOperator.
- Lorsque la tâche est appelée, la fonction execute() de l’operator est exécutée.
Operators
C’est un concept permettant de typer/catégoriser les tâches. Il existe plusieurs operators prédéfinis :
- BashOperator : exécute une commande bash,
- PythonOperator : exécute une fonction Python,
- EmailOperator : envoie un mail,
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. : pour exécuter des commandes SQL
- DockerOperator : exécute une commande dans un container Docker,
- HttpOperator : effectue une requête sur un endpoint HTTP, etc (hive, s3, redshift, hive …)
- Sensor : attend pendant un certain temps, l’arrivé d’un fichier, ligne de base de données, clé S3, etc.
Vous pouvez consulter la liste des operators disponibles via le lien ci-dessous:
airflow-operators
Executors
Airflow dispose de scheduler pour lancer ces tâches.
Lorsqu’une tâche est appelée, elle va s’exécuter à un endroit précis. Le mécanisme d’appel des tâches est géré par Airflow via les executors.
SequentialExecutor (mode par défaut):
Les tâches sont lancées sur le serveur local en série (là où Airflow lui-même est lancé) sans aucun parallélisme.
Le mécanisme de parallélisme est utile en développement, mais les limites de ce dernier sont atteintes très rapidement lors du passage à l’échelle.
LocalExecutor
Plus performant que le précédent, le Local Executor permet le lancement de plusieurs tâches en parallèle jusqu’à ce que les ressources de la machine locale soient utilisées.
On parle ici d’un scaling verticale (car un seul worker : la machine locale)
CeleryExecutor
Celery : C’est une file d’attente (Queue) de tâches asynchrones basées sur le transfert de message distribué. Il est axé sur un fonctionnement en temps réel, mais prend également en charge la planification.
Le scaling (horizontale) des tâches pourra réellement se faire puisqu’Airflow ira déléguer à Celery la distribution des tâches sur un pool de workers.
A noter que Celery nécessite l’utilisation d’un broker pour l’échange des messages, ainsi qu’une synchronisation du code sur l’ensemble des workers (réalisable avec l’utilisation de Redis et Kubernetes par exemple).
Architecture Airflow en mode distribué
Si un DAG spécifique doit être déclenché, le scheduler
crée une nouvelle instance DagRun dans le dépot Aiflow metastore
et commence à déclencher les tâches individuelles dans le DAG. Le scheduler
envoie des messages dans le Queuing service
. Un message contient des informations sur la tâche à exécuter (DAG_id, task_id ..) et la fonction à exécuter.
Dans certains cas, l’utilisateur interagira avec le Webserver
. Il peut manuellement déclencher l’exécution d’un DAG. Un DAGRun est créé et le scheduler
commencera à déclencher des tâches individuelles de la même manière que décrit précédemment.
Les worker Airflow n’ont pas besoin de s’enregistrer et n’ont pas besoin de se connaître. Chaque démon s’occupe de l’exécution d’une tâche. Le scheduler
examine périodiquement si des DAG enregistrés doivent être exécutés.
Dans une architecture à plusieurs noeuds, les démons sont répartis sur différentes machines. Dans l’architecture ci-dessus, le serveur Web et le planificateur sont colocalisés. Pour se faire, Airflow doit être configuré en mode Celery Executor
.
Installation Airflow
L’installation se tient en deux lignes
pip install apache-airflow # Installation du package apache-airflow
airflow initdb # Initialisation de la base de données
cf. Install airflow
Ecriture d’un DAG airflow
from datetime import datetime
import dataprep
import model_reco
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG('spark_by_bashoperator', description='ALS DAG',
schedule_interval='@hourly',
start_date=datetime(2019, 10, 25), catchup=False)
spark_home = '/usr/local/spark/bin/'
srcDir = '/tools/airflow/dags/'
with dag:
task_spark_datatransform = BashOperator(task_id='task_spark_datatransform',
bash_command = spark_home +'spark-submit --master local[*] ' + srcDir + 'dataprep.py')
task_spark_model_reco = BashOperator(task_id='task_spark_model_reco',
bash_command = spark_home +'spark-submit --master local[*] ' + srcDir + 'model_reco.py')
task_spark_datatransform >> task_spark_model_reco
- ligne1 à ligne5: Import des modules python nécessaires
- ligne7 à ligne9 : Définir les arguments du Dag.
- Nom du Dag : spark_by_bashoperator
- Description
- schedule_interval: Cycle d’exécution du DAG. Pour définir la valeur on peut utiliser un pattern qu’on utilise habituellement dans les crons. La valeur ‘@hourly’ que le DAG sera exécuté chaque heure.
- start_date: La date à laquelle Airflow exécutera le DAG pour la première fois.
- catchup : En définissant
catchup = False
, peu importe sistart_date
appartient au passé ou non, le DAG sera exécuté à partir de l’heure actuelle. Si on définit unend_date
, cela implique que Airflow doit arrêter l’exécution du DAG à cette date.
- ligne14 à ligne18: Instancier un deux ayant comportant deux tâches
- task_spark_datatransform : Exécute une méthode de transformation et préparation de données
- task_spark_model_reco: Exécute un script spark permettant de lire les données issues de la tâche précédente pour générer un modèle de recommandation
- ligne20 : détermine l’ordre d’exécution des deux tâches
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer
def data_transform():
spark = SparkSession. \
builder. \
appName('Reco-DataPrep'). \
getOrCreate()
# import data
data_dir = '/home/test2Spark/'
path_datafile = data_dir + "very_small.txt.gz"
triplet = spark.sparkContext.textFile(path_datafile).map(
x: x.split('\t')).toDF([user_id, song_id, play_count])
# create new indexed columns
triplet = triplet.withColumn("play_count", triplet["play_count"].cast(IntegerType()))
five_uplet = StringIndexer(inputCol="user_id", outputCol="user_id_int").fit(triplet).transform(triplet)
five_uplet = StringIndexer(inputCol="song_id", outputCol="song_id_int").fit(five_uplet).transform(five_uplet)
triplet_indexed = five_uplet.select("user_id_int", "song_id_int", "play_count")
# transform to parquet files
five_uplet.write.parquet(data_dir + "five_uplet.parquet")
triplet_indexed.write.parquet(data_dir + "triplet.parquet")
if __name__ == "__main__":
data_transform()
A l’ouverture de l’UI airflow, il suffit de cliquer sur le nom associé au DAG, puis sur l’onglet Graph View
pour vérifier que le DAG est bien lancé :
La visualisation des logs d’une tâche se fait via le bouton view log
(bouton le plus à droite) :
Conclusion
Dans cet article, j’ai commencé par expliquer les concepts fondamentaux ainsi qu’une architecture de déploiement type de Airflow. Dans un second temps, j’ai pu mettre en avant avec un cas concret la création d’un Dag contenant deux tâches Spark. Si vous souhaitez approfondir le sujet, j’ai indiqué dans la partie références, quelques liens permettant aux lecteurs d’aller plus loin dans l’exploration de cette technologie.
1 commentaire
Stéphane RAULT · 8 juin 2021 à 14 h 12 min
Merci pour cet article. Pourriez-vous ajouter le fichier manquant: model_reco.py ?