Python使用Airflow实现自动化任务调度的构建方式【教程】

admin 百科 12
Airflow通过DAG文件定义任务调度,需满足文件命名、全局dag变量、必要导入等要求;用PythonOperator封装函数,设置依赖、重试、触发规则及敏感参数管理;支持本地调试与生产部署。

Python使用Airflow实现自动化任务调度的构建方式【教程】-第1张图片-佛山资讯网

用 Airflow 实现 Python 任务的自动化调度,核心是把业务逻辑封装成可被 Airflow 管理的 Operator,再通过 DAG 定义执行顺序、触发条件和重试策略。它不直接运行脚本,而是调度“任务实例”,靠 Scheduler 和 Executor 协同驱动。

定义一个基础 DAG 文件

DAG 是 Airflow 的调度蓝图,本质是一个 Python 文件(通常放在 dags/ 目录下),需满足几个硬性要求:

  • 文件名不能含空格或特殊字符,推荐小写加下划线(如 etl_daily_job.py
  • 必须包含一个全局变量 dag = DAG(...),且变量名固定为 dag
  • 需导入必要模块:from airflow import DAGfrom airflow.operators.python import PythonOperator
  • DAG 参数中 schedule_interval(新版推荐用 schedule)决定触发频率,支持 cron 表达式(如 "0 2 * * *" 表示每天凌晨2点)或 timedelta(如 timedelta(days=1)

用 PythonOperator 封装你的函数

这是最常用的方式,适合已有现成的 Python 函数。Airflow 会在任务运行时调用它,并自动传入上下文(**context):

  • 函数本身不能带括号调用,只写函数名(例如 task1 = PythonOperator(task_id='run_clean', python_callable=clean_data)
  • 若需传参,用 op_kwargs 字典(如 op_kwargs={"table": "users", "days_back": 7}),函数签名要匹配
  • 函数返回值默认被序列化进 XCom,供下游任务读取(用 context["ti"].xcom_pull(task_ids="upstream_task")
  • 避免在函数里写长时间阻塞操作(如 time.sleep(300)),应拆成多个短任务或改用 TimeDeltaSensor 等传感器

设置依赖关系与容错机制

任务不是孤立运行的,DAG 要明确谁先谁后、失败怎么处理:

标签: word python 编码 ai stream kubernetes

发布评论 0条评论)

还木有评论哦,快来抢沙发吧~