Fix triggerer logger's file descriptor not being closed when it is removed (#62103)

wjddn279 wants to merge 1 commit into apache:main
Conversation
Can you add a test to avoid regression?

Sure, I'll add it.

@eladkal Done!
amoghrajesh left a comment:

Thanks for taking this on; a similar problem for the DAG processor was solved in #47574.
```python
        self.bound_logger = logger
        return logger

    def __del__(self):
```
The main concern I have with using `__del__` is that if an exception occurs during cleanup, it will be quietly swallowed.
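A minimal standalone sketch (not Airflow code) shows that failure mode: in CPython, an exception raised inside `__del__` is reported on stderr as "Exception ignored" and never propagates to the caller.

```python
class LeakyHandle:
    """Illustrative only: exceptions raised in __del__ are swallowed."""

    def __del__(self):
        raise RuntimeError("cleanup failed")  # reported on stderr, never raised

h = LeakyHandle()
del h                # refcount hits zero, __del__ runs and raises
survived = True      # we still reach this line: the exception was suppressed
print("still running")
```

So a cleanup bug inside `__del__` would not fail loudly; it would simply leave the descriptor open.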
```python
        if file_handle and not file_handle.closed:
            file_handle.flush()
            file_handle.close()
```
Can we handle it similar to how it is done for the DAG processor in #47574?

In short, something like this, where we store the handle and clear it to avoid diverging from that approach:
```diff
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py	(revision 10cd08dff8916b93f8c3f94bc34265bb7544fde4)
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py	(date 1771510381992)
@@ -30,7 +30,7 @@
 from datetime import datetime
 from socket import socket
 from traceback import format_exception
-from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict
+from typing import IO, TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict
 
 import anyio
 import attrs
@@ -302,6 +302,8 @@
     bound_logger: WrappedLogger = attrs.field(init=False, repr=False)
+    _filehandle: IO[Any] = attrs.field(init=False, repr=False)
+
     def __call__(self, processors: Iterable[structlog.typing.Processor]) -> WrappedLogger:
         if hasattr(self, "bound_logger"):
             return self.bound_logger
@@ -312,13 +314,20 @@
             pretty_logs = False
         if pretty_logs:
-            underlying_logger: WrappedLogger = structlog.WriteLogger(log_file.open("w", buffering=1))
+            self._filehandle = log_file.open("w", buffering=1)
+            underlying_logger: WrappedLogger = structlog.WriteLogger(self._filehandle)
         else:
-            underlying_logger = structlog.BytesLogger(log_file.open("wb"))
+            self._filehandle = log_file.open("wb")
+            underlying_logger = structlog.BytesLogger(self._filehandle)
         logger = structlog.wrap_logger(underlying_logger, processors=processors).bind()
         self.bound_logger = logger
         return logger
 
+    def close(self):
+        """Explicitly close the underlying log file handle."""
+        if hasattr(self, "_filehandle"):
+            self._filehandle.close()
+
     def upload_to_remote(self):
         from airflow.sdk.log import upload_to_remote
@@ -421,10 +430,9 @@
         for id in msg.finished or ():
             self.running_triggers.discard(id)
             self.cancelling_triggers.discard(id)
             # Remove logger from the cache, and since structlog doesn't have an explicit close method, we
             # only need to remove the last reference to it to close the open FH
             if factory := self.logger_cache.pop(id, None):
                 factory.upload_to_remote()
+                factory.close()
         response = messages.TriggerStateSync(
             to_create=[],
```
> In short, something like this, where we store the handle and clear it to avoid diverging from that approach?

I agree with this and it looks better, thanks! But if we separate it out into an explicit `close` method, that method must always be called after `upload_to_remote` is invoked, which could lead to mistakes in future development.

What do you think about doing a try/except in `__del__`? I actually considered that beforehand too, but I was worried it could cause a silent fd leak if it fails.
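A hypothetical sketch of the combined approach under discussion (the class and method names here are illustrative stand-ins, not Airflow's real API): an explicit `close()` as the primary cleanup path, with a guarded `__del__` as a best-effort safety net whose failures are deliberately ignored.

```python
import os
import tempfile

class TriggerLogFactory:
    """Illustrative stand-in for the logger factory, not the real class."""

    def __init__(self, path):
        self._filehandle = open(path, "wb")

    def close(self):
        # Primary, explicit cleanup path: call right after upload_to_remote().
        if hasattr(self, "_filehandle") and not self._filehandle.closed:
            self._filehandle.close()

    def __del__(self):
        # Safety net only: exceptions raised here are swallowed by the
        # interpreter anyway, so close() above must remain the primary path.
        try:
            self.close()
        except Exception:
            pass

with tempfile.NamedTemporaryFile(delete=False) as tmp:
    path = tmp.name
factory = TriggerLogFactory(path)
handle = factory._filehandle
factory.close()          # explicit cleanup; a later __del__ becomes a no-op
os.unlink(path)
```

With this shape, forgetting the explicit call degrades to the old best-effort behavior instead of turning into a guaranteed leak.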
Force-pushed from 06c01e3 to ec1c806, then from ec1c806 to 505d5eb.
Can I get more review on this?
```python
        # Explicitly close the file descriptor when the logger is garbage collected.
        if hasattr(self, "_filehandle") and self._filehandle:
            self._filehandle.close()
```
`__del__` is not guaranteed to run promptly (or at all during interpreter shutdown or when reference cycles are involved), and exceptions inside it are silently swallowed. This is the exact same class of problem the DAG processor had, solved in #47574 (by @tirkarthi) with an explicit `close()` method plus an explicit call at the cleanup site.

I'd prefer we follow the same pattern here for consistency and reliability:
```python
    def close(self):
        """Explicitly close the underlying log file handle."""
        if hasattr(self, "_filehandle") and self._filehandle and not self._filehandle.closed:
            self._filehandle.close()
```
And then call `factory.close()` after `factory.upload_to_remote()` in `_handle_request`.

If you want to keep `__del__` as a safety net that's fine, but the primary cleanup path should be the explicit call.
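The "not guaranteed to run promptly" point is easy to demonstrate in isolation (a minimal CPython demo, unrelated to the Airflow code itself): an object caught in a reference cycle is only finalized when the cycle collector runs, not when its last name is deleted.

```python
import gc

gc.disable()             # keep the demo deterministic: no automatic collections
finalized = []

class CycleNode:
    """Minimal demo: __del__ does not run promptly for objects in a cycle."""

    def __del__(self):
        finalized.append(id(self))

a, b = CycleNode(), CycleNode()
a.other, b.other = b, a            # reference cycle keeps refcounts above zero
del a, b
count_before_gc = len(finalized)   # still 0: no finalizer has run yet
gc.collect()                       # only the cycle collector triggers them
count_after_gc = len(finalized)    # now 2
gc.enable()
```

An open file descriptor held by such an object stays open until the collector happens to run, which is exactly the leak window an explicit `close()` removes.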
```python
            self._filehandle.close()

    def upload_to_remote(self):
        from airflow.sdk.log import upload_to_remote
```
Should also add a `not self._filehandle.closed` guard here: if `close()` is ever called explicitly before GC runs `__del__`, this would attempt a double-close. It's a no-op for CPython files, but the guard is good practice.
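As a standalone illustration of that guard (using `io.BytesIO` as a stand-in for the real log file handle), close becomes idempotent and safe to invoke from both the explicit path and the finalizer:

```python
import io

def close_handle(fh):
    """Guarded close: safe to call from both close() and __del__."""
    if fh is not None and not fh.closed:
        fh.close()

fh = io.BytesIO(b"log data")  # stand-in for the triggerer's log file handle
close_handle(fh)
close_handle(fh)              # second call is a no-op thanks to the guard
```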
```python
trigger_runner_supervisor._service_subprocess(0.1)

mock_file.close.assert_called_once()
```
This assertion depends on `__del__` being triggered by GC during the loop, which is non-deterministic across Python implementations. If you switch to an explicit `close()` method (called from `_handle_request`), this test becomes deterministic: you're verifying the cleanup lifecycle, not GC timing.
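A hypothetical sketch of the deterministic test shape (the names `finish_trigger`, `upload_to_remote`, and `close` mirror the cleanup order under discussion; this is not the real test):

```python
from unittest import mock

factory = mock.Mock()

def finish_trigger(factory):
    # Mirrors the cleanup order discussed above: upload first, then close.
    factory.upload_to_remote()
    factory.close()

finish_trigger(factory)
factory.upload_to_remote.assert_called_once()
factory.close.assert_called_once()
```

Because the cleanup call is explicit, the assertions hold on every Python implementation regardless of when (or whether) the garbage collector runs.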
One more thing: the comment on this line says "since structlog doesn't have an explicit close method, we only need to remove the last reference to it to close the open FH". This PR exists because that assumption was wrong. Please update the comment to reflect the new behavior once you add the explicit `close()` method.
Closes: #61916
An issue was reported where file descriptors were not being properly closed when the logger was destroyed in the existing triggerer.
As shown below, the number of open file descriptors was continuously increasing before this fix. After applying the logic in this PR, file descriptors are now properly closed and the count decreases as expected.
Additionally, we confirmed through added logging that the logger is properly garbage collected after `upload_to_remote` is called, triggering the destructor as expected.

AS-IS (3.1.7): [screenshot: open file descriptor count grows continuously]

TO-BE (patched): [screenshot: open file descriptor count decreases as expected]
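For reference, a small Linux-only helper (assuming `/proc` is available; not part of the PR) shows how an open-descriptor count like the one in the screenshots can be measured:

```python
import os
import tempfile

def open_fd_count() -> int:
    """Count this process's open file descriptors (Linux: /proc/self/fd)."""
    return len(os.listdir("/proc/self/fd"))

before = open_fd_count()
tmp = tempfile.NamedTemporaryFile()      # opens one descriptor
delta_open = open_fd_count() - before    # 1 while the file is open
tmp.close()
delta_closed = open_fd_count() - before  # back to 0 after closing
```

Sampling this counter periodically while triggers finish is enough to reproduce the AS-IS growth and verify the TO-BE fix.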