Getting XComs from cleared tasks in Airflow
I have slack alerts for when tasks fail, but I also want to have recovery messages as well.
When the task initially fails, in its
on_failure_callback it does an
xcom_push. What I save here is available in the next DAG run using:
context['ti'].xcom_pull(key='my_task_state', task_ids=context['task'].task_id, include_prior_dates=True)
However, if I clear the failed task so that it re-runs, in its
on_success_callback I try this to get the value I saved in the initial attempt:
context['ti'].xcom_pull(key='my_task_state', task_ids=context['task'].task_id, include_prior_dates=False)
None. If I set
include_prior_dates=True it’ll return the value from the previous DAG run, but not the current one where the task was cleared.
Am I doing something wrong, or is there a workaround that I can use to get the XCom value I’m looking for?
In my understanding:
Xcom is designed to for inter-exchange message between tasks. And Xcom status is depends on the task instance. If the task instance is cleared(delted), the xcom history info belong to this instance will be deleted as well.
That's why you get
none when include_prior_dates=False, (task instance deleted, there is no such xcom record)
last dag info when include_prior_dates=True,(task instance delted, but another dags tasks instance xcome was poped(most recented).
Here is show case by default example_xcom:
- clear one task status:
Thats' why your program get none.
if you find the answer is help, pls vote it up. Thanks