← Retour aux projets
Projet E5

Pipeline ETL avec Apache Airflow & Docker

Pipeline ETL automatisé synchronisant des bases hétérogènes via Airflow et Docker.

PythonApache AirflowDockerSQL ServerMariaDB

Conception et mise en place d'une pipeline ETL automatisé pour la synchronisation de données entre plusieurs bases de données hétérogènes (SQL Server, MariaDB) au sein d'un environnement conteneurisé et orchestré par Apache Airflow. Ce projet répond à un besoin critique de consolidation des données métier pour l'analyse et l'intégrité même des données.


Contexte et problématique métier

L'entreprise disposait de plusieurs sources de données isolées créant des silos d'information :

  • Base GX — ERP sur SQL Server contenant les données commerciales (affaires, devis, commandes, factures, articles, clients...)
  • Base GSM — Application de gestion de stock sur MariaDB (produits, emplacements, mouvements, bons de réception...)

Problèmes identifiés :

  • Impossibilité de croiser les données commerciales et logistiques
  • Rapports manuels chronophages et source d'erreurs
  • Données non synchronisées entre les systèmes

Objectifs du projet :

  • Centraliser les données de 2 systèmes sources vers une base transformée et unifiée (SYM_BD_TEST)
  • Automatiser les flux ETL avec une exécution planifiée toutes les 3 heures via Airflow
  • Garantir l'intégrité référentielle via un système de synchronisation intelligent (hashing MD5)
  • Assurer une traçabilité complète des opérations et des alertes en temps réel
  • Permettre une scalabilité horizontale pour l'ajout de nouvelles sources

Architecture technique globale

L'infrastructure repose sur une architecture Docker multi-conteneurs orchestrée via Docker Compose, avec séparation des responsabilitées :

Services Docker Compose

Le fichier docker-compose.yml définit une architecture simplifiée de 4 services basée sur l'exécution des scripts Python via PythonVirtualenvOperator, sans la complexité d'un Docker-in-Docker :

1. Service db — Base de données PostgreSQL

Stocke les métadonnées Airflow (DAG runs, task instances, logs, connexions, variables). Port 5432, healthcheck via pg_isready.

db:
  image: postgres:15
  container_name: etl-postgres
  restart: unless-stopped
  networks: [etl-network]
  environment:
    POSTGRES_USER: ${DB_USER:-etl_user}
    POSTGRES_PASSWORD: ${DB_PASSWORD:-etl_password}
    POSTGRES_DB: ${DB_NAME:-symetrie_test}
    TZ: ${TIMEZONE:-Europe/Paris}
  volumes:
    - db_data:/var/lib/postgresql/data
  ports:
    - "${DB_PORT:-5432}:5432"
  healthcheck:
    test: ["CMD-SHELL", "pg_isready -U $$POSTGRES_USER"]
    interval: 10s
    timeout: 5s
    retries: 5
  • Variables d'environnement : DB_USER, DB_PASSWORD, DB_NAME définies via .env avec valeurs par défaut
  • Volume persistant : db_data:/var/lib/postgresql/data pour garantir la durabilité des données entre redémarrages
  • Restart policy : unless-stopped — redémarrage automatique sauf arrêt manuel
2. Service airflow-init — Initialisation Airflow

Image custom etl-airflow:custom. Initialise la base de métadonnées et crée l'utilisateur admin (exécuté une seule fois au premier démarrage, dépend de db healthy).

airflow-init:
  image: etl-airflow:custom
  container_name: etl-airflow-init
  networks: [etl-network]
  depends_on:
    db: { condition: service_healthy }
  env_file: .env
  environment:
    _AIRFLOW_DB_UPGRADE: "true"
    _AIRFLOW_WWW_USER_CREATE: "true"
    _AIRFLOW_WWW_USER_USERNAME: ${AIRFLOW_USERNAME:-admin}
    _AIRFLOW_WWW_USER_PASSWORD: ${AIRFLOW_PASSWORD:-admin}
    _AIRFLOW_WWW_USER_FIRSTNAME: ${AIRFLOW_FIRSTNAME:-ETL}
    _AIRFLOW_WWW_USER_LASTNAME: ${AIRFLOW_LASTNAME:-Admin}
    _AIRFLOW_WWW_USER_ROLE: ${AIRFLOW_ROLE:-Admin}
    _AIRFLOW_WWW_USER_EMAIL: ${AIRFLOW_EMAIL:-etl@admin.local}
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${DB_USER}:${DB_PASSWORD}@db:5432/${DB_NAME}
  volumes:
    - airflow_logs:/opt/airflow/logs
  entrypoint: /bin/bash
  command:
    - -c
    - |
      mkdir -p /opt/airflow/dags /opt/airflow/logs /opt/airflow/plugins
      airflow db init
      airflow users create \
        --username "$$_AIRFLOW_WWW_USER_USERNAME" \
        --password "$$_AIRFLOW_WWW_USER_PASSWORD" \
        --firstname "$$_AIRFLOW_WWW_USER_FIRSTNAME" \
        --lastname "$$_AIRFLOW_WWW_USER_LASTNAME" \
        --role "$$_AIRFLOW_WWW_USER_ROLE" \
        --email "$$_AIRFLOW_WWW_USER_EMAIL"
3. Service airflow-webserver — Interface Web

Interface de monitoring et de gestion des DAGs sur le port 8080 (image etl-airflow:custom, healthcheck sur /health).

airflow-webserver:
  build:
    context: .
    dockerfile: Dockerfile
  image: etl-airflow:custom
  container_name: etl-airflow-webserver
  restart: unless-stopped
  networks: [etl-network]
  depends_on:
    db: { condition: service_healthy }
    airflow-init: { condition: service_completed_successfully }
  env_file: .env
  environment:
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "false"
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW_FERNET_KEY}
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${DB_USER}:${DB_PASSWORD}@db:5432/${DB_NAME}
    AIRFLOW__CORE__DEFAULT_TIMEZONE: ${TIMEZONE:-Europe/Paris}
    AIRFLOW_CONN_DB_DEFAULT: postgresql+psycopg2://${DB_USER}:${DB_PASSWORD}@db:5432/${DB_NAME}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./config:/opt/airflow/config
    - airflow_logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./data:/opt/airflow/data
    # Montage du code source et requirements pour PythonVirtualenvOperator
    - ./src:/opt/airflow/src
    - ./requirements:/opt/airflow/requirements
  ports: ["${AIRFLOW_PORT:-8080}:8080"]
  command: webserver
  healthcheck:
    test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
    interval: 10s
    timeout: 10s
    retries: 5

L'UI Airflow donne accès à la vue DAGs (statut, planification), au Graph View des dépendances, à la Gantt Chart, aux logs en temps réel, au déclenchement manuel et à la gestion des connexions aux bases.

4. Service airflow-scheduler — Ordonnanceur

Orchestration et planification des tâches ETL (image etl-airflow:custom partagée avec le webserver).

airflow-scheduler:
  image: etl-airflow:custom
  container_name: etl-airflow-scheduler
  restart: unless-stopped
  networks: [etl-network]
  depends_on:
    db: { condition: service_healthy }
    airflow-init: { condition: service_completed_successfully }
  env_file: .env
  environment:
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "false"
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW_FERNET_KEY}
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${DB_USER}:${DB_PASSWORD}@db:5432/${DB_NAME}
    AIRFLOW__CORE__DEFAULT_TIMEZONE: ${TIMEZONE:-Europe/Paris}
    AIRFLOW_CONN_DB_DEFAULT: postgresql+psycopg2://${DB_USER}:${DB_PASSWORD}@db:5432/${DB_NAME}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./config:/opt/airflow/config
    - airflow_logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./data:/opt/airflow/data
    - ./src:/opt/airflow/src
    - ./requirements:/opt/airflow/requirements
  command: scheduler
  healthcheck:
    test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
    interval: 10s
    timeout: 10s
    retries: 5

Rôles du Scheduler : parsing des DAGs toutes les 30s, planification selon le schedule_interval, exécution via PythonVirtualenvOperator (environnement isolé) et retry automatique des tâches échouées.

Image Docker personnalisée etl-airflow:custom

L'image etl-airflow:custom est construite à partir d'un Dockerfile personnalisé qui inclut toutes les dépendances nécessaires pour l'exécution des ETL, notamment les drivers ODBC pour SQL Server :

FROM apache/airflow:2.8.1-python3.10

USER root

# 1. Installation des outils de base
RUN apt-get update && apt-get install -y \
    gcc g++ git curl gnupg2 apt-utils apt-transport-https \
    --no-install-recommends && \
    rm -rf /var/lib/apt/lists/*

# 2. NETTOYAGE DES CONFLITS (Crucial pour Debian 12 / Airflow 2.8)
# On supprime les librairies ODBC par défaut de Debian 12 qui bloquent l'installation des drivers Microsoft
RUN apt-get update && apt-get remove -y \
    libodbc2 \
    libodbcinst2 \
    unixodbc-common \
    && apt-get clean

# 3. Installation des drivers ODBC (SQL Server)
# On télécharge la clé dans un fichier temporaire pour éviter l'erreur de pipe
RUN curl -fsSL https://packages.microsoft.com/keys/microsoft.asc -o microsoft.asc \
    && gpg --batch --yes --dearmor -o /usr/share/keyrings/microsoft-prod.gpg microsoft.asc \
    && rm microsoft.asc \
    && echo "deb [arch=amd64,arm64,armhf signed-by=/usr/share/keyrings/microsoft-prod.gpg] https://packages.microsoft.com/debian/11/prod bullseye main" > /etc/apt/sources.list.d/mssql-release.list \
    && apt-get update \
    && ACCEPT_EULA=Y apt-get install -y msodbcsql17 unixodbc unixodbc-dev \
    && rm -rf /var/lib/apt/lists/*

# 4. Préparation des dossiers
USER airflow
RUN mkdir -p /opt/airflow/src/Logs/GX /opt/airflow/src/Logs/GSM

# 5. Installation des dépendances Airflow
COPY requirements/airflow.txt /tmp/airflow.txt
RUN pip install --no-cache-dir -r /tmp/airflow.txt
# On installe virtualenv pour permettre l'utilisation du PythonVirtualenvOperator
RUN pip install --no-cache-dir apache-airflow-providers-docker psycopg2-binary virtualenv

Points clés du Dockerfile :

  • Gestion des conflits ODBC — Suppression des librairies Debian 12 qui bloquent les drivers Microsoft
  • Driver ODBC 17 — Installation sécurisée via clé GPG pour SQL Server
  • Virtualenv — Support du PythonVirtualenvOperator pour isoler les dépendances ETL
  • Structure des logs — Dossiers pré-créés pour GX et GSM

Avantages : isolation des dépendances par DAG (pas de Docker-in-Docker), debugging facilité, logs centralisés dans Airflow et possibilité de définir des dépendances différentes par DAG.

Flux de données et réseau
┌──────────────────────────────────────────────────────────────────────────┐
│                         RÉSEAU: etl-network                              │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   ┌──────────────┐      ┌──────────────┐      ┌──────────────┐           │
│   │  SQL Server  │      │   MariaDB    │      │   MariaDB    │           │
│   │(GX/SYMETRIE) │      │    (GSM)     │      │ (SYM_BD_TEST)│           │
│   │   :1433      │      │    :3306     │      │    :3306     │           │
│   └──────┬───────┘      └──────┬───────┘      └──────▲───────┘           │
│          │                     │                     │                   │
│          │    EXTRACT          │    EXTRACT          │   LOAD            │
│          └──────────┬──────────┘                     │                   │
│                     │                                │                   │
│                     ▼                                │                   │
│          ┌───────────────────────────────────────────┴────────┐          │
│          │              AIRFLOW SCHEDULER                     │          │
│          │         ┌──────────────────────────┐               │          │
│          │         │  PythonVirtualenvOperator│               │          │
│          │         │  ┌────────────────────┐  │               │          │
│          │         │  │  ETL_Modulaire     │  │               │          │
│          │         │  │  - Python 3.10     │  │               │          │
│          │         │  │  - ODBC Driver 17  │  │               │          │
│          │         │  │  - Pandas/SQLAlch  │  │               │          │
│          │         │  └────────────────────┘  │               │          │
│          │         └──────────────────────────┘               │          │
│          └────────────────────────────────────────────────────┘          │
│                              │                                           │
│                              │                                           │
│          ┌───────────────────┴───────────────────┐                       │
│          │          AIRFLOW WEBSERVER            │                       │
│          │             port 8080                 │                       │
│          │  - Monitoring DAGs                    │                       │
│          │  - Logs temps réel                    │                       │
│          │  - Déclenchement manuel               │                       │
│          └───────────────────┬───────────────────┘                       │
│                              │                                           │
│                              ▼                                           │
│                    ┌──────────────────┐                                  │
│                    │    PostgreSQL    │                                  │
│                    │   (Métadonnées)  │                                  │
│                    │      :5432       │                                  │
│                    └──────────────────┘                                  │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘
Volumes et montages
  • ./dags/opt/airflow/dags — Fichiers DAG Python
  • ./src/opt/airflow/src — Module ETL_Modulaire et scripts principaux
  • ./config/opt/airflow/config — Fichiers de configuration
  • ./data/opt/airflow/data — Données intermédiaires
  • ./requirements/opt/airflow/requirements — Fichiers requirements pour virtualenv
  • ./plugins/opt/airflow/plugins — Plugins Airflow personnalisés
  • ./src/Logs/GX et ./src/Logs/GSM — Logs horodatés par source
  • db_data (volume nommé) — Persistance PostgreSQL
  • airflow_logs (volume nommé) — Logs Airflow

DAGs Airflow — Détail complet

Structure d'un DAG avec PythonVirtualenvOperator

Les DAGs utilisent @task.virtualenv pour créer un environnement Python isolé avec toutes les dépendances nécessaires :

with DAG(
    dag_id='ETL_GX_DAG',
    schedule_interval='0 */3 * * *',
    catchup=False,
    on_failure_callback=notify_failure_to_teams,
    tags=['gx', 'etl', 'teams'],
) as dag:

    @task.virtualenv(
        task_id='run_etl_gx_script',
        requirements=["pandas", "SQLAlchemy", "pyodbc",
                      "PyMySQL", "python-dotenv", ...],  # versions épinglées
        system_site_packages=False,
    )
    def run_etl_wrapper():
        import subprocess, sys
        subprocess.run([sys.executable, "/opt/airflow/src/ETL_GX_main.py"],
                       check=True)

    run_etl_wrapper() >> PythonOperator(
        task_id='notify_success_to_teams',
        python_callable=notify_success_to_teams,
    )
ETL_GX_DAG — Données commerciales (SQL Server)

Configuration :

  • Schedule : 0 */3 * * * (toutes les 3 heures)
  • Retry : 1 tentative avec délai de 5 secondes
  • Catchup : désactivé
  • Tags : ['gx', 'etl', 'teams']

Tables extraites avec requêtes personnalisées (extrait — 27 tables au total) :

Table sourceRequête personnaliséeClé primaire
ActiviteSELECT N_Activites, Code_Secteur, Secteur, ... FROM ActiviteN_Activites
AffaireSELECT N_Affaire, Designation, N_Client, N_ITC, ... FROM AffaireN_Affaire
AFFAIRE_VUE_LISTE_HEURESSELECT N_Heure, N_ITC, Personne, Nombre AS nombre_heures_pointe, ... FROM AFFAIRE_VUE_LISTE_HEURESN_Heure
Cde_cli / CDE_FOURSELECT N_Cde_*, Nom_Cde, N_Affaire, ...N_Cde_Cli / N_Cde_Four
CLIENT / FOURNISSSELECT N_Client/N_Fournisseur, Nom, Pays, ...N_Client / N_Fournisseur
DEVIS / Fact_cli / Fac_fourSELECT N_*, Nom_*, N_Client/N_Affaire, ...N_Devis / N_Fact_Cli / N_Fac_Four
MVTS_STOCKSELECT N_Mvts_Stock, N_Produit, N_Document, Origine, ... FROM MVTS_STOCKN_Mvts_Stock
USERS / Itc / ServiceSELECT identifiants, libellés, ...N_User / N_Itc / N_Service

Autres tables : DEPOT, DEVISE, EGXS_lsf_Demande_Conge, ENTREE_SORTIE, ENTREE_SORTIE_DETAIL, Famille_Client, FAMILLE_AFFAIRE, Frais, Rfac_cli, SCD_FOUR, Scde_cli, SS_BR, Treso.

ETL_GSM_DAG — Gestion de stock (MariaDB)

Configuration :

  • Schedule : 0 */3 * * * (toutes les 3 heures)
  • Retry : 1 tentative avec délai de 5 secondes
  • Catchup : désactivé
  • Tags : ['gsm', 'etl', 'teams']

Tables extraites avec requêtes personnalisées :

Table sourceRequête personnaliséeClé primaire
t_affairesSELECT ID, Societe, Sigle FROM t_affairesId
t_brSELECT ID, noBr, affaire, transfert, ... FROM t_brId
t_emplacementsSELECT *, CONCAT(...) AS Emplacement_Concat FROM t_emplacementsId
t_journalSELECT ID, Dates, Type, Commentaire, ... FROM t_journalId
t_manquants_prepa_affaireSELECT ID, composition, ref3, designation, qte FROM t_manquants_prepa_affaireId
t_placesSELECT ID, jour AS Date, reference, ... FROM t_placesId
t_produitsSELECT Id, Designation, Famille, Reference, ... FROM t_produitsId
t_sommesSELECT Id, reference, quantite, prix, affaire FROM t_sommesId
Workflow d'exécution d'un DAG
1. SCHEDULER détecte l'heure d'exécution (cron 0 */3 * * *)
        │
        ▼
2. CRÉATION VIRTUALENV (@task.virtualenv)
   ├─ Création environnement Python isolé
   ├─ Installation des dépendances (pandas, sqlalchemy, pyodbc...)
   ├─ Configuration du PYTHONPATH
   └─ Exécution subprocess.run() avec le script ETL
        │
        ▼
3. SCRIPT ETL PRINCIPAL (ETL_GX_main.py / ETL_GSM_main.py)
   ├─ Initialisation du logging (fichier horodaté)
   ├─ Chargement de la configuration (config.py)
   ├─ Établissement des connexions source et cible
   └─ Boucle sur chaque table configurée:
        │
        ├─ EXTRACT: extract_table_data()
        │   └─ Exécution de la requête SQL (personnalisée ou SELECT *)
        │
        ├─ TRANSFORM: transform_dataframe()
        │   ├─ convert_datetime_to_date()
        │   ├─ convert_numeric()
        │   ├─ rename_column_title()
        │   ├─ split_affaire_column() (si applicable)
        │   └─ dropna(axis=0, how='all')
        │
        └─ LOAD: load_table()
            ├─ Création staging table temporaire (stg_{table}_{uuid})
            ├─ Insertion des données transformées
            ├─ Calcul des hash MD5 pour comparaison
            ├─ Exécution INSERT/UPDATE/DELETE
            ├─ Logging des métriques (BEFORE/AFTER upsert)
            └─ Suppression staging table
        │
        ▼
4. FIN DU VIRTUALENV (nettoyage automatique)
        │
        ▼
5. NOTIFICATION TEAMS
   ├─ Succès → notify_success_to_teams() → MessageCard verte
   └─ Échec → on_failure_callback → MessageCard rouge avec détails

Module ETL Python — Détail des fonctions

Structure du module ETL_Modulaire
/opt/airflow/src/
├── ETL_Modulaire/
│   ├── __init__.py
│   ├── connexion.py          # Gestionnaire de connexions multi-bases
│   ├── config.py             # Configuration centralisée (tables, requêtes, PK)
│   └── data_manipulation.py  # Fonctions ETL (extract, transform, load)
├── ETL_GX_main.py            # Point d'entrée DAG GX
├── ETL_GSM_main.py           # Point d'entrée DAG GSM
└── Logs/
    ├── GX/                   # Logs ETL GX par jour
    └── GSM/                  # Logs ETL GSM par jour
Script principal ETL (ETL_GX_main.py / ETL_GSM_main.py)

Le script charge la configuration via get_config("GX") (ou"GSM") qui fournit les requêtes personnalisées, la liste des tables, les correspondances source→cible et les clés primaires. Il ouvre les connexions source et cible, puis itère sur chaque table avec un pipeline extract → transform → load :

with source.connect() as cnx_src, target.connect() as cnx_tgt:
    for table in tables:
        df = extract_table_data(table, cnx_src, custom_queries)
        df = transform_dataframe(df, table)
        load_table(df, cnx_tgt, correspondance[table], pk_map[...])

Les erreurs sont capturées par un try/except global qui log la stack trace et termine le script avec sys.exit(1) pour qu'Airflow déclenche le callback Teams. Logging configuré par jour dans /opt/airflow/src/Logs/GX/.

connexion.py — Gestion des connexions

Trois fabriques de moteurs SQLAlchemy exposées par le module : Connexion_GX() (SQL Server via ODBC Driver 17), Connexion_GSM() et Connexion_SYM_BD_TEST() (MariaDB via PyMySQL). Identifiants chargés depuis .env avec python-dotenv :

def Connexion_GX():
    engine = create_engine(
        f"mssql+pyodbc://{user}:{pwd}@{host}:{port}/{db}"
        "?driver=ODBC+Driver+17+for+SQL+Server"
    )
    with engine.connect() as cnx:
        cnx.execute(text("SELECT 1"))  # validation
    return engine

Les deux connexions MariaDB suivent le même pattern avec le driver mysql+pymysql. Chaque fonction valide la connexion avec un SELECT 1 avant de retourner l'engine.

data_manipulation.py — Fonctions de transformation

1. convert_numeric() — Conversion intelligente des types numériques :

def convert_numeric(dataframe: pd.DataFrame) -> pd.DataFrame:
    # Convertit toutes les colonnes numériques du DataFrame en types adaptés (Int64 pour les entiers, float64 sinon)
    for column in dataframe.select_dtypes(include=[np.number]).columns:
        # Si toutes les valeurs sont entières → Int64 (nullable)
        if (dataframe[column].dropna() % 1 == 0).all():
            dataframe[column] = dataframe[column].astype('Int64')
        else:
            dataframe[column] = dataframe[column].astype('float64')
    return dataframe

2. convert_datetime_to_date() — Conversion des datetime en dates :

def convert_datetime_to_date(dataframe: pd.DataFrame) -> pd.DataFrame:
    # Convertit toutes les colonnes de type datetime du DataFrame en simples dates (sans l'heure)
    for column in dataframe.columns:
        if pd.api.types.is_datetime64_any_dtype(dataframe[column]):
            dataframe[column] = dataframe[column].dt.date
    return dataframe

3. split_affaire_column() — Séparation de la colonne Affaire :

def split_affaire_column(dataframe: pd.DataFrame, nom_colonne: str) -> pd.DataFrame:
    # Sépare la colonne donnée en deux nouvelles colonnes : 'N_Affaire' (5 premiers caractères) et 'Designation' (à partir du 8e), puis supprime l'ancienne
    valeurs = dataframe[nom_colonne].astype(str)
    dataframe['N_Affaire'] = valeurs.str[:5]
    dataframe['Designation'] = valeurs.str[7:]
    return dataframe.drop(nom_colonne, axis=1)

4. replace_dot_with_comma() — Nettoyage des formats numériques européens :

def replace_dot_with_comma(dataframe: pd.DataFrame) -> pd.DataFrame:
    # Nettoie les colonnes texte : remplace les virgules par des points pour les nombres, 
    # supprime les espaces et convertit les valeurs numériques en float quand c'est possible
    for column in dataframe.select_dtypes(include='object'):
        # Remplace les virgules par des points (format FR → EN)
        # Supprime les espaces parasites
        # Tente la conversion en float si le pattern correspond
        dataframe[column] = dataframe[column].apply(clean_and_convert)
    return dataframe

5. rename_column_title() — Mise en forme des noms de colonnes :

def rename_column_title(dataframe: pd.DataFrame) -> pd.DataFrame:
    # Met en forme les noms de colonnes en capitalisant la première lettre de chaque mot (style "Title Case")
    try:
        dataframe.columns = [col.title() for col in dataframe.columns]
    except Exception as e:
        logging.error(f"Erreur lors du renommage des colonnes: {e}")
    return dataframe

6. extract_table_data() — Extraction des données :

def extract_table_data(table_name: str, engine, custom_queries: dict = {}) -> pd.DataFrame:
    # Extrait les données d'une table SQL (ou d'une requête personnalisée) et retourne le résultat sous forme de DataFrame
    query = custom_queries.get(table_name, f"SELECT * FROM {table_name}")
    dataframe = pd.read_sql(query, engine)
    logging.info(f"{len(dataframe)} lignes extraites de {table_name}")
    return dataframe

7. transform_dataframe() — Pipeline de transformation :

def transform_dataframe(dataframe: pd.DataFrame, table_name: str) -> pd.DataFrame:
    # Applique une série de transformations standardisées sur un DataFrame selon la table traitée (nettoyage, conversion, renommage, etc.)
    logging.info(f"Transformation de la table {table_name}")

    if table_name == 't_br':
        dataframe = replace_dot_with_comma(dataframe)

    cleaned_dataframe = (
        dataframe.pipe(convert_datetime_to_date)
          .pipe(convert_numeric)
          .pipe(rename_column_title)
    )
    cleaned_dataframe = cleaned_dataframe.dropna(axis=0, how='all')

    if table_name in ('SYMETRIE.dbo.Affaire', 'Affaire'):
        cleaned_dataframe = split_affaire_column(cleaned_dataframe, 'N_Affaire')

    return cleaned_dataframe

8. _compute_row_hash_for_dataframe() — Hashing MD5 pour détection des modifications :

def _compute_row_hash_for_dataframe(dataframe: pd.DataFrame, exclude_column=None) -> pd.Series:
    # Génère un hash unique (MD5) pour chaque ligne du DataFrame, en ignorant certaines colonnes si spécifié
    if exclude_column is None:
        exclude_column = []

    columns = [column for column in list(dataframe.columns) if column not in exclude_column]
    columns = sorted(columns, key=lambda x: str(x).lower())
    
    def row_hash(row):
        # Concaténation des valeurs avec séparateur
        values = '|'.join([safe_str(row[c]) for c in columns])
        # Hash MD5 hexadécimal
        return hashlib.md5(values.encode('utf-8')).hexdigest()
    
    return dataframe.apply(row_hash, axis=1)
load_table() — Algorithme de chargement intélligent

Étapes détaillées :

  1. _prepare_dataframe() — Ajout de la colonne row_hash MD5
  2. _create_insert_staging_table() — Création table temporaire stg_{table}_{uuid}
  3. _create_target_table() — Création table cible si inexistante (DDL auto-généré)
  4. _check_primary_key_possible() — Vérification unicité de la clé primaire
  5. _put_primary_key() — Ajout contrainte PK si absente
  6. _compute_pre_update_metrics() — Calcul des différences :
    • Nouvelles lignes (INSERT) : PK présente dans staging, absente dans cible
    • Lignes modifiées (UPDATE) : PK identique mais hash différent
    • Lignes à supprimer (DELETE) : PK présente dans cible, absente dans staging
    • Lignes inchangées : PK et hash identiques
  7. _execute_update_target_table() — Exécution des opérations SQL :
    • _insert_rows() INSERT INTO target SELECT * FROM staging WHERE...
    • _update_rows() UPDATE target SET ... FROM staging WHERE pk = pk AND hash != hash
    • _delete_rows() DELETE FROM target WHERE pk NOT IN (SELECT pk FROM staging)
  8. _compute_post_update_metrics() — Vérification du nombre de lignes final
  9. _drop_staging_table() — Nettoyage de la table temporaire

Système de notifications Microsoft Teams

Fonctions de notification

Les callbacks postent un MessageCard JSON sur le webhook Teams (rouge en cas d'échec, vert en succès) avec DAG, task, timestamp et lien vers les logs. Exemple du callback d'échec :

def notify_failure_to_teams(context):
    payload = {
        "@type": "MessageCard",
        "themeColor": "FF0000",
        "sections": [{
            "activityTitle": f"Task Failed: {context['task_instance'].task_id}",
            "facts": [
                {"name": "DAG", "value": context['dag'].dag_id},
                {"name": "Log URL", "value": context['task_instance'].log_url},
            ],
        }],
    }
    requests.post(TEAMS_WEBHOOK_URL, json=payload)

notify_success_to_teams() suit la même structure avec themeColor: "00FF00" et un message de confirmation.

Intégration dans les DAGs
  • on_failure_callback=notify_failure_to_teams — Au niveau du DAG (toutes les tasks)
  • notify_success — PythonOperator en fin de chaîne (etl_task >> notify_success)

Métriques et monitoring

Logs ETL (exemple réel)
2026-01-15 08:09:22,531 | INFO | Début du script ETL GX.
2026-01-15 08:09:22,722 | INFO | --- Debut traitement pour Activite ---
2026-01-15 08:09:22,791 | INFO | 106 lignes extraites de Activite
2026-01-15 08:09:22,791 | INFO | Transformation de la table Activite
2026-01-15 08:09:22,816 | INFO | [activite] début load staging `stg_activite_98ac4126`
2026-01-15 08:09:22,990 | INFO | [stg_activite_98ac4126] staging créé avec 106 lignes.
2026-01-15 08:09:22,999 | INFO | [stg_activite_98ac4126] 0 NULL(s) et 0 doublon(s) détectés.
2026-01-15 08:09:23,060 | INFO | [stg_activite_98ac4126] clé primaire ajoutée sur `N_Activites`.
2026-01-15 08:09:23,098 | INFO | [activite] BEFORE upsert: staging=106, target=106, matching=106, to_insert=0, to_update=0, to_delete=0
2026-01-15 08:09:23,103 | INFO | [activite] insert : 0 nouvelles lignes insérées.
2026-01-15 08:09:23,110 | INFO | [activite] update : 0 lignes modifiées.
2026-01-15 08:09:23,116 | INFO | [activite] delete : 0 lignes supprimées.
2026-01-15 08:09:23,124 | INFO | [activite] AFTER upsert: total=106
2026-01-15 08:09:23,148 | INFO | [activite] staging `stg_activite_98ac4126` supprimée.
2026-01-15 08:09:23,148 | INFO | [activite] load terminé.
2026-01-15 08:09:23,149 | INFO | --- Fin traitement pour Activite ---

2026-01-15 08:09:23,149 | INFO | --- Debut traitement pour Affaire ---
2026-01-15 08:09:23,759 | INFO | 5844 lignes extraites de Affaire
2026-01-15 08:09:23,761 | INFO | Transformation de la table Affaire
2026-01-15 08:09:24,147 | INFO | [affaire] début load staging `stg_affaire_78e5a9b9`
2026-01-15 08:09:35,968 | INFO | [stg_affaire_78e5a9b9] staging créé avec 5844 lignes.
2026-01-15 08:09:35,995 | INFO | [stg_affaire_78e5a9b9] 0 NULL(s) et 0 doublon(s) détectés.
2026-01-15 08:09:36,404 | INFO | [stg_affaire_78e5a9b9] clé primaire ajoutée sur `N_Affaire`.
2026-01-15 08:09:36,630 | INFO | [affaire] BEFORE upsert: staging=5844, target=5844, matching=5844, to_insert=0, to_update=3, to_delete=0
2026-01-15 08:09:36,658 | INFO | [affaire] insert : 0 nouvelles lignes insérées.
2026-01-15 08:09:36,699 | INFO | [affaire] update : 3 lignes modifiées.
2026-01-15 08:09:36,745 | INFO | [affaire] delete : 0 lignes supprimées.
2026-01-15 08:09:36,756 | INFO | [affaire] AFTER upsert: total=5844
2026-01-15 08:09:36,805 | INFO | [affaire] staging supprimée.
2026-01-15 08:09:36,806 | INFO | [affaire] load terminé.
2026-01-15 08:09:36,808 | INFO | --- Fin traitement pour Affaire ---

2026-01-15 08:09:36,809 | INFO | --- Debut traitement pour AFFAIRE_VUE_LISTE_HEURES ---
2026-01-15 08:09:58,956 | INFO | 163161 lignes extraites de AFFAIRE_VUE_LISTE_HEURES
2026-01-15 08:11:17,284 | INFO | [heures] BEFORE upsert: staging=163161, target=163156, matching=163156, to_insert=5, to_update=0, to_delete=0
2026-01-15 08:11:17,762 | INFO | [heures] insert : 5 nouvelles lignes insérées.
2026-01-15 08:11:18,938 | INFO | [heures] load terminé.
2026-01-15 08:11:18,959 | INFO | --- Fin traitement pour AFFAIRE_VUE_LISTE_HEURES ---
KPIs du projet
MétriqueValeur
Tables synchronisées35 tables métier (27 GX + 8 GSM)
Fréquence d'exécution8 fois/jour (toutes les 3h)
Temps moyen par DAG GX~2-3 minutes (27 tables)
Volume par exécution GX~180 000+ lignes traitées
Plus grosse tableAFFAIRE_VUE_LISTE_HEURES (163 161 lignes)
Taux de succès>99%

Compétences techniques mobilisées

DomaineTechnologies / Compétences
Data EngineeringConception pipelines ETL, modélisation données, synchronisation incrémentale
PythonPandas, SQLAlchemy, hashlib, logging, modules réutilisables
SQLRequêtes complexes, jointures, UPSERT, transactions, DDL dynamique
Bases de donnéesSQL Server, MariaDB/MySQL, PostgreSQL, drivers ODBC
DevOpsDocker, Docker Compose, volumes, réseaux
OrchestrationApache Airflow (DAGs, Operators, Scheduling, Callbacks)
IntégrationAPIs REST, Webhooks Microsoft Teams, MessageCards
AdministrationConfiguration ODBC, gestion des drivers, permissions Linux