One minute
Week1038_tip
Tip - Aiflow注解生成Task的易错点
Situation
在使用Airflow配置任务时候,使用了Task group 和 task注解来实现任务配置,使用一组注解task用来检测数据是否完整,完整的话就继续走以后逻辑。
Task
在这样的情况下,本应该检查完数据才进行下一步,结果没有实际检查,因此要查具体原因。
Action
查看日志,实际执行检查会打出相应日志Success criteria met. Exiting.
, 但我的任务日志并没有检查,
任务代码架构如下:
with DAG () as dag:
with TaskGroup(...) as group:
@task
def check_xxx():
S3KeySensor(...)
...
start >> group >> logical_task >> end
找更有经验的同事询问。 为何这样会导致S3Keysor没有执行?告知,可能是bug吧,没遇见过。
找实现过类似功能的代码。 找了别人写的非注解方式实现的代码, 改造此功能后如下:
with DAG () as dag:
check_a = PythonOperator(...)
check_b = PythonOperator(...)
...
start >> [check_a, check_b] >> logical_task >> end
发现是可以正常执行的,能打出相应日志。
对比实现。 对比两个实现版本, 我写的不同点是引入TaskGroup, 和注解 @task, TaskGroup不太可能出问题,因为我有其他这样写的任务没有报错,问题大概率出现在@task上。
验证问题 @task 其实是对PythonOperator简化写法, 去掉@task改成原生模式:
def check_a_callable():
S3KeySensor(...)
def check_b_callable():
S3KeySensor(...)
with DAG () as dag:
def
with TaskGroup(...) as group:
p1 = PythonOperator(... callable=check_axxx)
p2 = ...
...
start >> group >> logical_task >> end
这里其实已经能看出问题了, check_a_callable()中,实际执行的是方法体的 S3KeySenor构造方法,并没有执行S3KeySensor内部的方法。
Result 结果
去官网找S3KeySensor的文档和用法, 发现官网写的很明白, S3KeySensor就是一种Operator, 并不需要对外层再包一个Operator!
所以,最后的写法就是去掉@task :
with DAG () as dag:
with TaskGroup(...) as group:
p1 = S3KeySensor(...)
p2 = S3KeySensor(...)
...
start >> group >> logical_task >> end
Read other posts