airflow dag run

该工作流通过调用 Apache Airflow 的 REST API,自动触发和监控指定的 DAG 运行,实时获取任务执行结果。它内置状态判断和超时机制,能够智能处理不同状态,确保工作流的稳定性和可控性。适合需要远程触发和监控数据管道任务的场景,提高工作效率,减少人为干预,保证任务流程的顺利进行。

流程图
airflow dag_run 工作流程图

工作流名称

airflow dag_run

主要功能和亮点

该工作流通过调用 Apache Airflow 的 REST API,实现自动触发指定 DAG(有向无环图)运行,并实时监控其运行状态,最终获取任务执行结果。它内置了状态判断和超时机制,能够智能处理排队、运行中、成功及失败等多种状态,确保工作流运行的稳定性和可控性。

解决的核心问题

自动化触发和监控 Airflow DAG 运行过程,解决了手动操作繁琐、状态跟踪困难及异常处理不及时的问题。尤其适合需要远程或集成式调用 Airflow 任务并确保任务执行结果及时反馈的场景。

应用场景

  • 数据工程师需要远程触发和监控数据管道任务
  • 自动化运维触发和检查 Airflow 任务状态
  • 业务系统集成 Airflow 作业执行结果进行后续处理
  • 流程编排平台与 Airflow 的无缝对接

主要流程步骤

  1. 输入参数接收:接收 DAG ID、任务 ID、配置参数(conf)、等待间隔(wait)及最大等待时间(wait_time)等输入。
  2. 设置 Airflow API 地址:配置 Airflow 服务器 API 前缀地址。
  3. 触发 DAG 运行:通过 HTTP POST 请求调用 Airflow API,启动指定 DAG。
  4. 判断 DAG 状态:根据返回的 DAG 运行状态,进入不同分支处理:
    • 如果状态为 queued,等待指定时间后继续轮询;
    • 如果状态为 running,同样继续轮询;
    • 如果状态为 success,获取任务执行结果;
    • 如果状态为 failed,立即停止并报错。
  5. 超时机制:维护轮询计数,超过最大等待时间则停止运行并报错。
  6. 返回结果:成功时获取并输出指定任务的 XCom 返回值。

涉及的系统或服务

  • Apache Airflow:通过其 REST API 进行 DAG 运行和状态查询。
  • n8n:作为自动化工作流引擎,调度和控制整个流程。

适用人群或使用价值

适用于数据工程师、DevOps 工程师及自动化平台开发者,帮助他们实现 Airflow 任务的自动化触发和状态监控,提升工作效率,减少人为干预,保证数据管道和任务流程的稳定运行。尤其适合需要将 Airflow 作业集成到更大自动化生态中的技术团队。