Conception et mise en place d'une pipeline ETL automatisé pour la synchronisation de données entre plusieurs bases de données hétérogènes (SQL Server, MariaDB) au sein d'un environnement conteneurisé et orchestré par Apache Airflow. Ce projet répond à un besoin critique de consolidation des données métier pour l'analyse et l'intégrité même des données.
Contexte et problématique métier
L'entreprise disposait de plusieurs sources de données isolées créant des silos d'information :
- Base GX — ERP sur SQL Server contenant les données commerciales (affaires, devis, commandes, factures, articles, clients...)
- Base GSM — Application de gestion de stock sur MariaDB (produits, emplacements, mouvements, bons de réception...)
Problèmes identifiés :
- Impossibilité de croiser les données commerciales et logistiques
- Rapports manuels chronophages et source d'erreurs
- Données non synchronisées entre les systèmes
Objectifs du projet :
- Centraliser les données de 2 systèmes sources vers une base transformée et unifiée (SYM_BD_TEST)
- Automatiser les flux ETL avec une exécution planifiée toutes les 3 heures via Airflow
- Garantir l'intégrité référentielle via un système de synchronisation intelligent (hashing MD5)
- Assurer une traçabilité complète des opérations et des alertes en temps réel
- Permettre une scalabilité horizontale pour l'ajout de nouvelles sources
Architecture technique globale
L'infrastructure repose sur une architecture Docker multi-conteneurs orchestrée via Docker Compose, avec séparation des responsabilitées :
Services Docker Compose
Le fichier docker-compose.yml définit une architecture simplifiée de 4 services basée sur l'exécution des scripts Python via PythonVirtualenvOperator, sans la complexité d'un Docker-in-Docker :
1. Service db — Base de données PostgreSQL
Stocke les métadonnées Airflow (DAG runs, task instances, logs, connexions, variables). Port 5432, healthcheck via pg_isready.
db:
image: postgres:15
container_name: etl-postgres
restart: unless-stopped
networks: [etl-network]
environment:
POSTGRES_USER: ${DB_USER:-etl_user}
POSTGRES_PASSWORD: ${DB_PASSWORD:-etl_password}
POSTGRES_DB: ${DB_NAME:-symetrie_test}
TZ: ${TIMEZONE:-Europe/Paris}
volumes:
- db_data:/var/lib/postgresql/data
ports:
- "${DB_PORT:-5432}:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U $$POSTGRES_USER"]
interval: 10s
timeout: 5s
retries: 5- Variables d'environnement :
DB_USER,DB_PASSWORD,DB_NAMEdéfinies via.envavec valeurs par défaut - Volume persistant :
db_data:/var/lib/postgresql/datapour garantir la durabilité des données entre redémarrages - Restart policy :
unless-stopped— redémarrage automatique sauf arrêt manuel
2. Service airflow-init — Initialisation Airflow
Image custom etl-airflow:custom. Initialise la base de métadonnées et crée l'utilisateur admin (exécuté une seule fois au premier démarrage, dépend de db healthy).
airflow-init:
image: etl-airflow:custom
container_name: etl-airflow-init
networks: [etl-network]
depends_on:
db: { condition: service_healthy }
env_file: .env
environment:
_AIRFLOW_DB_UPGRADE: "true"
_AIRFLOW_WWW_USER_CREATE: "true"
_AIRFLOW_WWW_USER_USERNAME: ${AIRFLOW_USERNAME:-admin}
_AIRFLOW_WWW_USER_PASSWORD: ${AIRFLOW_PASSWORD:-admin}
_AIRFLOW_WWW_USER_FIRSTNAME: ${AIRFLOW_FIRSTNAME:-ETL}
_AIRFLOW_WWW_USER_LASTNAME: ${AIRFLOW_LASTNAME:-Admin}
_AIRFLOW_WWW_USER_ROLE: ${AIRFLOW_ROLE:-Admin}
_AIRFLOW_WWW_USER_EMAIL: ${AIRFLOW_EMAIL:-etl@admin.local}
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${DB_USER}:${DB_PASSWORD}@db:5432/${DB_NAME}
volumes:
- airflow_logs:/opt/airflow/logs
entrypoint: /bin/bash
command:
- -c
- |
mkdir -p /opt/airflow/dags /opt/airflow/logs /opt/airflow/plugins
airflow db init
airflow users create \
--username "$$_AIRFLOW_WWW_USER_USERNAME" \
--password "$$_AIRFLOW_WWW_USER_PASSWORD" \
--firstname "$$_AIRFLOW_WWW_USER_FIRSTNAME" \
--lastname "$$_AIRFLOW_WWW_USER_LASTNAME" \
--role "$$_AIRFLOW_WWW_USER_ROLE" \
--email "$$_AIRFLOW_WWW_USER_EMAIL"3. Service airflow-webserver — Interface Web
Interface de monitoring et de gestion des DAGs sur le port 8080 (image etl-airflow:custom, healthcheck sur /health).
airflow-webserver:
build:
context: .
dockerfile: Dockerfile
image: etl-airflow:custom
container_name: etl-airflow-webserver
restart: unless-stopped
networks: [etl-network]
depends_on:
db: { condition: service_healthy }
airflow-init: { condition: service_completed_successfully }
env_file: .env
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "false"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW_FERNET_KEY}
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${DB_USER}:${DB_PASSWORD}@db:5432/${DB_NAME}
AIRFLOW__CORE__DEFAULT_TIMEZONE: ${TIMEZONE:-Europe/Paris}
AIRFLOW_CONN_DB_DEFAULT: postgresql+psycopg2://${DB_USER}:${DB_PASSWORD}@db:5432/${DB_NAME}
volumes:
- ./dags:/opt/airflow/dags
- ./config:/opt/airflow/config
- airflow_logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
- ./data:/opt/airflow/data
# Montage du code source et requirements pour PythonVirtualenvOperator
- ./src:/opt/airflow/src
- ./requirements:/opt/airflow/requirements
ports: ["${AIRFLOW_PORT:-8080}:8080"]
command: webserver
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5L'UI Airflow donne accès à la vue DAGs (statut, planification), au Graph View des dépendances, à la Gantt Chart, aux logs en temps réel, au déclenchement manuel et à la gestion des connexions aux bases.
4. Service airflow-scheduler — Ordonnanceur
Orchestration et planification des tâches ETL (image etl-airflow:custom partagée avec le webserver).
airflow-scheduler:
image: etl-airflow:custom
container_name: etl-airflow-scheduler
restart: unless-stopped
networks: [etl-network]
depends_on:
db: { condition: service_healthy }
airflow-init: { condition: service_completed_successfully }
env_file: .env
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "false"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW_FERNET_KEY}
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://${DB_USER}:${DB_PASSWORD}@db:5432/${DB_NAME}
AIRFLOW__CORE__DEFAULT_TIMEZONE: ${TIMEZONE:-Europe/Paris}
AIRFLOW_CONN_DB_DEFAULT: postgresql+psycopg2://${DB_USER}:${DB_PASSWORD}@db:5432/${DB_NAME}
volumes:
- ./dags:/opt/airflow/dags
- ./config:/opt/airflow/config
- airflow_logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
- ./data:/opt/airflow/data
- ./src:/opt/airflow/src
- ./requirements:/opt/airflow/requirements
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5Rôles du Scheduler : parsing des DAGs toutes les 30s, planification selon le schedule_interval, exécution via PythonVirtualenvOperator (environnement isolé) et retry automatique des tâches échouées.
Image Docker personnalisée etl-airflow:custom
L'image etl-airflow:custom est construite à partir d'un Dockerfile personnalisé qui inclut toutes les dépendances nécessaires pour l'exécution des ETL, notamment les drivers ODBC pour SQL Server :
FROM apache/airflow:2.8.1-python3.10
USER root
# 1. Installation des outils de base
RUN apt-get update && apt-get install -y \
gcc g++ git curl gnupg2 apt-utils apt-transport-https \
--no-install-recommends && \
rm -rf /var/lib/apt/lists/*
# 2. NETTOYAGE DES CONFLITS (Crucial pour Debian 12 / Airflow 2.8)
# On supprime les librairies ODBC par défaut de Debian 12 qui bloquent l'installation des drivers Microsoft
RUN apt-get update && apt-get remove -y \
libodbc2 \
libodbcinst2 \
unixodbc-common \
&& apt-get clean
# 3. Installation des drivers ODBC (SQL Server)
# On télécharge la clé dans un fichier temporaire pour éviter l'erreur de pipe
RUN curl -fsSL https://packages.microsoft.com/keys/microsoft.asc -o microsoft.asc \
&& gpg --batch --yes --dearmor -o /usr/share/keyrings/microsoft-prod.gpg microsoft.asc \
&& rm microsoft.asc \
&& echo "deb [arch=amd64,arm64,armhf signed-by=/usr/share/keyrings/microsoft-prod.gpg] https://packages.microsoft.com/debian/11/prod bullseye main" > /etc/apt/sources.list.d/mssql-release.list \
&& apt-get update \
&& ACCEPT_EULA=Y apt-get install -y msodbcsql17 unixodbc unixodbc-dev \
&& rm -rf /var/lib/apt/lists/*
# 4. Préparation des dossiers
USER airflow
RUN mkdir -p /opt/airflow/src/Logs/GX /opt/airflow/src/Logs/GSM
# 5. Installation des dépendances Airflow
COPY requirements/airflow.txt /tmp/airflow.txt
RUN pip install --no-cache-dir -r /tmp/airflow.txt
# On installe virtualenv pour permettre l'utilisation du PythonVirtualenvOperator
RUN pip install --no-cache-dir apache-airflow-providers-docker psycopg2-binary virtualenvPoints clés du Dockerfile :
- Gestion des conflits ODBC — Suppression des librairies Debian 12 qui bloquent les drivers Microsoft
- Driver ODBC 17 — Installation sécurisée via clé GPG pour SQL Server
- Virtualenv — Support du
PythonVirtualenvOperatorpour isoler les dépendances ETL - Structure des logs — Dossiers pré-créés pour GX et GSM
Avantages : isolation des dépendances par DAG (pas de Docker-in-Docker), debugging facilité, logs centralisés dans Airflow et possibilité de définir des dépendances différentes par DAG.
Flux de données et réseau
┌──────────────────────────────────────────────────────────────────────────┐ │ RÉSEAU: etl-network │ ├──────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ SQL Server │ │ MariaDB │ │ MariaDB │ │ │ │(GX/SYMETRIE) │ │ (GSM) │ │ (SYM_BD_TEST)│ │ │ │ :1433 │ │ :3306 │ │ :3306 │ │ │ └──────┬───────┘ └──────┬───────┘ └──────▲───────┘ │ │ │ │ │ │ │ │ EXTRACT │ EXTRACT │ LOAD │ │ └──────────┬──────────┘ │ │ │ │ │ │ │ ▼ │ │ │ ┌───────────────────────────────────────────┴────────┐ │ │ │ AIRFLOW SCHEDULER │ │ │ │ ┌──────────────────────────┐ │ │ │ │ │ PythonVirtualenvOperator│ │ │ │ │ │ ┌────────────────────┐ │ │ │ │ │ │ │ ETL_Modulaire │ │ │ │ │ │ │ │ - Python 3.10 │ │ │ │ │ │ │ │ - ODBC Driver 17 │ │ │ │ │ │ │ │ - Pandas/SQLAlch │ │ │ │ │ │ │ └────────────────────┘ │ │ │ │ │ └──────────────────────────┘ │ │ │ └────────────────────────────────────────────────────┘ │ │ │ │ │ │ │ │ ┌───────────────────┴───────────────────┐ │ │ │ AIRFLOW WEBSERVER │ │ │ │ port 8080 │ │ │ │ - Monitoring DAGs │ │ │ │ - Logs temps réel │ │ │ │ - Déclenchement manuel │ │ │ └───────────────────┬───────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────┐ │ │ │ PostgreSQL │ │ │ │ (Métadonnées) │ │ │ │ :5432 │ │ │ └──────────────────┘ │ │ │ └──────────────────────────────────────────────────────────────────────────┘
Volumes et montages
./dags→/opt/airflow/dags— Fichiers DAG Python./src→/opt/airflow/src— Module ETL_Modulaire et scripts principaux./config→/opt/airflow/config— Fichiers de configuration./data→/opt/airflow/data— Données intermédiaires./requirements→/opt/airflow/requirements— Fichiers requirements pour virtualenv./plugins→/opt/airflow/plugins— Plugins Airflow personnalisés./src/Logs/GXet./src/Logs/GSM— Logs horodatés par sourcedb_data(volume nommé) — Persistance PostgreSQLairflow_logs(volume nommé) — Logs Airflow
DAGs Airflow — Détail complet
Structure d'un DAG avec PythonVirtualenvOperator
Les DAGs utilisent @task.virtualenv pour créer un environnement Python isolé avec toutes les dépendances nécessaires :
with DAG(
dag_id='ETL_GX_DAG',
schedule_interval='0 */3 * * *',
catchup=False,
on_failure_callback=notify_failure_to_teams,
tags=['gx', 'etl', 'teams'],
) as dag:
@task.virtualenv(
task_id='run_etl_gx_script',
requirements=["pandas", "SQLAlchemy", "pyodbc",
"PyMySQL", "python-dotenv", ...], # versions épinglées
system_site_packages=False,
)
def run_etl_wrapper():
import subprocess, sys
subprocess.run([sys.executable, "/opt/airflow/src/ETL_GX_main.py"],
check=True)
run_etl_wrapper() >> PythonOperator(
task_id='notify_success_to_teams',
python_callable=notify_success_to_teams,
)ETL_GX_DAG — Données commerciales (SQL Server)
Configuration :
- Schedule :
0 */3 * * *(toutes les 3 heures) - Retry : 1 tentative avec délai de 5 secondes
- Catchup : désactivé
- Tags :
['gx', 'etl', 'teams']
Tables extraites avec requêtes personnalisées (extrait — 27 tables au total) :
| Table source | Requête personnalisée | Clé primaire |
|---|---|---|
| Activite | SELECT N_Activites, Code_Secteur, Secteur, ... FROM Activite | N_Activites |
| Affaire | SELECT N_Affaire, Designation, N_Client, N_ITC, ... FROM Affaire | N_Affaire |
| AFFAIRE_VUE_LISTE_HEURES | SELECT N_Heure, N_ITC, Personne, Nombre AS nombre_heures_pointe, ... FROM AFFAIRE_VUE_LISTE_HEURES | N_Heure |
| Cde_cli / CDE_FOUR | SELECT N_Cde_*, Nom_Cde, N_Affaire, ... | N_Cde_Cli / N_Cde_Four |
| CLIENT / FOURNISS | SELECT N_Client/N_Fournisseur, Nom, Pays, ... | N_Client / N_Fournisseur |
| DEVIS / Fact_cli / Fac_four | SELECT N_*, Nom_*, N_Client/N_Affaire, ... | N_Devis / N_Fact_Cli / N_Fac_Four |
| MVTS_STOCK | SELECT N_Mvts_Stock, N_Produit, N_Document, Origine, ... FROM MVTS_STOCK | N_Mvts_Stock |
| USERS / Itc / Service | SELECT identifiants, libellés, ... | N_User / N_Itc / N_Service |
Autres tables : DEPOT, DEVISE, EGXS_lsf_Demande_Conge, ENTREE_SORTIE, ENTREE_SORTIE_DETAIL, Famille_Client, FAMILLE_AFFAIRE, Frais, Rfac_cli, SCD_FOUR, Scde_cli, SS_BR, Treso.
ETL_GSM_DAG — Gestion de stock (MariaDB)
Configuration :
- Schedule :
0 */3 * * *(toutes les 3 heures) - Retry : 1 tentative avec délai de 5 secondes
- Catchup : désactivé
- Tags :
['gsm', 'etl', 'teams']
Tables extraites avec requêtes personnalisées :
| Table source | Requête personnalisée | Clé primaire |
|---|---|---|
| t_affaires | SELECT ID, Societe, Sigle FROM t_affaires | Id |
| t_br | SELECT ID, noBr, affaire, transfert, ... FROM t_br | Id |
| t_emplacements | SELECT *, CONCAT(...) AS Emplacement_Concat FROM t_emplacements | Id |
| t_journal | SELECT ID, Dates, Type, Commentaire, ... FROM t_journal | Id |
| t_manquants_prepa_affaire | SELECT ID, composition, ref3, designation, qte FROM t_manquants_prepa_affaire | Id |
| t_places | SELECT ID, jour AS Date, reference, ... FROM t_places | Id |
| t_produits | SELECT Id, Designation, Famille, Reference, ... FROM t_produits | Id |
| t_sommes | SELECT Id, reference, quantite, prix, affaire FROM t_sommes | Id |
Workflow d'exécution d'un DAG
1. SCHEDULER détecte l'heure d'exécution (cron 0 */3 * * *)
│
▼
2. CRÉATION VIRTUALENV (@task.virtualenv)
├─ Création environnement Python isolé
├─ Installation des dépendances (pandas, sqlalchemy, pyodbc...)
├─ Configuration du PYTHONPATH
└─ Exécution subprocess.run() avec le script ETL
│
▼
3. SCRIPT ETL PRINCIPAL (ETL_GX_main.py / ETL_GSM_main.py)
├─ Initialisation du logging (fichier horodaté)
├─ Chargement de la configuration (config.py)
├─ Établissement des connexions source et cible
└─ Boucle sur chaque table configurée:
│
├─ EXTRACT: extract_table_data()
│ └─ Exécution de la requête SQL (personnalisée ou SELECT *)
│
├─ TRANSFORM: transform_dataframe()
│ ├─ convert_datetime_to_date()
│ ├─ convert_numeric()
│ ├─ rename_column_title()
│ ├─ split_affaire_column() (si applicable)
│ └─ dropna(axis=0, how='all')
│
└─ LOAD: load_table()
├─ Création staging table temporaire (stg_{table}_{uuid})
├─ Insertion des données transformées
├─ Calcul des hash MD5 pour comparaison
├─ Exécution INSERT/UPDATE/DELETE
├─ Logging des métriques (BEFORE/AFTER upsert)
└─ Suppression staging table
│
▼
4. FIN DU VIRTUALENV (nettoyage automatique)
│
▼
5. NOTIFICATION TEAMS
├─ Succès → notify_success_to_teams() → MessageCard verte
└─ Échec → on_failure_callback → MessageCard rouge avec détailsModule ETL Python — Détail des fonctions
Structure du module ETL_Modulaire
/opt/airflow/src/
├── ETL_Modulaire/
│ ├── __init__.py
│ ├── connexion.py # Gestionnaire de connexions multi-bases
│ ├── config.py # Configuration centralisée (tables, requêtes, PK)
│ └── data_manipulation.py # Fonctions ETL (extract, transform, load)
├── ETL_GX_main.py # Point d'entrée DAG GX
├── ETL_GSM_main.py # Point d'entrée DAG GSM
└── Logs/
├── GX/ # Logs ETL GX par jour
└── GSM/ # Logs ETL GSM par jourScript principal ETL (ETL_GX_main.py / ETL_GSM_main.py)
Le script charge la configuration via get_config("GX") (ou"GSM") qui fournit les requêtes personnalisées, la liste des tables, les correspondances source→cible et les clés primaires. Il ouvre les connexions source et cible, puis itère sur chaque table avec un pipeline extract → transform → load :
with source.connect() as cnx_src, target.connect() as cnx_tgt:
for table in tables:
df = extract_table_data(table, cnx_src, custom_queries)
df = transform_dataframe(df, table)
load_table(df, cnx_tgt, correspondance[table], pk_map[...])Les erreurs sont capturées par un try/except global qui log la stack trace et termine le script avec sys.exit(1) pour qu'Airflow déclenche le callback Teams. Logging configuré par jour dans /opt/airflow/src/Logs/GX/.
connexion.py — Gestion des connexions
Trois fabriques de moteurs SQLAlchemy exposées par le module : Connexion_GX() (SQL Server via ODBC Driver 17), Connexion_GSM() et Connexion_SYM_BD_TEST() (MariaDB via PyMySQL). Identifiants chargés depuis .env avec python-dotenv :
def Connexion_GX():
engine = create_engine(
f"mssql+pyodbc://{user}:{pwd}@{host}:{port}/{db}"
"?driver=ODBC+Driver+17+for+SQL+Server"
)
with engine.connect() as cnx:
cnx.execute(text("SELECT 1")) # validation
return engineLes deux connexions MariaDB suivent le même pattern avec le driver mysql+pymysql. Chaque fonction valide la connexion avec un SELECT 1 avant de retourner l'engine.
data_manipulation.py — Fonctions de transformation
1. convert_numeric() — Conversion intelligente des types numériques :
def convert_numeric(dataframe: pd.DataFrame) -> pd.DataFrame:
# Convertit toutes les colonnes numériques du DataFrame en types adaptés (Int64 pour les entiers, float64 sinon)
for column in dataframe.select_dtypes(include=[np.number]).columns:
# Si toutes les valeurs sont entières → Int64 (nullable)
if (dataframe[column].dropna() % 1 == 0).all():
dataframe[column] = dataframe[column].astype('Int64')
else:
dataframe[column] = dataframe[column].astype('float64')
return dataframe2. convert_datetime_to_date() — Conversion des datetime en dates :
def convert_datetime_to_date(dataframe: pd.DataFrame) -> pd.DataFrame:
# Convertit toutes les colonnes de type datetime du DataFrame en simples dates (sans l'heure)
for column in dataframe.columns:
if pd.api.types.is_datetime64_any_dtype(dataframe[column]):
dataframe[column] = dataframe[column].dt.date
return dataframe3. split_affaire_column() — Séparation de la colonne Affaire :
def split_affaire_column(dataframe: pd.DataFrame, nom_colonne: str) -> pd.DataFrame:
# Sépare la colonne donnée en deux nouvelles colonnes : 'N_Affaire' (5 premiers caractères) et 'Designation' (à partir du 8e), puis supprime l'ancienne
valeurs = dataframe[nom_colonne].astype(str)
dataframe['N_Affaire'] = valeurs.str[:5]
dataframe['Designation'] = valeurs.str[7:]
return dataframe.drop(nom_colonne, axis=1)4. replace_dot_with_comma() — Nettoyage des formats numériques européens :
def replace_dot_with_comma(dataframe: pd.DataFrame) -> pd.DataFrame:
# Nettoie les colonnes texte : remplace les virgules par des points pour les nombres,
# supprime les espaces et convertit les valeurs numériques en float quand c'est possible
for column in dataframe.select_dtypes(include='object'):
# Remplace les virgules par des points (format FR → EN)
# Supprime les espaces parasites
# Tente la conversion en float si le pattern correspond
dataframe[column] = dataframe[column].apply(clean_and_convert)
return dataframe5. rename_column_title() — Mise en forme des noms de colonnes :
def rename_column_title(dataframe: pd.DataFrame) -> pd.DataFrame:
# Met en forme les noms de colonnes en capitalisant la première lettre de chaque mot (style "Title Case")
try:
dataframe.columns = [col.title() for col in dataframe.columns]
except Exception as e:
logging.error(f"Erreur lors du renommage des colonnes: {e}")
return dataframe6. extract_table_data() — Extraction des données :
def extract_table_data(table_name: str, engine, custom_queries: dict = {}) -> pd.DataFrame:
# Extrait les données d'une table SQL (ou d'une requête personnalisée) et retourne le résultat sous forme de DataFrame
query = custom_queries.get(table_name, f"SELECT * FROM {table_name}")
dataframe = pd.read_sql(query, engine)
logging.info(f"{len(dataframe)} lignes extraites de {table_name}")
return dataframe7. transform_dataframe() — Pipeline de transformation :
def transform_dataframe(dataframe: pd.DataFrame, table_name: str) -> pd.DataFrame:
# Applique une série de transformations standardisées sur un DataFrame selon la table traitée (nettoyage, conversion, renommage, etc.)
logging.info(f"Transformation de la table {table_name}")
if table_name == 't_br':
dataframe = replace_dot_with_comma(dataframe)
cleaned_dataframe = (
dataframe.pipe(convert_datetime_to_date)
.pipe(convert_numeric)
.pipe(rename_column_title)
)
cleaned_dataframe = cleaned_dataframe.dropna(axis=0, how='all')
if table_name in ('SYMETRIE.dbo.Affaire', 'Affaire'):
cleaned_dataframe = split_affaire_column(cleaned_dataframe, 'N_Affaire')
return cleaned_dataframe8. _compute_row_hash_for_dataframe() — Hashing MD5 pour détection des modifications :
def _compute_row_hash_for_dataframe(dataframe: pd.DataFrame, exclude_column=None) -> pd.Series:
# Génère un hash unique (MD5) pour chaque ligne du DataFrame, en ignorant certaines colonnes si spécifié
if exclude_column is None:
exclude_column = []
columns = [column for column in list(dataframe.columns) if column not in exclude_column]
columns = sorted(columns, key=lambda x: str(x).lower())
def row_hash(row):
# Concaténation des valeurs avec séparateur
values = '|'.join([safe_str(row[c]) for c in columns])
# Hash MD5 hexadécimal
return hashlib.md5(values.encode('utf-8')).hexdigest()
return dataframe.apply(row_hash, axis=1)load_table() — Algorithme de chargement intélligent
Étapes détaillées :
- _prepare_dataframe() — Ajout de la colonne
row_hashMD5 - _create_insert_staging_table() — Création table temporaire
stg_{table}_{uuid} - _create_target_table() — Création table cible si inexistante (DDL auto-généré)
- _check_primary_key_possible() — Vérification unicité de la clé primaire
- _put_primary_key() — Ajout contrainte PK si absente
- _compute_pre_update_metrics() — Calcul des différences :
- Nouvelles lignes (INSERT) : PK présente dans staging, absente dans cible
- Lignes modifiées (UPDATE) : PK identique mais hash différent
- Lignes à supprimer (DELETE) : PK présente dans cible, absente dans staging
- Lignes inchangées : PK et hash identiques
- _execute_update_target_table() — Exécution des opérations SQL :
_insert_rows()—INSERT INTO target SELECT * FROM staging WHERE..._update_rows()—UPDATE target SET ... FROM staging WHERE pk = pk AND hash != hash_delete_rows()—DELETE FROM target WHERE pk NOT IN (SELECT pk FROM staging)
- _compute_post_update_metrics() — Vérification du nombre de lignes final
- _drop_staging_table() — Nettoyage de la table temporaire
Système de notifications Microsoft Teams
Fonctions de notification
Les callbacks postent un MessageCard JSON sur le webhook Teams (rouge en cas d'échec, vert en succès) avec DAG, task, timestamp et lien vers les logs. Exemple du callback d'échec :
def notify_failure_to_teams(context):
payload = {
"@type": "MessageCard",
"themeColor": "FF0000",
"sections": [{
"activityTitle": f"Task Failed: {context['task_instance'].task_id}",
"facts": [
{"name": "DAG", "value": context['dag'].dag_id},
{"name": "Log URL", "value": context['task_instance'].log_url},
],
}],
}
requests.post(TEAMS_WEBHOOK_URL, json=payload)notify_success_to_teams() suit la même structure avec themeColor: "00FF00" et un message de confirmation.
Intégration dans les DAGs
on_failure_callback=notify_failure_to_teams— Au niveau du DAG (toutes les tasks)notify_success— PythonOperator en fin de chaîne (etl_task >> notify_success)
Métriques et monitoring
Logs ETL (exemple réel)
2026-01-15 08:09:22,531 | INFO | Début du script ETL GX. 2026-01-15 08:09:22,722 | INFO | --- Debut traitement pour Activite --- 2026-01-15 08:09:22,791 | INFO | 106 lignes extraites de Activite 2026-01-15 08:09:22,791 | INFO | Transformation de la table Activite 2026-01-15 08:09:22,816 | INFO | [activite] début load staging `stg_activite_98ac4126` 2026-01-15 08:09:22,990 | INFO | [stg_activite_98ac4126] staging créé avec 106 lignes. 2026-01-15 08:09:22,999 | INFO | [stg_activite_98ac4126] 0 NULL(s) et 0 doublon(s) détectés. 2026-01-15 08:09:23,060 | INFO | [stg_activite_98ac4126] clé primaire ajoutée sur `N_Activites`. 2026-01-15 08:09:23,098 | INFO | [activite] BEFORE upsert: staging=106, target=106, matching=106, to_insert=0, to_update=0, to_delete=0 2026-01-15 08:09:23,103 | INFO | [activite] insert : 0 nouvelles lignes insérées. 2026-01-15 08:09:23,110 | INFO | [activite] update : 0 lignes modifiées. 2026-01-15 08:09:23,116 | INFO | [activite] delete : 0 lignes supprimées. 2026-01-15 08:09:23,124 | INFO | [activite] AFTER upsert: total=106 2026-01-15 08:09:23,148 | INFO | [activite] staging `stg_activite_98ac4126` supprimée. 2026-01-15 08:09:23,148 | INFO | [activite] load terminé. 2026-01-15 08:09:23,149 | INFO | --- Fin traitement pour Activite --- 2026-01-15 08:09:23,149 | INFO | --- Debut traitement pour Affaire --- 2026-01-15 08:09:23,759 | INFO | 5844 lignes extraites de Affaire 2026-01-15 08:09:23,761 | INFO | Transformation de la table Affaire 2026-01-15 08:09:24,147 | INFO | [affaire] début load staging `stg_affaire_78e5a9b9` 2026-01-15 08:09:35,968 | INFO | [stg_affaire_78e5a9b9] staging créé avec 5844 lignes. 2026-01-15 08:09:35,995 | INFO | [stg_affaire_78e5a9b9] 0 NULL(s) et 0 doublon(s) détectés. 2026-01-15 08:09:36,404 | INFO | [stg_affaire_78e5a9b9] clé primaire ajoutée sur `N_Affaire`. 2026-01-15 08:09:36,630 | INFO | [affaire] BEFORE upsert: staging=5844, target=5844, matching=5844, to_insert=0, to_update=3, to_delete=0 2026-01-15 08:09:36,658 | INFO | [affaire] insert : 0 nouvelles lignes insérées. 2026-01-15 08:09:36,699 | INFO | [affaire] update : 3 lignes modifiées. 2026-01-15 08:09:36,745 | INFO | [affaire] delete : 0 lignes supprimées. 2026-01-15 08:09:36,756 | INFO | [affaire] AFTER upsert: total=5844 2026-01-15 08:09:36,805 | INFO | [affaire] staging supprimée. 2026-01-15 08:09:36,806 | INFO | [affaire] load terminé. 2026-01-15 08:09:36,808 | INFO | --- Fin traitement pour Affaire --- 2026-01-15 08:09:36,809 | INFO | --- Debut traitement pour AFFAIRE_VUE_LISTE_HEURES --- 2026-01-15 08:09:58,956 | INFO | 163161 lignes extraites de AFFAIRE_VUE_LISTE_HEURES 2026-01-15 08:11:17,284 | INFO | [heures] BEFORE upsert: staging=163161, target=163156, matching=163156, to_insert=5, to_update=0, to_delete=0 2026-01-15 08:11:17,762 | INFO | [heures] insert : 5 nouvelles lignes insérées. 2026-01-15 08:11:18,938 | INFO | [heures] load terminé. 2026-01-15 08:11:18,959 | INFO | --- Fin traitement pour AFFAIRE_VUE_LISTE_HEURES ---
KPIs du projet
| Métrique | Valeur |
|---|---|
| Tables synchronisées | 35 tables métier (27 GX + 8 GSM) |
| Fréquence d'exécution | 8 fois/jour (toutes les 3h) |
| Temps moyen par DAG GX | ~2-3 minutes (27 tables) |
| Volume par exécution GX | ~180 000+ lignes traitées |
| Plus grosse table | AFFAIRE_VUE_LISTE_HEURES (163 161 lignes) |
| Taux de succès | >99% |
Compétences techniques mobilisées
| Domaine | Technologies / Compétences |
|---|---|
| Data Engineering | Conception pipelines ETL, modélisation données, synchronisation incrémentale |
| Python | Pandas, SQLAlchemy, hashlib, logging, modules réutilisables |
| SQL | Requêtes complexes, jointures, UPSERT, transactions, DDL dynamique |
| Bases de données | SQL Server, MariaDB/MySQL, PostgreSQL, drivers ODBC |
| DevOps | Docker, Docker Compose, volumes, réseaux |
| Orchestration | Apache Airflow (DAGs, Operators, Scheduling, Callbacks) |
| Intégration | APIs REST, Webhooks Microsoft Teams, MessageCards |
| Administration | Configuration ODBC, gestion des drivers, permissions Linux |