Callable deserialization causing error with DatabricksExecutionTrigger #64609
Description
Apache Airflow version
Reported on 2.10.5, but this could happen on any version.
What happened and how to reproduce it?
This is an issue specific to the Databricks provider.
The retry_args passed to the DatabricksExecutionTrigger is a dictionary whose values may be callables. Triggers are serialized and stored in the Airflow DB, then deserialized for execution. Airflow does not properly serialize/deserialize callables passed as arguments to triggers; in this case, they come back as strings after deserialization. When the hook later treats one of those strings as a function and calls it, an exception is raised:
```
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in cleanup_finished_triggers
    result = details["task"].result()
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/triggerer_job_runner.py", line 630, in run_trigger
    async for event in trigger.run():
  File "/usr/local/lib/python3.10/site-packages/airflow/providers/databricks/triggers/databricks.py", line 88, in run
    run_state = await self.hook.a_get_run_state(self.run_id)
  File "/usr/local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks.py", line 450, in a_get_run_state
    response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)
  File "/usr/local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 730, in _a_do_api_call
    async for attempt in self._a_get_retry_object():
  File "/usr/local/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 166, in __anext__
    do = await self.iter(retry_state=self._retry_state)
  File "/usr/local/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 153, in iter
    result = await action(retry_state)
  File "/usr/local/lib/python3.10/site-packages/tenacity/asyncio/__init__.py", line 135, in _run_wait
    sleep = await _utils.wrap_to_async_func(self.wait)(retry_state)
  File "/usr/local/lib/python3.10/site-packages/tenacity/_utils.py", line 99, in inner
    return call(*args, **kwargs)
TypeError: 'str' object is not callable
```
This results in a task failure if Airflow attempts to retry a call to the Databricks API.
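The failure mode can be illustrated without Airflow. The sketch below is a simplified model, not Airflow's actual serializer: it assumes, as described above, that values the serializer does not recognize are cast to `str`, and it uses a hypothetical `round_trip` helper in place of storing the trigger kwargs in the DB and loading them back.

```python
import json
from typing import Any


def serialize_value(value: Any) -> Any:
    # Simplified model (assumption): JSON primitives, dicts and lists are kept,
    # everything else (functions, tenacity strategy objects, ...) becomes str.
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, dict):
        return {key: serialize_value(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [serialize_value(item) for item in value]
    return str(value)


def round_trip(trigger_kwargs: dict) -> dict:
    # Hypothetical stand-in for "store in the Airflow DB, then load for execution".
    return json.loads(json.dumps(serialize_value(trigger_kwargs)))


def my_wait(retry_state: Any) -> float:
    # A callable wait strategy, like the "wait" entry in retry_args.
    return 30.0


restored = round_trip({"run_id": 42, "retry_args": {"wait": my_wait, "reraise": True}})
wait = restored["retry_args"]["wait"]
print(type(wait).__name__)  # str
try:
    wait(None)  # what the retry loop does with the "wait" entry
except TypeError as exc:
    print(exc)  # 'str' object is not callable
```

Plain values such as `"reraise": True` survive the round trip unchanged; only the callable is silently degraded, which is why nothing fails until the retry logic actually calls it.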
This can be reproduced by passing a custom retry_args dictionary with a callable as one of its values to the DatabricksSubmitRunOperator in a deferrable task. I observed the issue with these values:
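```python
from tenacity import stop_after_attempt, wait_incrementing

# Both strategies are callable objects; self.databricks_retry_limit comes from the operator.
retry_args = {
    "wait": wait_incrementing(start=30, increment=30, max=300),
    "stop": stop_after_attempt(self.databricks_retry_limit),
    "reraise": True,
}
```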
To reproduce this, the first call to the Databricks API must also fail so that a retry is attempted. In the case I observed, the first try failed intermittently (possibly due to a networking blip), and the subsequent retries failed with a TypeError caused by the custom retry_args. After the custom retry_args were removed, the API calls retried successfully.
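Why the error only shows up when a retry happens can be sketched with a toy retry loop. This is not the hook's or tenacity's code: `call_with_retries` and `flaky_api` are hypothetical, and the only behavior carried over is that the wait strategy is called after a failed attempt (the `_run_wait` frame in the traceback).

```python
import time
from typing import Any, Callable


def call_with_retries(api_call: Callable[[], str], wait: Any = None, attempts: int = 3) -> str:
    # Hypothetical retry loop: `wait` is only consulted after a failed attempt.
    if wait is None:
        wait = lambda attempt: 0.0  # stand-in for the default wait strategy
    for attempt in range(1, attempts + 1):
        try:
            return api_call()
        except ConnectionError:
            if attempt == attempts:
                raise
            time.sleep(wait(attempt))  # TypeError here if `wait` is a str
    raise AssertionError("unreachable")


def flaky_api(failures: int) -> Callable[[], str]:
    # Fails `failures` times (e.g. a networking blip), then succeeds.
    state = {"calls": 0}

    def call() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("network blip")
        return "RUNNING"

    return call


deserialized_wait = "<function my_wait>"  # what a callable becomes after the str() fallback

print(call_with_retries(flaky_api(0), wait=deserialized_wait))  # RUNNING: no retry, bug stays hidden
print(call_with_retries(flaky_api(1)))  # RUNNING: default wait strategy works
try:
    call_with_retries(flaky_api(1), wait=deserialized_wait)
except TypeError as exc:
    print(exc)  # 'str' object is not callable
```

The first case matches the observation that the bad retry_args go unnoticed as long as every API call succeeds on its first attempt.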
What you think should happen instead?
Serialization/deserialization of callables in triggers is known to be unsupported in Airflow, but this should not cause unexpected failures in deferrable tasks. There are two options for addressing this:
- Pass the callable's import path to the trigger instead of the callable itself. When the Databricks hook retries the API call, it could import the callable from that path and run it. The path is a plain string, so serializing it is not an issue.
- If the above option is not feasible, the operator should check for custom retry args when deferrable=True and raise an exception. This would at least prevent unexpected failures during API call retries.
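Both options can be sketched in a few lines. Everything here is hypothetical: the `{"path": ..., "kwargs": ...}` spec format, `build_retry_args` and `validate_retry_args` are illustrative names, not existing provider APIs. `import_string` is written out to keep the block self-contained; Airflow ships a comparable helper in `airflow.utils.module_loading`. The validation variant is narrower than the second option as worded: it rejects only values that cannot be JSON-serialized, rather than every custom retry arg.

```python
import importlib
import json
from typing import Any


def import_string(dotted_path: str) -> Any:
    # Resolve "package.module.attr" to the object it names.
    module_path, _, attr_name = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_path), attr_name)


def build_retry_args(spec: dict) -> dict:
    # Option 1 (hypothetical format): {"key": {"path": ..., "kwargs": {...}}}
    # entries are rebuilt from their import path; other values pass through.
    resolved = {}
    for key, value in spec.items():
        if isinstance(value, dict) and "path" in value:
            factory = import_string(value["path"])
            resolved[key] = factory(**value.get("kwargs", {}))
        else:
            resolved[key] = value
    return resolved


def validate_retry_args(retry_args: dict, deferrable: bool) -> None:
    # Option 2: fail fast in the operator instead of at retry time.
    # Only values that would not survive a JSON round trip (e.g. callables) are rejected.
    if not deferrable:
        return
    for key, value in retry_args.items():
        try:
            json.dumps(value)
        except TypeError:
            raise ValueError(
                f"retry_args[{key!r}] cannot be serialized for the trigger; "
                "pass an import path instead or use deferrable=False"
            ) from None


# The reported wait strategy, expressed as plain, JSON-serializable data.
RETRY_SPEC = {
    "wait": {
        "path": "tenacity.wait_incrementing",
        "kwargs": {"start": 30, "increment": 30, "max": 300},
    },
    "reraise": True,
}
```

`RETRY_SPEC` passes `validate_retry_args` and survives serialization as-is; with tenacity installed, `build_retry_args(RETRY_SPEC)["wait"]` would be equivalent to `wait_incrementing(start=30, increment=30, max=300)`. Resolving the path inside the hook, rather than in the operator, is what keeps the trigger kwargs serializable.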
Operating System
No response
Versions of Apache Airflow Providers
This was reported in version 7.0.0 of the Databricks provider and has not been fixed as of the latest version.
Deployment
Astronomer
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct