Asset consumer DAG "0 of N assets updated" counter does not update on first-ever run #64616
Description
Apache Airflow version
3.1.7+astro.2
What happened and how to reproduce it?
When an asset-scheduled consumer DAG has never run before, the "X of N assets updated" counter in the UI stays at "0 of N" even as individual producer DAGs emit their assets one by one. The counter does not reflect partial updates: it remains at 0 until all required assets have emitted and the consumer DAG triggers for the very first time. After the consumer DAG has been triggered once, the counter updates correctly.
Reproduction Steps:
- Create a shared assets file, 3 producer DAGs and 1 consumer DAG as per the code below (assets.py, consumer.py, producer_1.py, producer_2.py and producer_3.py)
assets.py:
from airflow.sdk import Asset
asset_1 = Asset(name="asset_1")
asset_2 = Asset(name="asset_2")
asset_3 = Asset(name="asset_3")
consumer.py:
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime
from dags.assets import asset_1, asset_2, asset_3
# The consumer runs only once all three assets have been updated.
with DAG(
    dag_id="consumer",
    schedule=[asset_1, asset_2, asset_3],
    start_date=datetime(2021, 1, 1),
    catchup=False,
):
    PythonOperator(task_id="dummy", python_callable=lambda: None)
producer_1.py:
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime
from dags.assets import asset_1
# Manually triggered producer; a successful task run emits asset_1.
with DAG(
    dag_id="producer_1",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
):
    PythonOperator(task_id="emit_1", python_callable=lambda: None, outlets=[asset_1])
producer_2.py:
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime
from dags.assets import asset_2
# Manually triggered producer; a successful task run emits asset_2.
with DAG(
    dag_id="producer_2",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
):
    PythonOperator(task_id="emit_2", python_callable=lambda: None, outlets=[asset_2])
producer_3.py:
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime
from dags.assets import asset_3
# Manually triggered producer; a successful task run emits asset_3.
with DAG(
    dag_id="producer_3",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
):
    PythonOperator(task_id="emit_3", python_callable=lambda: None, outlets=[asset_3])
- Deploy all 5 files and wait for them to parse
- Trigger only producer_1 and check the consumer DAG: the counter shows "0 of 3 assets updated"
- Trigger producer_2: the counter still shows "0 of 3 assets updated"
- Trigger producer_3: all 3 assets have now emitted and the consumer DAG triggers
- After the consumer DAG completes its first run, the counter in the consumer DAG still shows "0 of 3 assets updated", which is valid because the previous asset updates have been consumed and no new assets have been updated since. Now trigger only producer_1 again
- The counter now correctly shows "1 of 3 assets updated"
The counter only starts working correctly after step 6, i.e. after the consumer DAG has completed its first ever run.
What you think should happen instead?
The counter should increment as each producer emits its asset: "1 of 3 assets updated" after the first producer runs, "2 of 3" after the second, and so on, even before the consumer DAG has ever run. This is what already happens on all subsequent runs after the consumer DAG has completed its first successful run.
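To make the expected behavior concrete, here is a minimal pure-Python sketch of the counter semantics described above. It is a toy model, not Airflow's implementation: the names AssetCounterModel, emit, label and replay are hypothetical and exist only for illustration. It assumes the counter is the number of required assets that have emitted an event since the consumer last ran, and that a consumer run consumes all pending events.

```python
from dataclasses import dataclass, field


@dataclass
class AssetCounterModel:
    """Toy model of the expected "X of N assets updated" counter (not Airflow code)."""

    required: frozenset[str]
    # Assets that have emitted an event since the consumer last ran.
    pending: set[str] = field(default_factory=set)
    consumer_runs: int = 0

    def emit(self, asset_name: str) -> None:
        """Record an asset event; run the consumer once every required asset is pending."""
        if asset_name not in self.required:
            return
        self.pending.add(asset_name)
        if self.pending == self.required:
            # All required assets updated: the consumer runs and consumes the events.
            self.consumer_runs += 1
            self.pending.clear()

    def label(self) -> str:
        """Return the counter text as it would be expected to appear in the UI."""
        return f"{len(self.pending)} of {len(self.required)} assets updated"


def replay(events: list[str]) -> list[str]:
    """Replay asset events against a fresh consumer and collect the label after each."""
    model = AssetCounterModel(required=frozenset({"asset_1", "asset_2", "asset_3"}))
    labels = []
    for name in events:
        model.emit(name)
        labels.append(model.label())
    return labels


if __name__ == "__main__":
    # producer_1, producer_2, producer_3, then producer_1 again.
    for line in replay(["asset_1", "asset_2", "asset_3", "asset_1"]):
        print(line)
```

Under these assumptions the replay yields 1 of 3, 2 of 3, 0 of 3 (the consumer has just run) and 1 of 3. In the reported bug the first two values are both shown as 0 of 3 in the UI.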
Operating System
Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
apache-airflow-providers-celery==3.16.0
apache-airflow-providers-common-compat==1.13.1
apache-airflow-providers-common-io==1.7.1
apache-airflow-providers-common-sql==1.31.0
apache-airflow-providers-elasticsearch==6.4.4
apache-airflow-providers-openlineage==2.10.2
apache-airflow-providers-smtp==2.4.2
apache-airflow-providers-standard==1.11.1
Deployment
Astronomer
Deployment details
Astronomer Runtime 3.1-13 based on Apache Airflow 3.1.7+astro.2
Anything else?
This was originally observed on a production deployment where a consumer DAG scheduled on 4 assets was showing "0 of 4 assets updated" even though 3 of the 4 assets had already emitted events. The counter only started working correctly after the consumer DAG completed its first run. The issue is consistently reproducible on a fresh consumer DAG that has never run before.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct