Commit c79a0d2

chore: limit symdb uploaders under spawn
We use file-based IPC to ensure that Symbol DB has at most 2 active uploader processes under circumstances more general than fork, such as spawn.
1 parent d1b2ce2 commit c79a0d2
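
For context, a minimal, self-contained sketch of the idea (POSIX-only and illustrative, not the ddtrace implementation: the path, limit, and helper name below are made up): processes coordinate through a shared file guarded by an advisory lock, and a process only registers itself as an uploader while fewer than a fixed number of PIDs are already recorded.

    import fcntl
    import os

    REGISTRY = "/tmp/symdb-pids-example"  # illustrative path
    LIMIT = 2                             # parent plus one child, per the commit message

    def try_register() -> bool:
        # Open for append+read and take an exclusive advisory lock so that the
        # check-then-append below is atomic across processes.
        with open(REGISTRY, "a+") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            try:
                f.seek(0)
                pids = {line.strip() for line in f if line.strip()}
                if len(pids) >= LIMIT:
                    return False          # enough uploaders are already active
                f.write(f"{os.getpid()}\n")
                return True
            finally:
                fcntl.flock(f, fcntl.LOCK_UN)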

3 files changed (+121, −29 lines)


ddtrace/internal/ipc.py

Lines changed: 53 additions & 17 deletions
@@ -102,32 +102,42 @@ def open_file(path, mode):
 class SharedStringFile:
     """A simple shared-file implementation for multiprocess communication."""
 
-    def __init__(self) -> None:
-        self.filename: typing.Optional[str] = str(TMPDIR / secrets.token_hex(8)) if TMPDIR is not None else None
+    def __init__(self, name: typing.Optional[str] = None) -> None:
+        self.filename: typing.Optional[str] = (
+            str(TMPDIR / (name or secrets.token_hex(8))) if TMPDIR is not None else None
+        )
+        if self.filename is not None:
+            Path(self.filename).touch(exist_ok=True)
+
+    def put_unlocked(self, f: typing.BinaryIO, data: str) -> None:
+        f.seek(0, os.SEEK_END)
+        dt = (data + "\x00").encode()
+        if f.tell() + len(dt) <= MAX_FILE_SIZE:
+            f.write(dt)
 
     def put(self, data: str) -> None:
        """Put a string into the file."""
        if self.filename is None:
            return
 
        try:
-            with open_file(self.filename, "ab") as f, WriteLock(f):
-                f.seek(0, os.SEEK_END)
-                dt = (data + "\x00").encode()
-                if f.tell() + len(dt) <= MAX_FILE_SIZE:
-                    f.write(dt)
+            with self.lock_exclusive() as f:
+                self.put_unlocked(f, data)
        except Exception:  # nosec
            pass
 
+    def peekall_unlocked(self, f: typing.BinaryIO) -> typing.List[str]:
+        f.seek(0)
+        return data.decode().split("\x00") if (data := f.read().strip(b"\x00")) else []
+
     def peekall(self) -> typing.List[str]:
        """Peek at all strings from the file."""
        if self.filename is None:
            return []
 
        try:
-            with open_file(self.filename, "r+b") as f, ReadLock(f):
-                f.seek(0)
-                return f.read().strip(b"\x00").decode().split("\x00")
+            with self.lock_shared() as f:
+                return self.peekall_unlocked(f)
        except Exception:  # nosec
            return []
 
@@ -137,13 +147,39 @@ def snatchall(self) -> typing.List[str]:
            return []
 
        try:
-            with open_file(self.filename, "r+b") as f, WriteLock(f):
-                f.seek(0)
-                strings = f.read().strip(b"\x00").decode().split("\x00")
+            with self.lock_exclusive() as f:
+                try:
+                    return self.peekall_unlocked(f)
+                finally:
+                    self.clear_unlocked(f)
+        except Exception:  # nosec
+            return []
+
+    def clear_unlocked(self, f: typing.BinaryIO) -> None:
+        f.seek(0)
+        f.truncate()
 
-                f.seek(0)
-                f.truncate()
+    def clear(self) -> None:
+        """Clear all strings from the file."""
+        if self.filename is None:
+            return
 
-                return strings
+        try:
+            with self.lock_exclusive() as f:
+                self.clear_unlocked(f)
        except Exception:  # nosec
-            return []
+            pass
+
+    @contextmanager
+    def lock_shared(self):
+        """Context manager to acquire a shared/read lock on the file."""
+        with open_file(self.filename, "rb") as f, ReadLock(f):
+            yield f
+
+    @contextmanager
+    def lock_exclusive(self):
+        """Context manager to acquire an exclusive/write lock on the file."""
+        if self.filename is None:
+            return
+        with open_file(self.filename, "r+b") as f, WriteLock(f):
+            yield f
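
Taken together, the *_unlocked helpers introduced above are meant to be composed under a single lock_exclusive() block so that a read-check-write sequence is atomic across processes. A usage sketch (the file name, limit, and surrounding logic are illustrative, not part of the diff):

    import os

    from ddtrace.internal.ipc import SharedStringFile

    # Processes that construct the file with the same name share the same backing file.
    registry = SharedStringFile("example-pids")

    with registry.lock_exclusive() as f:
        # Check and append under one exclusive lock: no other process can slip
        # in between the peek and the put.
        if len(set(registry.peekall_unlocked(f))) < 1:
            registry.put_unlocked(f, str(os.getpid()))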

ddtrace/internal/symbol_db/remoteconfig.py

Lines changed: 27 additions & 12 deletions
@@ -2,6 +2,7 @@
 import typing as t
 
 from ddtrace.internal.forksafe import has_forked
+from ddtrace.internal.ipc import SharedStringFile
 from ddtrace.internal.logger import get_logger
 from ddtrace.internal.products import manager as product_manager
 from ddtrace.internal.remoteconfig import Payload
@@ -18,20 +19,34 @@
 
 log = get_logger(__name__)
 
+# Use a shared file to keep track of which PIDs have Symbol DB enabled. This way
+# we can ensure that at most two processes are emitting symbols under a large
+# range of scenarios.
+shared_pid_file = SharedStringFile(f"{os.getppid()}-symdb-pids")
+
+MAX_CHILD_UPLOADERS = 1  # max one child
+
 
 def _rc_callback(data: t.Sequence[Payload]):
-    if get_ancestor_runtime_id() is not None and has_forked():
-        log.debug("[PID %d] SymDB: Disabling Symbol DB in forked process", os.getpid())
-        # We assume that forking is being used for spawning child worker
-        # processes. Therefore, we avoid uploading the same symbols from each
-        # child process. We restrict the enablement of Symbol DB to just the
-        # parent process and the first fork child.
-        remoteconfig_poller.unregister("LIVE_DEBUGGING_SYMBOL_DB")
-
-        if SymbolDatabaseUploader.is_installed():
-            SymbolDatabaseUploader.uninstall()
-
-        return
+    with shared_pid_file.lock_exclusive() as f:
+        if (get_ancestor_runtime_id() is not None and has_forked()) or len(
+            set(shared_pid_file.peekall_unlocked(f))
+        ) >= MAX_CHILD_UPLOADERS:
+            log.debug("[PID %d] SymDB: Disabling Symbol DB in child process", os.getpid())
+            # We assume that forking is being used for spawning child worker
+            # processes. Therefore, we avoid uploading the same symbols from each
+            # child process. We restrict the enablement of Symbol DB to just the
+            # parent process and the first fork child.
+            remoteconfig_poller.unregister("LIVE_DEBUGGING_SYMBOL_DB")
+
+            if SymbolDatabaseUploader.is_installed():
+                SymbolDatabaseUploader.uninstall()
+
+            return
+
+        # Store the PID of the current process so that we know which processes
+        # have Symbol DB enabled.
+        shared_pid_file.put_unlocked(f, str(os.getpid()))
 
     for payload in data:
         if payload.metadata is None:
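
One detail worth noting about the registry name above: it is keyed on os.getppid(), so (while the parent is alive) all workers started by the same application process compute the same file name and share one registry, whereas the application process itself ends up with a separate file keyed on its own parent's PID. Together with MAX_CHILD_UPLOADERS = 1, this appears to be what bounds the total to the parent plus one child, i.e. the "at most 2 active uploader processes" from the commit message. A sketch of the naming, mirroring the f-string in the diff (the example PIDs are illustrative):

    import os

    def registry_name() -> str:
        # In every child of application PID 12345 this returns "12345-symdb-pids";
        # in the application process itself it is keyed on that process's own parent.
        return f"{os.getppid()}-symdb-pids"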

tests/internal/symbol_db/test_symbols.py

Lines changed: 41 additions & 0 deletions
@@ -15,6 +15,15 @@
 from ddtrace.internal.symbol_db.symbols import SymbolType
 
 
+@pytest.fixture(autouse=True, scope="function")
+def pid_file_teardown():
+    from ddtrace.internal.symbol_db.remoteconfig import shared_pid_file
+
+    yield
+
+    shared_pid_file.clear()
+
+
 def test_symbol_from_code():
     def foo(a, b, c=None):
         loc = 42
@@ -320,3 +329,35 @@ def test_symbols_fork_uploads():
 
     for pid in pids:
         os.waitpid(pid, 0)
+
+
+def spawn_target(results):
+    from ddtrace.internal.remoteconfig import ConfigMetadata
+    from ddtrace.internal.remoteconfig import Payload
+    from ddtrace.internal.symbol_db.remoteconfig import _rc_callback
+    from ddtrace.internal.symbol_db.symbols import SymbolDatabaseUploader
+
+    SymbolDatabaseUploader.install()
+
+    rc_data = [Payload(ConfigMetadata("test", "symdb", "hash", 0, 0), "test", None)]
+    _rc_callback(rc_data)
+    results.append(SymbolDatabaseUploader.is_installed())
+
+
+def test_symbols_spawn_uploads():
+    import multiprocessing
+
+    mc_context = multiprocessing.get_context("spawn")
+    manager = multiprocessing.Manager()
+    returns = manager.list()
+    jobs = []
+
+    for _ in range(10):
+        p = mc_context.Process(target=spawn_target, args=(returns,))
+        p.start()
+        jobs.append(p)
+
+    for p in jobs:
+        p.join()
+
+    assert sum(returns) == 1, returns
