Skip to content

Commit efe7e83

Browse files
committed
Add fast path for net_io_counters on Windows to reduce idle CPU usage (#9161)
1 parent 3b0ba32 commit efe7e83

3 files changed

Lines changed: 265 additions & 2 deletions

File tree

distributed/_windows_net_io.py

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
# Windows-only fast path for retrieving net_io_counters.
2+
# Part of fix for Issue #9161 (high CPU when idle, caused by expensive psutil.net_io_counters on Windows).
3+
4+
import ctypes
5+
import logging
6+
from collections import namedtuple
7+
from ctypes import wintypes
8+
9+
logger = logging.getLogger("distributed.system_monitor")
10+
11+
# Flag bits for the ``Flags`` argument of GetAdaptersAddresses. Each one tells
# the call to leave out a category of data, which speeds the call up.
GAA_FLAG_SKIP_UNICAST = 1 << 0
GAA_FLAG_SKIP_ANYCAST = 1 << 1
GAA_FLAG_SKIP_MULTICAST = 1 << 2
GAA_FLAG_SKIP_DNS_SERVER = 1 << 3
16+
17+
# Result type of the fast path. It is meant to be compatible with psutil's
# ``snetio`` namedtuple so callers can read the same attributes from either.
win_net_io_counters = namedtuple(
    "win_net_io_counters",
    (
        "bytes_sent",
        "bytes_recv",
        "packets_sent",
        "packets_recv",
        "errin",
        "errout",
        "dropin",
        "dropout",
    ),
)
31+
32+
33+
class IP_ADAPTER_ADDRESSES(ctypes.Structure):
    """Leading part of the Win32 ``IP_ADAPTER_ADDRESSES`` linked-list node.

    The fields are attached after the class statement because ``Next`` is a
    pointer to this very type, which must exist before it can be referenced.
    """


# Only the members up to and including FriendlyName are declared. Instances are
# never allocated here; the structure is only used to read memory filled in by
# GetAdaptersAddresses, so leaving out the trailing members is safe.
IP_ADAPTER_ADDRESSES._fields_ = [
    ("Length", ctypes.c_ulong),
    ("IfIndex", ctypes.c_ulong),
    ("Next", ctypes.POINTER(IP_ADAPTER_ADDRESSES)),
    ("AdapterName", ctypes.c_char_p),
    # Heads of address lists; kept opaque because they are never followed.
    *[
        (member, ctypes.c_void_p)
        for member in (
            "FirstUnicastAddress",
            "FirstAnycastAddress",
            "FirstMulticastAddress",
            "FirstDnsServerAddress",
        )
    ],
    # Wide-character strings.
    *[
        (member, ctypes.c_wchar_p)
        for member in ("DnsSuffix", "Description", "FriendlyName")
    ],
]
52+
53+
54+
class MIB_IF_ROW2(ctypes.Structure):
    """ctypes mapping of the Win32 ``MIB_IF_ROW2`` interface-statistics row.

    A row is passed to ``GetIfEntry2`` with ``InterfaceIndex`` (or
    ``InterfaceLuid``) filled in, and the call populates the remaining
    members, including the 64-bit traffic counters at the end.

    Raw byte members (GUIDs, physical addresses, the status-flag byte) are
    declared as unsigned bytes, matching the unsigned types in the Windows
    headers; a signed ``c_byte`` would report bytes >= 0x80 as negative
    numbers. The memory layout is the same either way.
    """

    _fields_ = [
        ("InterfaceLuid", ctypes.c_uint64),
        ("InterfaceIndex", ctypes.c_ulong),
        ("InterfaceGuid", ctypes.c_ubyte * 16),
        ("Alias", ctypes.c_wchar * 257),
        ("Description", ctypes.c_wchar * 257),
        ("PhysicalAddressLength", ctypes.c_ulong),
        ("PhysicalAddress", ctypes.c_ubyte * 32),
        ("PermanentPhysicalAddress", ctypes.c_ubyte * 32),
        ("Mtu", ctypes.c_ulong),
        ("Type", ctypes.c_ulong),
        ("TunnelType", ctypes.c_ulong),
        ("MediaType", ctypes.c_ulong),
        ("PhysicalMediumType", ctypes.c_ulong),
        ("AccessType", ctypes.c_ulong),
        ("DirectionType", ctypes.c_ulong),
        # One byte of packed boolean flags.
        ("InterfaceAndOperStatusFlags", ctypes.c_ubyte),
        # 3 bytes padding (added automatically by ctypes alignment)
        ("OperStatus", ctypes.c_ulong),
        ("AdminStatus", ctypes.c_ulong),
        ("MediaConnectState", ctypes.c_ulong),
        ("NetworkGuid", ctypes.c_ubyte * 16),
        ("ConnectionType", ctypes.c_ulong),
        # 4 bytes padding (added automatically by ctypes alignment)
        ("TransmitLinkSpeed", ctypes.c_uint64),
        ("ReceiveLinkSpeed", ctypes.c_uint64),
        # Inbound counters.
        ("InOctets", ctypes.c_uint64),
        ("InUcastPkts", ctypes.c_uint64),
        ("InNUcastPkts", ctypes.c_uint64),
        ("InDiscards", ctypes.c_uint64),
        ("InErrors", ctypes.c_uint64),
        ("InUnknownProtos", ctypes.c_uint64),
        ("InUcastOctets", ctypes.c_uint64),
        ("InMulticastOctets", ctypes.c_uint64),
        ("InBroadcastOctets", ctypes.c_uint64),
        # Outbound counters.
        ("OutOctets", ctypes.c_uint64),
        ("OutUcastPkts", ctypes.c_uint64),
        ("OutNUcastPkts", ctypes.c_uint64),
        ("OutDiscards", ctypes.c_uint64),
        ("OutErrors", ctypes.c_uint64),
        ("OutUcastOctets", ctypes.c_uint64),
        ("OutMulticastOctets", ctypes.c_uint64),
        ("OutBroadcastOctets", ctypes.c_uint64),
        ("OutQLen", ctypes.c_uint64),
    ]
100+
101+
102+
# Setup Windows API calls.
# A private WinDLL handle is used instead of ``ctypes.windll.iphlpapi``: the
# ``windll`` loader caches one library object per process, so prototypes set on
# it would be shared with (and could be overwritten by) any other package that
# also configures these functions through ``ctypes.windll``.
iphlpapi = ctypes.WinDLL("iphlpapi")

# ULONG GetAdaptersAddresses(Family, Flags, Reserved, AdapterAddresses, SizePointer)
GetAdaptersAddresses = iphlpapi.GetAdaptersAddresses
GetAdaptersAddresses.argtypes = [
    wintypes.ULONG,
    wintypes.ULONG,
    ctypes.c_void_p,
    ctypes.POINTER(IP_ADAPTER_ADDRESSES),
    ctypes.POINTER(wintypes.ULONG),
]
GetAdaptersAddresses.restype = wintypes.ULONG

# GetIfEntry2(Row) fills in the row passed by pointer and returns a status code.
GetIfEntry2 = iphlpapi.GetIfEntry2
GetIfEntry2.argtypes = [ctypes.POINTER(MIB_IF_ROW2)]
GetIfEntry2.restype = wintypes.ULONG

# Cached buffer size for GetAdaptersAddresses. Starts at 16 KiB and is raised
# by _fast_net_io_counters whenever the API reports that more room is needed.
_ADAPTER_ADDRESSES_BUF_SIZE = 16384
121+
122+
123+
def _fast_net_io_counters() -> win_net_io_counters:
    """Low-overhead Windows-only network I/O stats querying using Win32 API.

    Enumerates the adapters with ``GetAdaptersAddresses`` and sums the
    ``GetIfEntry2`` statistics of every adapter that can be queried. Adapters
    for which ``GetIfEntry2`` reports an error are skipped.

    Returns
    -------
    win_net_io_counters
        Totals accumulated over all successfully queried adapters.

    Raises
    ------
    OSError
        If ``GetAdaptersAddresses`` does not succeed, including the case where
        the buffer is still too small after several resize attempts.
    """
    global _ADAPTER_ADDRESSES_BUF_SIZE

    error_buffer_overflow = 111  # ERROR_BUFFER_OVERFLOW
    # The adapter list can grow between two calls, so a single retry is not
    # guaranteed to be enough; retry a bounded number of times.
    max_attempts = 3

    # Nothing below reads address lists or friendly names, so ask the API not
    # to collect them.
    flags = (
        GAA_FLAG_SKIP_UNICAST
        | GAA_FLAG_SKIP_ANYCAST
        | GAA_FLAG_SKIP_MULTICAST
        | GAA_FLAG_SKIP_DNS_SERVER
        | 0x0020  # GAA_FLAG_SKIP_FRIENDLY_NAME
    )

    adapter_ptr_type = ctypes.POINTER(IP_ADAPTER_ADDRESSES)
    size = wintypes.ULONG(_ADAPTER_ADDRESSES_BUF_SIZE)
    for _ in range(max_attempts):
        buf = ctypes.create_string_buffer(size.value)
        ret = GetAdaptersAddresses(
            0,  # AF_UNSPEC
            flags,
            None,
            ctypes.cast(buf, adapter_ptr_type),
            ctypes.byref(size),
        )
        if ret != error_buffer_overflow:
            break
        # The buffer was too small: ``size`` now holds the required size.
        # Remember it so later calls start with a large enough buffer.
        _ADAPTER_ADDRESSES_BUF_SIZE = size.value

    if ret != 0:
        raise OSError(f"GetAdaptersAddresses failed with error {ret}")

    bytes_recv = 0
    bytes_sent = 0
    packets_recv = 0
    packets_sent = 0
    errin = 0
    errout = 0
    dropin = 0
    dropout = 0

    # Walk the linked list of adapters; a NULL ``Next`` pointer is falsy.
    curr_ptr = ctypes.cast(buf, adapter_ptr_type)
    while curr_ptr:
        curr = curr_ptr.contents

        # GetIfEntry2 identifies the interface by the index set on the row.
        row = MIB_IF_ROW2()
        row.InterfaceIndex = curr.IfIndex

        if GetIfEntry2(ctypes.byref(row)) == 0:
            bytes_recv += row.InOctets
            bytes_sent += row.OutOctets
            packets_recv += row.InUcastPkts + row.InNUcastPkts
            packets_sent += row.OutUcastPkts + row.OutNUcastPkts
            errin += row.InErrors
            errout += row.OutErrors
            dropin += row.InDiscards
            dropout += row.OutDiscards

        curr_ptr = curr.Next

    return win_net_io_counters(
        bytes_sent=bytes_sent,
        bytes_recv=bytes_recv,
        packets_sent=packets_sent,
        packets_recv=packets_recv,
        errin=errin,
        errout=errout,
        dropin=dropin,
        dropout=dropout,
    )
200+
201+
202+
def fast_net_io_counters():
    """Return network I/O counters, trying the Win32 fast path first.

    If ``_fast_net_io_counters`` raises any ``Exception``, the failure is
    logged at debug level (with traceback) and the result of
    ``psutil.net_io_counters()`` is returned instead.
    """
    try:
        return _fast_net_io_counters()
    except Exception as exc:
        logger.debug(
            "Windows fast path net_io_counters failed, falling back to psutil: %r",
            exc,
            exc_info=True,
        )
        # psutil is imported only when the fallback is actually needed.
        import psutil

        return psutil.net_io_counters()

distributed/system_monitor.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
from distributed.diagnostics import nvml
1515
from distributed.metrics import monotonic, time
1616

17+
if sys.platform == "win32":
18+
# Win32 fast path for Issue #9161
19+
from distributed._windows_net_io import fast_net_io_counters
20+
else:
21+
fast_net_io_counters = psutil.net_io_counters
22+
1723

1824
class SystemMonitor:
1925
proc: psutil.Process
@@ -63,7 +69,7 @@ def __init__(
6369
}
6470

6571
try:
66-
self._last_net_io_counters = psutil.net_io_counters()
72+
self._last_net_io_counters = fast_net_io_counters()
6773
except Exception:
6874
# FIXME is this possible?
6975
self.monitor_net_io = False # pragma: nocover
@@ -165,7 +171,7 @@ def update(self) -> dict[str, Any]:
165171
}
166172

167173
if self.monitor_net_io:
168-
net_ioc = psutil.net_io_counters()
174+
net_ioc = fast_net_io_counters()
169175
last = self._last_net_io_counters
170176
result["host_net_io.read_bps"] = (
171177
net_ioc.bytes_recv - last.bytes_recv

distributed/tests/test_system_monitor.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,46 @@ def test_gil_contention():
129129
sm = SystemMonitor()
130130
a = sm.update()
131131
assert "gil_contention" not in a
132+
133+
134+
def test_windows_fast_net_io_counters():
    """The Win32 fast path returns non-negative, non-decreasing byte counters."""
    import sys

    if sys.platform != "win32":
        pytest.skip("Windows only test")

    from distributed._windows_net_io import _fast_net_io_counters

    byte_fields = ("bytes_recv", "bytes_sent")

    first = _fast_net_io_counters()
    for field in byte_fields:
        assert hasattr(first, field)
    for field in byte_fields:
        assert isinstance(getattr(first, field), int)
    for field in byte_fields:
        assert getattr(first, field) >= 0

    # A second sample taken afterwards must not report smaller totals.
    second = _fast_net_io_counters()
    for field in byte_fields:
        assert getattr(second, field) >= getattr(first, field)
153+
154+
155+
def test_windows_fast_net_io_counters_fallback(monkeypatch):
    """fast_net_io_counters falls back to psutil when the fast path raises."""
    import sys

    # The module under test accesses ``ctypes.windll`` at import time, so it
    # cannot even be imported on other platforms.
    if sys.platform != "win32":
        pytest.skip("Windows only test")

    import psutil

    import distributed._windows_net_io
    from distributed._windows_net_io import fast_net_io_counters

    def mock_fast_net_io_counters():
        raise RuntimeError("Simulated ctypes error")

    monkeypatch.setattr(
        distributed._windows_net_io, "_fast_net_io_counters", mock_fast_net_io_counters
    )

    # Calling fast_net_io_counters should fall back to psutil without raising
    expected = psutil.net_io_counters()
    res = fast_net_io_counters()

    # The result must be psutil's own type, which proves the fallback (and not
    # the patched-out fast path) produced it.
    assert type(res) is type(expected)
    assert hasattr(res, "bytes_recv")
    assert hasattr(res, "bytes_sent")

0 commit comments

Comments
 (0)