Skip to content

Commit e56f617

Browse files
authored
adds more signal options (#3248)
* adds option in load that prevents draining pool on signal * adds runtime pipeline option to not intercept signals * refactors signal module * tests new cases * describes signal handling in running in prod docs * bumps dlt to 1.18.0 * fixes tests forked * removes logging and buffered console output from signals * adds retry count to load job metrics, generates started_at in init of runnable load job * allows to update existing metrics in load step * finalized jobs require start and finish dates * generates metrics in each job state and in each completed loop, does not complete package if pool drained but jobs left, adds detailed tests for metrics * fixes remote metrics * replaces event with package bound semaphore to complete load jobs early * fixes dashboard to on windows * improves signals docs * renames delayed_signals to intercepted_signals
1 parent 449d914 commit e56f617

File tree

31 files changed

+689
-215
lines changed

31 files changed

+689
-215
lines changed

.github/workflows/test_tools_dashboard.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ jobs:
114114
- name: Run dashboard e2e windows
115115
run: |
116116
start marimo run --headless dlt/_workspace/helpers/dashboard/dlt_dashboard.py -- -- --pipelines-dir _storage\.dlt\pipelines\ --with_test_identifiers true
117-
timeout /t 2 /nobreak >NUL
117+
timeout /t 6 /nobreak
118118
pytest --browser chromium tests/e2e
119119
if: matrix.python-version != '3.9' && matrix.python-version != '3.14.0-beta.4' && matrix.os == 'windows-latest'
120120

dlt/_workspace/cli/_pipeline_command.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
141141
if operation == "show":
142142
from dlt.common.runtime import signals
143143

144-
with signals.delayed_signals():
144+
with signals.intercepted_signals():
145145
streamlit_cmd = [
146146
"streamlit",
147147
"run",

dlt/common/destination/client.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from abc import ABC, abstractmethod
22
import dataclasses
3-
3+
import contextlib
4+
from threading import BoundedSemaphore
45
from types import TracebackType
56
from typing import (
67
ClassVar,
@@ -349,6 +350,7 @@ def metrics(self) -> Optional[LoadJobMetrics]:
349350
self._finished_at,
350351
self.state(),
351352
None,
353+
self._parsed_file_name.retry_count,
352354
)
353355

354356

@@ -371,13 +373,16 @@ def __init__(self, file_path: str) -> None:
371373
# ensure file name
372374
super().__init__(file_path)
373375
self._state: TLoadJobState = "ready"
376+
self._started_at = pendulum.now()
374377
self._exception: BaseException = None
375378

376379
# variables needed by most jobs, set by the loader in set_run_vars
377380
self._schema: Schema = None
378381
self._load_table: PreparedTableSchema = None
379382
self._load_id: str = None
383+
# set by run_managed method
380384
self._job_client: "JobClientBase" = None
385+
self._done_event: BoundedSemaphore = None
381386

382387
def set_run_vars(self, load_id: str, schema: Schema, load_table: PreparedTableSchema) -> None:
383388
"""
@@ -394,21 +399,21 @@ def load_table_name(self) -> str:
394399
def run_managed(
395400
self,
396401
job_client: "JobClientBase",
402+
done_event: BoundedSemaphore,
403+
/,
397404
) -> None:
398405
"""
399406
wrapper around the user implemented run method
400407
"""
401-
from dlt.common.runtime import signals
402-
403408
# only jobs that are not running or have not reached a final state
404409
# may be started
405410
assert self._state in ("ready", "retry")
406411
self._job_client = job_client
412+
self._done_event = done_event
407413

408414
# filepath is now moved to running
409415
try:
410416
self._state = "running"
411-
self._started_at = pendulum.now()
412417
self._job_client.prepare_load_job_execution(self)
413418
self.run()
414419
self._state = "completed"
@@ -423,12 +428,14 @@ def run_managed(
423428
f"Transient exception in job {self.job_id()} in file {self._file_path}"
424429
)
425430
finally:
426-
self._finished_at = pendulum.now()
427431
# sanity check
428432
assert self._state in ("completed", "retry", "failed")
429433
if self._state != "retry":
434+
self._finished_at = pendulum.now()
430435
# wake up waiting threads
431-
signals.wake_all()
436+
if self._done_event:
437+
with contextlib.suppress(ValueError):
438+
self._done_event.release()
432439

433440
@abstractmethod
434441
def run(self) -> None:

dlt/common/metrics.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class LoadJobMetrics(NamedTuple):
9191
finished_at: datetime.datetime
9292
state: Optional[str]
9393
remote_url: Optional[str]
94+
retry_count: Optional[int]
9495

9596

9697
class LoadMetrics(StepMetrics):

dlt/common/pipeline.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -413,14 +413,27 @@ def _step_info_start_load_id(self, load_id: str) -> None:
413413
self._current_load_started = precise_time()
414414
self._load_id_metrics.setdefault(load_id, [])
415415

416-
def _step_info_complete_load_id(self, load_id: str, metrics: TStepMetrics) -> None:
416+
def _step_info_update_metrics(
417+
self, load_id: str, metrics: TStepMetrics, immutable: bool = False
418+
) -> None:
419+
metrics["started_at"] = ensure_pendulum_datetime_utc(self._current_load_started)
420+
step_metrics = self._load_id_metrics[load_id]
421+
if immutable or len(step_metrics) == 0:
422+
step_metrics.append(metrics)
423+
else:
424+
step_metrics[0] = metrics
425+
426+
def _step_info_complete_load_id(self, load_id: str, finished: bool = True) -> None:
417427
assert self._current_load_id == load_id, (
418428
f"Current load id mismatch {self._current_load_id} != {load_id} when completing step"
419429
" info"
420430
)
421-
metrics["started_at"] = ensure_pendulum_datetime_utc(self._current_load_started)
422-
metrics["finished_at"] = ensure_pendulum_datetime_utc(precise_time())
423-
self._load_id_metrics[load_id].append(metrics)
431+
# metrics must be present
432+
metrics = self._load_id_metrics[load_id][-1]
433+
# update finished at
434+
assert metrics["finished_at"] is None
435+
if finished:
436+
metrics["finished_at"] = ensure_pendulum_datetime_utc(precise_time())
424437
self._current_load_id = None
425438
self._current_load_started = None
426439

dlt/common/runtime/signals.py

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
import sys
23
import threading
34
import signal
@@ -6,19 +7,20 @@
67
from types import FrameType
78
from typing import Any, Callable, Dict, Iterator, Optional, Union
89

9-
from dlt.common import logger
1010
from dlt.common.exceptions import SignalReceivedException
1111

1212
_received_signal: int = 0
1313
exit_event = Event()
1414
_signal_counts: Dict[int, int] = {}
1515
_original_handlers: Dict[int, Union[int, Callable[[int, Optional[FrameType]], Any]]] = {}
1616

17+
# NOTE: do not use logger and print in signal handlers
1718

18-
def signal_receiver(sig: int, frame: FrameType) -> None:
19+
20+
def _signal_receiver(sig: int, frame: FrameType) -> None:
1921
"""Handle POSIX signals with two-stage escalation.
2022
21-
This handler is installed by delayed_signals(). On the first occurrence of a
23+
This handler is installed by intercepted_signals(). On the first occurrence of a
2224
supported signal (eg. SIGINT, SIGTERM) it requests a graceful shutdown by
2325
setting a process-wide flag and waking sleeping threads via exit_event.
2426
A second occurrence of the same signal escalates by delegating to the
@@ -34,49 +36,61 @@ def signal_receiver(sig: int, frame: FrameType) -> None:
3436
Worker threads must cooperatively observe shutdown via raise_if_signalled()
3537
or the signal-aware sleep().
3638
"""
37-
global _received_signal
38-
3939
# track how many times this signal type has been received
4040
_signal_counts[sig] = _signal_counts.get(sig, 0) + 1
4141

4242
if _signal_counts[sig] == 1:
4343
# first signal of this type: set flag and wake threads
44-
_received_signal = sig
45-
if sig == signal.SIGINT:
46-
sig_desc = "CTRL-C"
47-
else:
48-
sig_desc = f"Signal {sig}"
49-
msg = (
50-
f"{sig_desc} received. Trying to shut down gracefully. It may take time to drain job"
51-
f" pools. Send {sig_desc} again to force stop."
52-
)
44+
set_received_signal(sig)
5345
if sys.stdin.isatty():
54-
# log to console
55-
sys.stderr.write(msg)
56-
sys.stderr.flush()
57-
else:
58-
logger.warning(msg)
46+
# log to console using low level functions that are safe for signal handlers
47+
if sig == signal.SIGINT:
48+
sig_desc = "CTRL-C"
49+
else:
50+
sig_desc = f"Signal {sig}"
51+
msg = (
52+
f"{sig_desc} received. Trying to shut down gracefully. It may take time to drain"
53+
f" job pools. Send {sig_desc} again to force stop."
54+
)
55+
try:
56+
os.write(sys.stderr.fileno(), msg.encode(encoding="utf-8"))
57+
except OSError:
58+
pass
5959
elif _signal_counts[sig] >= 2:
60-
# Second signal of this type: call original handler
61-
logger.debug(f"Second signal {sig} received, calling default handler")
60+
# second signal of this type: call original handler
6261
original_handler = _original_handlers.get(sig, signal.SIG_DFL)
6362
if callable(original_handler):
6463
original_handler(sig, frame)
6564
elif original_handler == signal.SIG_DFL:
66-
# Restore default and re-raise to trigger default behavior
65+
# restore default and re-raise to trigger default behavior
6766
signal.signal(sig, signal.SIG_DFL)
6867
signal.raise_signal(sig)
6968

7069
exit_event.set()
71-
logger.debug("Sleeping threads signalled")
70+
71+
72+
def _clear_signals() -> None:
73+
global _received_signal
74+
75+
_received_signal = 0
76+
_signal_counts.clear()
77+
_original_handlers.clear()
78+
79+
80+
def set_received_signal(sig: int) -> None:
81+
"""Called when signal was received"""
82+
global _received_signal
83+
84+
_received_signal = sig
7285

7386

7487
def raise_if_signalled() -> None:
75-
if _received_signal:
88+
"""Raises `SignalReceivedException` if signal was received."""
89+
if was_signal_received():
7690
raise SignalReceivedException(_received_signal)
7791

7892

79-
def signal_received() -> bool:
93+
def was_signal_received() -> bool:
8094
"""check if a signal was received"""
8195
return True if _received_signal else False
8296

@@ -93,18 +107,10 @@ def wake_all() -> None:
93107
exit_event.set()
94108

95109

96-
def _clear_signals() -> None:
97-
global _received_signal
98-
99-
_received_signal = 0
100-
_signal_counts.clear()
101-
_original_handlers.clear()
102-
103-
104110
@contextmanager
105-
def delayed_signals() -> Iterator[None]:
106-
"""Will delay signalling until `raise_if_signalled` is explicitly used or when
107-
a second signal with the same int value arrives.
111+
def intercepted_signals() -> Iterator[None]:
112+
"""Will intercept SIGINT and SIGTERM and will delay calling signal handlers until
113+
`raise_if_signalled` is explicitly used or when a second signal with the same int value arrives.
108114
109115
A no-op when not called on main thread.
110116
@@ -115,7 +121,7 @@ def delayed_signals() -> Iterator[None]:
115121
# check if handlers are already installed (nested call)
116122
current_sigint_handler = signal.getsignal(signal.SIGINT)
117123

118-
if current_sigint_handler is signal_receiver:
124+
if current_sigint_handler is _signal_receiver:
119125
# already installed, this is a nested call - just yield
120126
yield
121127
return
@@ -129,14 +135,16 @@ def delayed_signals() -> Iterator[None]:
129135
_original_handlers[signal.SIGTERM] = original_sigterm_handler
130136

131137
try:
132-
signal.signal(signal.SIGINT, signal_receiver)
133-
signal.signal(signal.SIGTERM, signal_receiver)
138+
signal.signal(signal.SIGINT, _signal_receiver)
139+
signal.signal(signal.SIGTERM, _signal_receiver)
134140
yield
135141
finally:
136142
signal.signal(signal.SIGINT, original_sigint_handler)
137143
signal.signal(signal.SIGTERM, original_sigterm_handler)
138144
_clear_signals()
139145

140146
else:
147+
from dlt.common import logger
148+
141149
logger.info("Running in daemon thread, signals not enabled")
142150
yield

dlt/destinations/impl/clickhouse/clickhouse.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
from dlt.common.destination.client import (
1717
PreparedTableSchema,
1818
SupportsStagingDestination,
19-
TLoadJobState,
2019
HasFollowupJobs,
2120
RunnableLoadJob,
2221
FollowupJobRequest,
@@ -53,7 +52,7 @@
5352
SqlJobClientBase,
5453
SqlJobClientWithStagingDataset,
5554
)
56-
from dlt.destinations.job_impl import ReferenceFollowupJobRequest, FinalizedLoadJobWithFollowupJobs
55+
from dlt.destinations.job_impl import ReferenceFollowupJobRequest
5756
from dlt.destinations.sql_client import SqlClientBase
5857
from dlt.destinations.sql_jobs import SqlMergeFollowupJob
5958
from dlt.destinations.utils import get_deterministic_temp_table_name

dlt/destinations/impl/dremio/dremio.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,14 @@
55
from dlt.common.destination.client import (
66
HasFollowupJobs,
77
PreparedTableSchema,
8-
TLoadJobState,
98
RunnableLoadJob,
109
SupportsStagingDestination,
1110
FollowupJobRequest,
1211
LoadJob,
1312
)
1413
from dlt.common.schema import TColumnSchema, Schema
15-
from dlt.common.schema.typing import TColumnType, TTableFormat
14+
from dlt.common.schema.typing import TColumnType
1615
from dlt.common.storages.file_storage import FileStorage
17-
from dlt.common.utils import uniq_id
1816
from dlt.destinations.exceptions import LoadJobTerminalException
1917
from dlt.destinations.impl.dremio.configuration import DremioClientConfiguration
2018
from dlt.destinations.impl.dremio.sql_client import DremioSqlClient

dlt/destinations/impl/ducklake/ducklake.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ def __init__(self, file_path: str) -> None:
7575
def metrics(self) -> Optional[LoadJobMetrics]:
7676
"""Generate remote url metrics which point to the table in storage"""
7777
m = super().metrics()
78+
# job client not available before run_managed called
79+
if not self._job_client:
80+
return m
7881
# TODO: read location from catalog. ducklake supports customized table layouts
7982
return m._replace(
8083
remote_url=str(

dlt/destinations/impl/filesystem/filesystem.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,9 @@ def make_remote_url(self) -> str:
146146

147147
def metrics(self) -> Optional[LoadJobMetrics]:
148148
m = super().metrics()
149+
# job client not available before run_managed called
150+
if not self._job_client:
151+
return m
149152
return m._replace(remote_url=self.make_remote_url())
150153

151154

0 commit comments

Comments (0)