怎么使用 Airflow 调度每日定时执行的 Python 数据分析任务

文章导读
使用 Apache Airflow 调度每日 Python 数据分析任务,核心是通过定义 DAG(有向无环图)文件并配置 schedule 参数实现自动化触发。该方案适合需要任务依赖管理、错误重试及可视化监控的 ETL 或分析场景,但需注意服务器时区配置与执行器资源隔离,避免任务堆积。
📋 目录
  1. 命令速用版
  2. 为什么会这样
  3. 分步处理
  4. 怎么验证是否生效
  5. 常见坑
  6. 常见问题
  7. 参考来源
A A

使用 Apache Airflow 调度每日 Python 数据分析任务,核心是通过定义 DAG(有向无环图)文件并配置 schedule 参数实现自动化触发。该方案适合需要任务依赖管理、错误重试及可视化监控的 ETL 或分析场景,但需注意服务器时区配置与执行器资源隔离,避免任务堆积。

先说结论:Airflow 通过 Python 代码定义工作流,比 crontab 更适合复杂数据分析任务的依赖管理与状态追踪。

  • 适合:需要任务依赖、重试机制及可视化监控的每日 ETL 或分析流程。
  • 先看:确认 Python 环境版本与 Airflow 版本兼容性,建议固定版本号安装。
  • 建议:生产环境使用 CeleryExecutor 或 KubernetesExecutor 分布式执行,避免单点阻塞。

命令速用版

以下是基于 Python 环境快速安装与初始化的核心命令,适用于本地调试或单节点部署:

pip install "apache-airflow==2.10.2" `--constraint` "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.9.txt"
airflow db init
airflow users create `--username` admin `--password` admin `--firstname` Admin `--lastname` User `--role` Admin `--email` admin@example.com
airflow webserver -p 8080 &
airflow scheduler

注意:安装时需指定 constraints 文件以防止依赖冲突,版本号可根据实际需求调整,如 2.5.1 或 2.10.2。

为什么会这样

Airflow 的核心优势在于将任务逻辑抽象为 DAG,由 Scheduler 统一解析并触发执行,解决了脚本间依赖混乱的问题。

传统 crontab 只能按时间触发,无法感知上游任务是否成功,容易导致数据重算或中断。Airflow 的 Scheduler 会检测 DAG 文件更新,将任务实例存入元数据数据库,Executor 根据策略分配资源。公开资料中没有看到可靠的量化数据表明具体性能提升比例,但架构设计支持任务状态回溯与自动重试,降低了人工运维成本。

分步处理

步骤 1:环境准备与安装

使用 Miniconda 创建独立 Python 环境(如 Python 3.8 或 3.11),避免依赖冲突。安装 Airflow 时务必指定版本与约束文件,例如 apache-airflow==2.10.2。

步骤 2:初始化数据库与用户

执行 airflow db init 初始化元数据数据库(默认 SQLite,生产建议 PostgreSQL)。创建管理员用户以便登录 Web UI。

步骤 3:编写 DAG 文件

在 dags/ 目录下创建 Python 文件,文件名不能含空格。必须包含全局变量 dag = DAG(),并设置 schedule 参数(如 '@daily' 或 cron 表达式 '0 2 * * *')。

步骤 4:定义任务与依赖

怎么使用 Airflow 调度每日定时执行的 Python 数据分析任务

使用 PythonOperator 封装数据分析函数,通过 >> 操作符设置任务依赖(如 extract >> transform >> load)。敏感参数通过 Airflow Connections 管理,不要硬编码在代码中。

步骤 5:启动服务

分别启动 webserver 和 scheduler 进程。Web 界面默认运行在 8080 端口。

怎么验证是否生效

登录 Airflow Web UI(http://localhost:8080),在 DAGs 列表页查看任务状态指示灯是否为绿色。

点击具体 DAG 进入 Graph View,确认任务节点依赖关系是否符合预期。查看 Task Instance 日志,确认 Python 函数是否按顺序执行且无报错。若任务未触发,检查 scheduler 日志确认是否解析到最新 DAG 文件。

常见坑

时区问题:start_date 和 schedule 默认使用 UTC 时间,若需本地时间需在配置中设置 default_timezone,否则任务可能在北京时间下午执行。

Catchup 机制:默认 catchup=True 会补跑历史任务,可能导致瞬间大量任务堆积。新建 DAG 建议设置 catchup=False。

XCom 限制:任务间传递数据默认通过 XCom,但底层数据库有大小限制,大数据量应通过文件路径或数据库表传递,不要直接返回大对象。

依赖冲突:不同项目需要不同 Python 版本时,务必使用虚拟环境隔离,不要在全局环境混用包。

常见问题

schedule 参数支持哪些格式?

支持 cron 表达式(如 '0 2 * * *')或预设宏(如 '@daily', '@hourly'),Airflow 2.0+ 推荐使用 schedule 参数替代旧的 schedule_interval。

任务失败后如何自动重试?

在 default_args 中设置 retries=3 和 retry_delay=timedelta(minutes=5),任务失败后会自动按指定间隔重试指定次数。

如何在任务中获取数据库密码?

不要在代码中硬编码密码,应在 Web UI 配置 Connections,代码中通过 BaseHook.get_connection('conn_id').password 获取。

参考来源

  • 使用 Apache Airflow 调度 Python 数据分析任务
  • Apache Airflow 定时任务完整指南:schedule 参数从入门到生产
  • Python 任务调度应用:Airflow
  • Python 使用 Airflow 实现自动化任务调度的构建方式【教程】