-
Notifications
You must be signed in to change notification settings - Fork 16.6k
Open
Description
Description
When triggering a dag through TriggerDagRunOperator, it would be useful for the dag that initiated the trigger to fetch information produced in the triggered dag .
Example:
@dag()
def child_dag():
@task
def run():
return "hello world"
run()
@dag()
def top_dag():
@task
def postprocess(trigger_task):
context = get_current_context()
child_run_id = context["task_instance"].xcom_pull(task_ids="trigger_child", key="trigger_run_id")
child_dag_return_value = context["task_instance"].xcom_pull(
dag_id="child_dag",
task_id="run",
DESIRED_PARAMETER_TO_SET_RUN_ID=child_run_id # <---------------------------------------
)
assert child_dag_return_value == "hello world"
t_trigger = TriggerDagRunOperator(
task_id="trigger_child",
trigger_dag_id="child_dag",
wait_for_completion=True
)
postprocess(t_trigger)While xcom_pull does allow fetching from a different dag_id, it has no parameter to control the exact run_id, it assumes the execution_date is the same, or with include_prior_dates=True it takes the latest found. (In the example above this doesn't make any difference because the return value is static, in more realistic scenarios, the output is based on parameters passed from the top_dag and need to be in sync)
Use case/motivation
No response
Related issues
No response
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
Reactions are currently unavailable