-
Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.5.3

What happened?

Is this the intended behaviour? It seems that Airflow clones the task group object, which differs from regular Python semantics and can lead to hidden bugs. I got bitten by it and spent a good few hours before realizing. I can try newer Airflow versions too, if requested.

```python
import random

from airflow import DAG
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup


class RunnerTaskGroup(TaskGroup):
    def __init__(self, affinity: str):
        super().__init__(group_id=affinity)  # TaskGroup requires a group_id
        self.rand_attribute = True

        @task(task_group=self)
        def some_task():
            if random.random() < 0.5:
                self.rand_attribute = False  # 50% chance to set this to False

        some_task()  # instantiate the task inside this group


# .... in main dag .....
affinities: list[str] = ["""...some list where each item will create a TaskGroup..."""]

with DAG(dag_id="some_dag"):

    @task
    def some_task(_groups: list[TaskGroup]):
        for group in _groups:
            print(f"{group.rand_attribute=}")
            # group.rand_attribute=True <- always True, regardless of how many
            # times I run the DAG, and for all the inner TaskGroups

    groups = []
    for affinity in affinities:
        runner_tg = RunnerTaskGroup(affinity)
        groups.append(runner_tg)

    groups >> some_task(groups)
```

What you think should happen instead?

The same TaskGroup object should be maintained across task group execution and main DAG execution. If these were regular Python classes, that is how it would work.

How to reproduce

Run the code above.

Operating System

Ubuntu 22.04.3 LTS

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response
-
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.
-
If I understand your question correctly, the key is Airflow's architecture. Airflow is, by definition, a distributed task execution engine. Every task is the result of parsing the same DAG file in a completely different Python interpreter (potentially forked multiple times), and potentially on a completely different machine. Each interpreter runs separately; they do not communicate with each other or share any memory for the "parsed" DAG objects. So yes, things behave differently than running and parsing this single Python file in a single interpreter, if that is what you expected. You can read more about Airflow's architecture here: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/overview.html

BTW, if you are not sure whether you have an Airflow issue, please create a discussion instead. Discussions are better suited to asking whether you have understood something wrongly or whether something is a real issue in Airflow, and they let you get feedback before you decide to create an issue. I have converted your issue to a discussion now, but please bear this in mind for the future.
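To make this concrete, here is a minimal plain-Python sketch (no Airflow involved; the names are illustrative) of why a mutation made in a child process never reaches the parent:

```python
import multiprocessing


class Holder:
    def __init__(self):
        self.flag = True


def mutate(holder: Holder) -> None:
    # Runs in a child process, which received a *copy* of the object
    # (pickled under "spawn", copy-on-write under "fork").
    holder.flag = False


if __name__ == "__main__":
    h = Holder()
    p = multiprocessing.Process(target=mutate, args=(h,))
    p.start()
    p.join()
    print(h.flag)  # True -- the child's mutation never reaches the parent
```

Airflow task execution behaves analogously, except that the "child" may be on a different machine entirely and re-parses the DAG file from scratch.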
-
Alright, fair enough about the objects being different due to the distributed nature, with the task group being replicated; that makes total sense (even on a local machine, where multiprocessing is presumably used to run the task groups and tasks). My issue was that the main Airflow process never receives the updated attributes back from the task group after execution, so the object state is never synced.

My proposal is that either some sort of implicit XCom should happen behind the scenes, or a warning/exception should be thrown when accessing such attributes from the main DAG after execution. A third option would be to disallow creating and updating attributes on the task group object.

In a larger codebase this kind of behavior can become a footgun that is easy to miss in reviews (i.e. you assume that the main DAG has access to tg.attribute and that it is updated during execution, when in fact it is not the same attribute at all: it was copied when the process was created and keeps its default value, as there is no sync).

PS: I updated the code a bit to remove some bloat, to better show the potential issue.
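For completeness, the explicit form of that "implicit XCom" already exists: return the value from one task and pass it to the next. A minimal sketch, assuming Airflow 2.x TaskFlow (the dag_id and task names are illustrative, not from the original report):

```python
import random

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="xcom_flag_example"):

    @task
    def compute_flag() -> bool:
        # Return the value instead of mutating an attribute; Airflow
        # ships the return value between processes via XCom.
        return random.random() >= 0.5

    @task
    def consume_flag(flag: bool) -> None:
        print(f"{flag=}")  # reflects what compute_flag actually computed

    consume_flag(compute_flag())
```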
If you have an idea how to do it, then of course you can try to implement something like that. But remember that this is a distributed system, and in such a system you absolutely cannot rely on everything being done in a single interpreter.
I think what you propose goes against Airflow's basic architectural assumptions. For modifying objects in a way that can be applied consistently across different interpreters, Airflow has cluster policies (look them up in the documentation).
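For reference, a minimal sketch of such a cluster policy, defined in an airflow_local_settings.py that is importable by every Airflow component (the retry rule here is just an illustrative mutation, not part of this thread):

```python
# airflow_local_settings.py
from airflow.models.baseoperator import BaseOperator


def task_policy(task: BaseOperator) -> None:
    # Airflow calls this hook for every task at DAG parse time, in every
    # interpreter that parses the file, so the mutation is applied
    # consistently across all components.
    if task.retries < 1:
        task.retries = 1  # illustrative: enforce at least one retry
```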