默认在airflow中每个task都是独⽴的进程,⽆法进⾏数据交换,但airflow还提供了⼀个XCom功能,以满⾜⽤户的类似需求 下⾯我们创建两个dag,其中⼀个push数据,⼀个pull数据,如下的dag中的task(push)执⾏完后会推送⼀条数据到xcom表,key=push ,value=True dag = DAG(
四象限探测器dag_id='migrate_mongo', default_args=args,
schedule_interval='0 17 * * *',
)
def push(**kwargs):
kwargs['ti'].xcom_push(key='status', value=True)
task1 = PythonOperator(task_id='push',
python_callable=push,
provide_context=True,
dag=dag)
心兽另起⼀个dag,创建⼀个task(pull)可以将上⾯的dag中的数据获取到
dag = DAG(
dag_id='migrate_mongo2',
default_args=args,
schedule_interval='0 17 * * *',刘功臣
catchup=False
)
reason模型def pull(**kwargs):
execution_date = kwargs['execution_date']
v1 = _one(dag_id='migrate_mongo', task_id='push', execution_date=execution_date)
print(v1)
task2 = PythonOperator(task_id='pull',
python_callable=pull,
provide_context=True,
dag=dag)
热传导方程两个dag之间其实通过airflow底层的⼀张xcom表进⾏数据交换的