Skip to content

Commit ad52a76

Browse files
committed
elasticapm: properly cleanup buffer and data views
With Python 3.13 our pattern of buffering revealed some issues because the underlying BytesIO fileobj may get released before gzip.GzipFile. This requires a fix in CPython but also some improvements on our side by properly closing the GzipFile in case of error and also releasing the memoryview we can from the BytesIO buffer. These problems manifests as following warnings from unraisable exceptions running tests: The closing of the gzip buffer helps with: Traceback (most recent call last): File "/usr/lib/python3.13/gzip.py", line 362, in close fileobj.write(self.compress.flush()) ~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^ ValueError: I/O operation on closed file. Exception ignored in: <_io.BytesIO object at 0x7fbc4335fbf0> Traceback (most recent call last): File "/venv313/lib/python3.13/site-packages/ecs_logging/_stdlib.py", line 272, in _record_attribute def _record_attribute( BufferError: Existing exports of data: object cannot be re-sized Python 3.12 shows the same warnings with the `-X dev` flag.
1 parent 0f70a26 commit ad52a76

File tree

2 files changed

+15
-5
lines changed

2 files changed

+15
-5
lines changed

elasticapm/transport/base.py

+3
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ def _flush(self, buffer, forced_flush=False) -> None:
250250
"""
251251
if not self.state.should_try():
252252
logger.error("dropping flushed data due to transport failure back-off")
253+
buffer.close()
253254
else:
254255
fileobj = buffer.fileobj # get a reference to the fileobj before closing the gzip file
255256
buffer.close()
@@ -261,6 +262,8 @@ def _flush(self, buffer, forced_flush=False) -> None:
261262
except Exception as e:
262263
self.handle_transport_fail(e)
263264

265+
data.release()
266+
264267
def start_thread(self, pid=None) -> None:
265268
super(Transport, self).start_thread(pid=pid)
266269
if (not self._thread or self.pid != self._thread.pid) and not self._closed:

tests/transports/test_base.py

+12-5
Original file line numberDiff line numberDiff line change
@@ -107,18 +107,25 @@ def test_empty_queue_flush(mock_send, elasticapm_client):
107107
transport.close()
108108

109109

110-
@mock.patch("elasticapm.transport.base.Transport.send")
110+
@mock.patch("elasticapm.transport.base.Transport._flush")
111111
@pytest.mark.parametrize("elasticapm_client", [{"api_request_time": "5s"}], indirect=True)
112-
def test_metadata_prepended(mock_send, elasticapm_client):
112+
def test_metadata_prepended(mock_flush, elasticapm_client):
113113
transport = Transport(client=elasticapm_client, compress_level=0)
114114
transport.start_thread()
115115
transport.queue("error", {}, flush=True)
116116
transport.close()
117-
assert mock_send.call_count == 1
118-
args, kwargs = mock_send.call_args
119-
data = gzip.decompress(args[0])
117+
assert mock_flush.call_count == 1
118+
args, kwargs = mock_flush.call_args
119+
buffer = args[0]
120+
# this used to mock send but after we fixed a leak of not releasing a memoryview containing
121+
# the data we cannot read it anymore. Soreimplement _flush and read the data ourselves
122+
fileobj = buffer.fileobj
123+
buffer.close()
124+
compressed_data = fileobj.getbuffer()
125+
data = gzip.decompress(compressed_data)
120126
data = data.decode("utf-8").split("\n")
121127
assert "metadata" in data[0]
128+
compressed_data.release()
122129

123130

124131
@mock.patch("elasticapm.transport.base.Transport.send")

0 commit comments

Comments
 (0)