调度工具 Airflow

1. 什么是 Airflow

 Airflow 是 Airbnb 开源的 data pipeline 调度和监控工作流的平台,用于用来创建、监控和调整 data pipeline(ETL)。

2. 简单的定时任务 cron

  假设我们想要定时调用一个程序,比如说:每天定时从 Web 抓数据,我们可以使用 cron。cron 是一个 Linux 下的后台服务,用来定期的执行一些任务,在/etc/crontab 中设置后即可,基本写法如下:

1
2
# 分钟 小时 日 月 周 用户  命令 
17 * * * * root date >> /tmp/time.log

  它的意思是每个小时的第 18 分钟,将当前时间写入 log 文件,注意各值的取值范围(分钟 0 - 59,小时 0 - 23,天 1 - 31,月 1 - 12,星期 0 - 6,0 表示星期天)修改/etc/crontab 后,还需要用 $ sudo service cron restart 命令重启 crontab 任务,才能生效。

3. 为什么要用 Airflow

  有了 cron 为什么还需要 airflow?以抓取 web 数据为例,可能在某天抓取数据时,网断或者关机了,当天的数据没抓进来,这种情况下,只能通过写日志定时分析日志,以及在程序中定时重连的方式保证数据完整,相对比较零碎和麻烦。另外,如果 crontab 设置文件中有几十上百条任务时,就比较头疼了。

 Airflow 支持图形界面和命令行两种方式,管理起来比较方便,另外,它可以把几个相互依赖的任务编成一组,并监督执行是否正常,如果不正常,调用程序重试等等。

  当然,Airflow 也不全是优点,比如需要使用 python 脚本来定义任务间的依赖关系,相对于手动编辑 crontab 文件,相对难一些。因此,如果只调用简单的任务,使用 cron 即可,复杂的再考虑 airflow。

4. Airflow 的基础概念

 Airflow 中最基本的两个概念是:DAG 和 task。DAG 的全称是 Directed Acyclic Graph 是所有你想执行的任务的集合,在这个集合中可以定义了他们的依赖关系,一个 DAG object 可以用 Python 脚本中配置完成。每个 DAG object 代表了一个 workflow,每个 workflow 都可以包含任意个 task,task 就是具体的任务。

5. Airflow 安装和使用

(1) 安装 airflow

1
$ sudo pip install airflow 

  可以通过环境变量 AIRFLOW_HOME 设置 airflow 的工作目录,默认为 $HOME/airflow/

(2) Mysql 支持

  如果想使用 mysql 存储 airflow 内容,请按如下方法设置 mysql;如果不设置,airflow 在其工作目录下建立 db 文件,以 sqlite 方式存储。

1
2
3
4
5
6
$ mysql -u root -p

mysql> create database airflow default charset utf8 collate utf8_general_ci;
mysql> create user airflow@'localhost' identified by 'airflow';
mysql> grant all on airflow.* to airflow@'localhost';
mysql> flush privileges;

  修改配置文件 $AIRFLOW_HOME/airflow.cfg,把 sql_alchemy_conn 对应语句替换成:

1
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

(3) 运行

1
2
3
4
$ airflow initdb
$ airflow worker
$ airflow webserver -p 8080 # 一直运行
$ airflow scheduler # 一直运行

  此时在浏览器中输入:http://localhost:8080,即可看到 airflow 界面,其中有很多 demo 可以参考。

(4) 建立第一个 DAG:Hellow world

1
2
$ mkdir $AIRFLOW_HOME/dags/
$ vi $AIRFLOW_HOME/dags/hello_word.py # 内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
 # -*- coding: utf-8 -*-

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta

default_args = {
'owner': 'yan.xie',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'retries': 5, # 重试次数
'retry_delay': timedelta(minutes=1), # 运行间隔时间
}

dag = DAG(
'test_my_dag', # DAG名字
default_args=default_args,
description='my first DAG',
schedule_interval=timedelta(days=1))

task1 = BashOperator(
task_id='task_1', # TASK名
bash_command='date', # 运行命令
dag=dag)

task2 = BashOperator(
task_id='task_2',
depends_on_past=False,
bash_command='sleep 5',
dag=dag)

def print_hello():
return 'Hello world!'

test3 = PythonOperator(
task_id='task_3',
python_callable=print_hello, # 运行python程序
dag=dag)

task2.set_upstream(task1) # 设置依赖关系
test3.set_upstream(task1)

  保存之后,再浏览器刷新一下界面,即可在 list 中看到该 DAG,点 On 后,即可运行。

  点开 DAG 可以看到各 Task 间的依赖关系

  以及树型关系

(5) 调试

  有时候,怕不能一次写对,可以运行以下命令调试单个 Task

1
$ airflow test test_my_dag task_3 20181027

(6) 清除全部 DAG 重置数据库

1
$ airflow resetdb 

  并删除 $AIRFLOW_HOME/dags/ 下所有 DAG 文件,然后重启 webserver。

  在 Airflow 中,如果改了一个 DAG 的名字,它会新建一个 DAG,而不仅是改名,所以旧的 DAG 还在数据库和列表中存在,可以用 “$ airflow delete_dag DAG 名” 的方式删除它,但不是每个 airflow 版本都支持 delete_dag 命令。此时可以只用 resetdb 不删除 dags 目录下文件的方式,删除目录中没有对应文件的 DAG(删除有风险,操作须谨慎)。

6. 参考

(1) Ubuntu 下 crontab 命令的用法

https://www.cnblogs.com/daxian2012/articles/2589894.html

(2) 使用 Airflow 替代你的 crontab

https://www.juhe.cn/news/index/id/2365