airflow简单入门介绍

airflow简单⼊门介绍
1. airflow 介绍
1.1 airflow 是什么
Airflow is a platform to programmatically author, schedule and monitor workflows.
airflow 是⼀个编排、调度监控workflow的平台,由Airbnb开源,现在在Apache Software Foundation 孵化。airflow 将workflow编排为tasks组成的DAGs,调度器在⼀组workers上按照指定的依赖关系执⾏tasks。同时,airflow 提供了丰富的命令⾏⼯具和简单易⽤的⽤户界⾯以便⽤户查看和操作,并且airflow提供了监控和报警系统。
一氧化氮合酶
1.2 airflow 核⼼概念
1. DAGs:即有向⽆环图(Directed Acyclic Graph),将所有需要运⾏的tasks按照依赖关系组织起来,描述的是所有tasks执⾏的顺
序。
2. Operators:可以简单理解为⼀个class,描述了DAG中⼀个具体的task具体要做的事。其中,airflow内置了很多operators,
如BashOperator 执⾏⼀个bash 命令,PythonOperator 调⽤任意的Python 函数,EmailOperator ⽤于发送邮件,HTTPOperator ⽤于发送HTTP请求, SqlOperator ⽤于执⾏SQL命令...同时,⽤户可以⾃定义Operator,这给⽤户提供了极⼤的便利性。
3. Tasks:Task 是 Operator的⼀个实例,也就是DAGs中的⼀个node。
4. Task Instance:task的⼀次运⾏。task instance 有⾃⼰的状态,包括"running", "success", "failed", "skipped", "up for
retry"等。
5. Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 TaskA >> TaskB,表明TaskB依赖于TaskA。
通过将DAGs和Operators结合起来,⽤户就可以创建各种复杂的 workflow了。
1.3 其它概念
1. Connections: 管理外部系统的连接信息,如外部MySQL、HTTP服务等,连接信息包括conn_id/hostname / login /
password/schema 等,可以通过界⾯查看和管理,编排workflow时,使⽤conn_id 进⾏使⽤。
2. Pools: ⽤来控制tasks执⾏的并⾏数。将⼀个task赋给⼀个指定的pool,并且指明priority_weight,可以⼲涉tasks的执⾏顺序。
3. XComs:在airflow中,operator⼀般(not always)是原⼦的,也就是说,他们⼀般独⽴执⾏,同时也不需要和其他operator共享信
息,如果两个operators需要共享信息,如filename之类的, 推荐将这两个operators组合成⼀个operator。如果实在不能避免,则可以使⽤XComs (cross-communication)来实现。XComs⽤来在不同tasks之间交换信息。
4. Trigger Rules:指task的触发条件。默认情况下是task的直接上游执⾏成功后开始执⾏,airflow允许更复杂的依赖设置,包
括all_success(所有的⽗节点执⾏成功),all_failed(所有⽗节点处于failed或upstream_failed状态),all_done(所有⽗节点执⾏完成),one_failed(⼀旦有⼀个⽗节点执⾏失败就触发,不必等所有⽗节点执
⾏完成),one_success(⼀旦有⼀个⽗节点执⾏成功就触发,不必等所有⽗节点执⾏完成),dummy(依赖关系只是⽤来查看的,可以任意触发)。另外,airflow提供了depends_on_past,设置为True时,只有上⼀次调度成功了,才可以触发。
2. ⽰例
先来看⼀个简单的DAG。图中每个节点表⽰⼀个task,所有tasks组成⼀个DAG,各个tasks之间的依赖关系可以根据节点之间的线看出来。
2.1 实例化DAG
# -*- coding: UTF-8 -*-
## 导⼊airflow需要的modules
from airflow import DAG
from datetime import datetime, timedelta
default_args ={
'owner':'xxx',
'depends_on_past':False,# 如上⽂依赖关系所⽰
'start_date': datetime(2018,1,17),# DAGs都有个参数start_date,表⽰调度器调度的起始时间
'email':['xxxxx@qq'],# ⽤于alert
'email_on_failure':True,
'email_on_retry':False,
'retries':3,# 重试策略
'retry_delay': timedelta(minutes=5)
}
dag = DAG('example-dag', default_args=default_args, schedule_interval='0 0 * * *')
在创建DAGs时,我们可以显⽰的给每个Task传递参数,但通过default_args,我们可以定义⼀个默认参数⽤于创建tasks。
校园记趣注意,schedule_interval 跟官⽅⽂档不⼀致,官⽅⽂档的⽅式已经被deprecated。
2.2 定义依赖关系
这个依赖关系是我⾃⼰定义的,key表⽰某个taskId,value⾥的每个元素也表⽰⼀个taskId,其中,key依赖value⾥的所有task。
"dependencies": {
"goods_sale_2": ["goods_sale_1"], # goods_sale_2 依赖 goods_sale1
"shop_sale_1_2": ["shop_sale_1_1"],
"shop_sale_2_2": ["shop_sale_2_1"],
"shop_sale_2_3": ["shop_sale_2_2"],
"etl_task": ["shop_info", "shop_sale_2_3", "shop_sale_realtime_1", "goods_sale_2", "shop_sale_1_2"],
"goods_sale_1": ["timelySalesCheck", "productDaySalesCheck"],
"shop_sale_1_1": ["timelySalesCheck", "productDaySalesCheck"],
"shop_sale_realtime_1": ["timelySalesCheck", "productDaySalesCheck"],
"shop_sale_2_1": ["timelySalesCheck", "productDaySalesCheck"],
"shop_info": ["timelySalesCheck", "productDaySalesCheck"]
}
2.3 定义tasks和依赖关系
⾸先,实例化operators,构造tasks。如代码所⽰,其中,EtlTask、MySQLToWebDataTransfer、MySQLSelector 是⾃定义的三种Operator,根据taskType实例化operator,并存放到taskDict中,便于后期建⽴tasks之间的依赖关系。
for taskConf in tasksConfs:
taskType = ("taskType")
if taskType == "etlTask":
task = EtlTask(
task_("taskId"),
httpConnId=httpConn,
("etlId"),
dag=dag)
("taskId")] = task
elif taskType == "MySQLToWebDataTransfer":
task = MySqlToWebdataTransfer(
task_id = ("taskId"),
郭林新气功sql= ("sql"),
("tableName"),
mysqlConnId =mysqlConn,
httpConnId=httpConn,
dag=dag
)
("taskId")] = task
elif taskType == "MySQLSelect":
task = StatusChecker(
task_id = ("taskId"),
mysqlConnId = mysqlConn,
sql = ("sql"),
dag = dag
)
("taskId")] = task
else:
<("error. TaskType is illegal.")
构建tasks之间的依赖关系,其中,dependencies中定义了上⾯的依赖关系,A >> B 表⽰A是B的⽗节点,相应的,A << B 表⽰A是B的⼦节点。
钢段
for sourceKey in dependencies:
destTask = (sourceKey)
sourceTaskKeys = (sourceKey)
for key in sourceTaskKeys:
sourceTask = (key)
if (sourceTask != None and destTask != None):
sourceTask >> destTask
3. 常⽤命令
命令⾏输⼊airflow -h,得到帮助⽂档
backfill            Run subsections of a DAG for a specified date range
list_tasks          List the tasks within a DAG
clear              Clear a set of task instance, as if they never ran
蛇药片pause              Pause a DAG
unpause            Resume a paused DAG
trigger_dag        Trigger a DAG run
pool                CRUD operations on pools
variables          CRUD operations on variables
kerberos            Start a kerberos ticket renewer
render              Render a task instance's template(s)
run                Run a single task instance
initdb              Initialize the metadata database
list_dags          List all the DAGs
dag_state          Get the status of a dag run
task_failed_deps    Returns the unmet dependencies for a task instance
from the perspective of the scheduler. In other words,
why a task instance doesn't get scheduled and then
queued by the scheduler, and then run by an executor).
task_state          Get the status of a task instance
serve_logs          Serve logs generate by worker
test                Test a task instance. This will run a task without
checking for dependencies or recording it's state in
the database.
webserver          Start a Airflow webserver instance
resetdb            Burn down and rebuild the metadata database
upgradedb          Upgrade the metadata database to latest version
scheduler          Start a scheduler instance
worker              Start a Celery worker node
flower              Start a Celery Flower
version            Show the version约会荷花
connections        List/Add/Delete connections
其中,使⽤较多的是backfill、run、test、webserver、scheduler。其他操作在web界⾯操作更⽅便。另外,initdb ⽤于初始化metadata,使⽤⼀次即可;resetdb会重置metadata,清除掉数据(如connection数据), 需要慎⽤。
4. 问题
在使⽤airflow过程中,曾把DAGs⾥的task拆分得很细,这样的话,如果某个task失败,重跑的代价会⽐较低。但是,在实践中发
现,tasks太多时,airflow在调度tasks会很低效,airflow⼀直处于选择待执⾏的task的过程中,会长时间没有具体task在执⾏,从⽽整体执⾏效率⼤幅降低。
5. 总结
airflow 很好很强⼤。如果只是简单的ETL之类的⼯作,可以很容易的编排。调度灵活,⽽且监控和报警系统完备,可以很⽅便的投⼊⽣产环节。
6. 参阅

本文发布于:2024-09-21 22:56:42,感谢您对本站的认可!

本文链接:https://www.17tex.com/xueshu/200376.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:依赖   关系   信息   监控   调度   节点   系统   定义
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议