Workflow n8n

Automatisation Airflow avec n8n : gestion des exécutions de DAG

Ce workflow n8n a pour objectif de gérer les exécutions de DAG dans Airflow, permettant ainsi une automatisation efficace des processus de données. Dans un contexte où la gestion des flux de travail est cruciale pour les entreprises, ce workflow s'avère particulièrement utile pour les équipes techniques et les data engineers qui souhaitent surveiller et contrôler l'état de leurs tâches automatisées. En intégrant des appels HTTP pour interagir avec l'API d'Airflow, ce workflow permet de vérifier l'état des DAG et de réagir en conséquence, garantissant ainsi une gestion fluide des opérations.

  • Étape 1 : Le workflow débute par un appel HTTP à l'API d'Airflow pour déclencher un DAG.
  • Étape 2 : Ensuite, il vérifie l'état du DAG en cours d'exécution. Si l'état est 'queued', le workflow attend un certain temps avant de vérifier à nouveau.
  • Étape 3 : Si le DAG prend trop de temps à s'exécuter, une erreur est générée.
  • Étape 4 : En cas de succès, le workflow récupère les résultats de l'exécution. Les noeuds 'if' et 'switch' permettent de gérer les différentes conditions et de prendre des décisions basées sur l'état du DAG. Les bénéfices business de ce workflow incluent une réduction des temps d'attente et une meilleure visibilité sur l'état des processus, ce qui permet aux équipes de réagir rapidement en cas de problème. En automatisant la gestion des DAG, les entreprises peuvent améliorer leur efficacité opérationnelle et réduire les risques d'erreurs humaines.
Tags clés :automatisationAirflown8ngestion de donnéesworkflow
Catégorie: Webhook · Tags: automatisation, Airflow, n8n, gestion de données, workflow0

Workflow n8n Airflow, gestion de données : vue d'ensemble

Schéma des nœuds et connexions de ce workflow n8n, généré à partir du JSON n8n.

Workflow n8n Airflow, gestion de données : détail des nœuds

  • Airflow: dag_run

    Ce noeud envoie une requête HTTP pour déclencher un DAG dans Airflow.

  • Airflow: dag_run - state

    Ce noeud effectue une requête HTTP pour obtenir l'état d'un DAG dans Airflow.

  • count

    Ce noeud exécute un code JavaScript pour compter ou traiter des données.

  • dag run fail

    Ce noeud arrête le workflow et génère une erreur avec un message spécifique.

  • if state == queued

    Ce noeud évalue une condition pour vérifier si l'état est 'queued'.

  • dag run wait too long

    Ce noeud arrête le workflow et génère une erreur si le temps d'attente est trop long.

  • Airflow: dag_run - get result

    Ce noeud envoie une requête HTTP pour obtenir le résultat d'un DAG dans Airflow.

  • Switch: state

    Ce noeud permet de diriger le flux en fonction de règles définies sur l'état.

  • in data

    Ce noeud déclenche l'exécution d'un autre workflow avec des entrées spécifiques.

  • Wait

    Ce noeud introduit une pause dans le workflow pour une durée spécifiée.

  • If count > wait_time

    Ce noeud évalue une condition pour vérifier si le compte est supérieur à un temps d'attente donné.

  • airflow-api

    Ce noeud définit des valeurs ou des options à utiliser dans le workflow.

Inscris-toi pour voir l'intégralité du workflow

Inscription gratuite

S'inscrire gratuitementBesoin d'aide ?
{
  "id": "Y5URlIlbX4HDzWKA",
  "meta": {
    "instanceId": "6ae0aa8b6c9099f1f8ed1265281802eab47aaf5b2845f317791e4ac7ee5b7279",
    "templateCredsSetupCompleted": true
  },
  "name": "airflow dag_run",
  "tags": [
    {
      "id": "YSelDQ3zfWB0LeVn",
      "name": "airflow",
      "createdAt": "2025-02-25T04:17:21.943Z",
      "updatedAt": "2025-02-25T04:17:21.943Z"
    }
  ],
  "nodes": [
    {
      "id": "0d4457ef-7a88-4755-8bd2-b0e8148f86f4",
      "name": "Airflow: dag_run",
      "type": "n8n-nodes-base.httpRequest",
      "position": [
        380,
        -40
      ],
      "parameters": {
        "url": "={{ $('airflow-api').item.json.prefix }}/api/v1/dags/{{ $('in data').item.json.dag_id }}/dagRuns",
        "method": "POST",
        "options": {},
        "jsonBody": "={\n  \"conf\": {{ $('in data').item.json.conf }}\n}",
        "sendBody": true,
        "specifyBody": "json",
        "authentication": "genericCredentialType",
        "genericAuthType": "httpBasicAuth"
      },
      "credentials": {
        "httpBasicAuth": {
          "id": "vTR4WWA7czRn2ULn",
          "name": "Airflow"
        }
      },
      "typeVersion": 4.2
    },
    {
      "id": "acf477a0-aad5-402a-a5a0-543f3e277333",
      "name": "Airflow: dag_run - state",
      "type": "n8n-nodes-base.httpRequest",
      "position": [
        840,
        60
      ],
      "parameters": {
        "url": "={{ $('airflow-api').item.json.prefix }}/api/v1/dags/{{ $('in data').item.json.dag_id }}/dagRuns/{{ $('Airflow: dag_run').item.json.dag_run_id }}",
        "options": {},
        "authentication": "genericCredentialType",
        "genericAuthType": "httpBasicAuth"
      },
      "credentials": {
        "httpBasicAuth": {
          "id": "vTR4WWA7czRn2ULn",
          "name": "Airflow"
        }
      },
      "typeVersion": 4.2
    },
    {
      "id": "26982a6f-6281-4140-a05c-ea6f3f2c0cb2",
      "name": "count",
      "type": "n8n-nodes-base.code",
      "position": [
        1180,
        40
      ],
      "parameters": {
        "jsCode": "try {\n  $('count').first().json.count += 1\n  return {count:$('count').first().json.count};\n}\ncatch (error) {\n  return {count:1};\n}"
      },
      "typeVersion": 2
    },
    {
      "id": "613718f6-ba7e-415c-8e07-0123224e1ac6",
      "name": "dag run fail",
      "type": "n8n-nodes-base.stopAndError",
      "position": [
        1180,
        400
      ],
      "parameters": {
        "errorMessage": "dag run fail"
      },
      "typeVersion": 1
    },
    {
      "id": "66ba0e85-4608-418b-b27b-8cbc50f78319",
      "name": "if state == queued",
      "type": "n8n-nodes-base.if",
      "position": [
        520,
        60
      ],
      "parameters": {
        "options": {},
        "conditions": {
          "options": {
            "version": 2,
            "leftValue": "",
            "caseSensitive": true,
            "typeValidation": "strict"
          },
          "combinator": "and",
          "conditions": [
            {
              "id": "0ae67986-09c0-443d-9262-075bfe94ca2e",
              "operator": {
                "name": "filter.operator.equals",
                "type": "string",
                "operation": "equals"
              },
              "leftValue": "={{ $json.state }}",
              "rightValue": "queued"
            }
          ]
        }
      },
      "typeVersion": 2.2
    },
    {
      "id": "92176e9a-0ea7-48b0-9ca0-ea4ea8442cee",
      "name": "dag run wait too long",
      "type": "n8n-nodes-base.stopAndError",
      "position": [
        1500,
        40
      ],
      "parameters": {
        "errorMessage": "dag run wait too long"
      },
      "typeVersion": 1
    },
    {
      "id": "6a05471f-232e-44d6-9dbb-583400537507",
      "name": "Airflow: dag_run - get result",
      "type": "n8n-nodes-base.httpRequest",
      "position": [
        1180,
        -140
      ],
      "parameters": {
        "url": "={{ $('airflow-api').item.json.prefix }}/api/v1/dags/{{ $('in data').item.json.dag_id }}/dagRuns/{{ $('Airflow: dag_run').item.json.dag_run_id }}/taskInstances/{{ $('in data').item.json.task_id }}/xcomEntries/return_value",
        "options": {},
        "authentication": "genericCredentialType",
        "genericAuthType": "httpBasicAuth"
      },
      "credentials": {
        "httpBasicAuth": {
          "id": "vTR4WWA7czRn2ULn",
          "name": "Airflow"
        }
      },
      "typeVersion": 4.2
    },
    {
      "id": "fb2211d5-cef2-4be2-b281-de315aa07093",
      "name": "Switch: state",
      "type": "n8n-nodes-base.switch",
      "position": [
        980,
        -40
      ],
      "parameters": {
        "rules": {
          "values": [
            {
              "outputKey": "=success",
              "conditions": {
                "options": {
                  "version": 2,
                  "leftValue": "",
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "id": "4d4ecf8a-c02b-4722-9528-1919cdf9b83e",
                    "operator": {
                      "name": "filter.operator.equals",
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.state }}",
                    "rightValue": "success"
                  }
                ]
              },
              "renameOutput": true
            },
            {
              "outputKey": "queued",
              "conditions": {
                "options": {
                  "version": 2,
                  "leftValue": "",
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "operator": {
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.state }}",
                    "rightValue": "queued"
                  }
                ]
              },
              "renameOutput": true
            },
            {
              "outputKey": "running",
              "conditions": {
                "options": {
                  "version": 2,
                  "leftValue": "",
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "id": "fa5d8524-334a-4ab1-b543-bba75cafd0ec",
                    "operator": {
                      "name": "filter.operator.equals",
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.state }}",
                    "rightValue": "running"
                  }
                ]
              },
              "renameOutput": true
            },
            {
              "outputKey": "failed",
              "conditions": {
                "options": {
                  "version": 2,
                  "leftValue": "",
                  "caseSensitive": true,
                  "typeValidation": "strict"
                },
                "combinator": "and",
                "conditions": [
                  {
                    "id": "dd853677-c51c-4c06-8680-3c9d1829d6bd",
                    "operator": {
                      "name": "filter.operator.equals",
                      "type": "string",
                      "operation": "equals"
                    },
                    "leftValue": "={{ $json.state }}",
                    "rightValue": "failed"
                  }
                ]
              },
              "renameOutput": true
            }
          ]
        },
        "options": {
          "fallbackOutput": 3
        }
      },
      "typeVersion": 3.2
    },
    {
      "id": "5941496a-a64d-432c-9103-e7bcae4b8d67",
      "name": "in data",
      "type": "n8n-nodes-base.executeWorkflowTrigger",
      "position": [
        100,
        -40
      ],
      "parameters": {
        "workflowInputs": {
          "values": [
            {
              "name": "dag_id"
            },
            {
              "name": "task_id"
            },
            {
              "name": "conf"
            },
            {
              "name": "wait",
              "type": "number"
            },
            {
              "name": "wait_time",
              "type": "number"
            }
          ]
        }
      },
      "typeVersion": 1.1
    },
    {
      "id": "e77fed4a-b61a-4126-8be3-43ef8a832cfe",
      "name": "Wait",
      "type": "n8n-nodes-base.wait",
      "position": [
        700,
        -40
      ],
      "webhookId": "3d164954-2926-4174-afc1-261dfdfa9e46",
      "parameters": {
        "amount": "={{ $('in data').item.json.hasOwnProperty('wait') ? $('in data').item.json.wait : 10 }}"
      },
      "typeVersion": 1.1
    },
    {
      "id": "8ffae842-4400-4667-85bb-50644ef10fc0",
      "name": "If count > wait_time",
      "type": "n8n-nodes-base.if",
      "position": [
        1320,
        140
      ],
      "parameters": {
        "options": {},
        "conditions": {
          "options": {
            "version": 2,
            "leftValue": "",
            "caseSensitive": true,
            "typeValidation": "strict"
          },
          "combinator": "and",
          "conditions": [
            {
              "id": "1829d538-5633-4f5c-ad1b-285b084b35ee",
              "operator": {
                "type": "number",
                "operation": "gt"
              },
              "leftValue": "={{ $json.count }}",
              "rightValue": "={{ $('in data').item.json.hasOwnProperty('wait_time') ? $('in data').item.json.wait_time : 12 }}"
            }
          ]
        }
      },
      "typeVersion": 2.2
    },
    {
      "id": "c49d4a1b-6f25-471b-9c21-d3defb709dda",
      "name": "airflow-api",
      "type": "n8n-nodes-base.set",
      "position": [
        240,
        60
      ],
      "parameters": {
        "options": {},
        "assignments": {
          "assignments": [
            {
              "id": "40a5ab5b-dee1-44c4-910a-d6b85af75aed",
              "name": "prefix",
              "type": "string",
              "value": "https://airflow.example.com"
            }
          ]
        }
      },
      "typeVersion": 3.4
    }
  ],
  "active": false,
  "pinData": {
    "in data": [
      {
        "json": {
          "conf": "{\n  \"image\": \"nginx\",\n  \"tag\": \"latest\",\n  \"tag_requested\": 1000\n}",
          "wait": 10,
          "dag_id": "image_tag_related",
          "task_id": "image_tag_related",
          "wait_time": 18
        }
      }
    ]
  },
  "settings": {
    "executionOrder": "v1"
  },
  "versionId": "57fdbcfc-7950-4aff-9ac7-3c2a99a2c8c8",
  "connections": {
    "Wait": {
      "main": [
        [
          {
            "node": "Airflow: dag_run - state",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "count": {
      "main": [
        [
          {
            "node": "If count > wait_time",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "in data": {
      "main": [
        [
          {
            "node": "airflow-api",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "airflow-api": {
      "main": [
        [
          {
            "node": "Airflow: dag_run",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Switch: state": {
      "main": [
        [
          {
            "node": "Airflow: dag_run - get result",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "count",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "count",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "dag run fail",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Airflow: dag_run": {
      "main": [
        [
          {
            "node": "if state == queued",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "if state == queued": {
      "main": [
        [
          {
            "node": "Wait",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "dag run fail",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "If count > wait_time": {
      "main": [
        [
          {
            "node": "dag run wait too long",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "Wait",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Airflow: dag_run - state": {
      "main": [
        [
          {
            "node": "Switch: state",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Airflow: dag_run - get result": {
      "main": [
        []
      ]
    }
  }
}

Workflow n8n Airflow, gestion de données : pour qui est ce workflow ?

Ce workflow s'adresse principalement aux équipes techniques, aux data engineers et aux responsables de la gestion des données au sein des entreprises. Il est conçu pour les organisations qui utilisent Airflow pour orchestrer leurs flux de travail et qui cherchent à automatiser la surveillance et la gestion de leurs DAG. Un niveau technique intermédiaire est recommandé pour une mise en œuvre efficace.

Workflow n8n Airflow, gestion de données : problème résolu

Ce workflow résout le problème de la gestion manuelle des exécutions de DAG dans Airflow, qui peut être source de frustration et de perte de temps. En automatisant le suivi de l'état des DAG, il élimine les risques d'erreurs humaines et permet une réaction rapide en cas de problème. Les utilisateurs bénéficient d'une meilleure visibilité sur leurs processus, ce qui leur permet d'optimiser leur temps et de se concentrer sur des tâches à plus forte valeur ajoutée.

Workflow n8n Airflow, gestion de données : étapes du workflow

Étape 1 : Le workflow commence par un appel HTTP à l'API d'Airflow pour déclencher un DAG.

  • Étape 1 : Il vérifie ensuite l'état du DAG en cours d'exécution.
  • Étape 2 : Si l'état est 'queued', le workflow attend un certain temps avant de vérifier à nouveau.
  • Étape 3 : Si le DAG prend trop de temps, une erreur est générée.
  • Étape 4 : En cas de succès, le workflow récupère les résultats de l'exécution. Les noeuds conditionnels permettent de gérer les différentes branches du flux en fonction de l'état du DAG.

Workflow n8n Airflow, gestion de données : guide de personnalisation

Pour personnaliser ce workflow, vous pouvez modifier l'URL de l'API d'Airflow dans les noeuds HTTP. Assurez-vous également d'ajuster les conditions dans les noeuds 'if' et 'switch' selon vos besoins spécifiques. Si vous souhaitez intégrer d'autres outils, vous pouvez ajouter des noeuds supplémentaires pour interagir avec des services tiers. Pensez à sécuriser vos appels API en utilisant des méthodes d'authentification appropriées et à surveiller les performances du workflow pour garantir son bon fonctionnement.