Skip to content

Commit f00eb2e

Browse files
committed
[event] Workqueue monitoring for OpenBSD.
Simply speaking, the global concurrent queue has a thread per cpu and schedules functions or blocks on one of these threads. If these threads are all busy, without workqueue monitoring, we cannot schedule a new function or block until one of those threads returns. If thread(s) have not returned however because they are asleep (say), then we could overcommit the global concurrent queue by scheduling more work. The Linux implementation checks each registered thread ID to see if the corresponding thread is in a runnable state according to procfs every second. On some platforms without procfs, this same information is obtainable essentially via the relevant sysctls. Libraries may exist to facilitate issuing the correct sysctls (c.f. `kvm_getprocs`) but these were not used here so to avoid complications with the right invocations when linking. A unit test is also added to test the overcommit functionality. In this test, we tie up all cpus to prevent the scheduled threads from returning and then attempt to run one more task. If the overcommit monitoring is working as intended, then Dispatch will detect all the threads are simply asleep and will attempt to overcommit. If not, the test will notice the function we attempted to overcommit did not run and fail the test.
1 parent 93c000c commit f00eb2e

File tree

4 files changed

+125
-1
lines changed

4 files changed

+125
-1
lines changed

src/event/workqueue.c

+46
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,52 @@ _dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon)
242242

243243
_dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
244244
}
245+
#elif defined(__OpenBSD__)
246+
#include <sys/param.h>
247+
#include <sys/sysctl.h>
248+
#include <sys/proc.h>
249+
250+
static void
251+
_dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon)
252+
{
253+
struct kinfo_proc kp[WORKQ_MAX_TRACKED_TIDS] = {0};
254+
size_t size, len;
255+
int mib[] = {CTL_KERN, KERN_PROC, KERN_PROC_PID, (int)getpid(), (int)sizeof(struct kinfo_proc), 0};
256+
if (sysctl(mib, 6, NULL, &size, NULL, 0) < 0) {
257+
_dispatch_debug("workq: Failed to sysctl1");
258+
return;
259+
}
260+
261+
size = size > sizeof(kp)? sizeof(kp): size;
262+
len = size / sizeof(struct kinfo_proc);
263+
mib[5] = (int)len;
264+
if (sysctl(mib, 6, kp, &size, NULL, 0) < 0) {
265+
_dispatch_debug("workq: Failed to sysctl2");
266+
return;
267+
}
268+
269+
int running_count = 0;
270+
271+
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
272+
273+
for (int i = 0; i < mon->num_registered_tids; i++) {
274+
dispatch_tid tid = mon->registered_tids[i];
275+
for (size_t j = 0; j < len; j++) {
276+
if ((dispatch_tid)kp[j].p_tid != tid) {
277+
continue;
278+
}
279+
280+
if (kp[j].p_stat == SRUN || kp[j].p_stat == SIDL || kp[j].p_stat == SONPROC) {
281+
running_count++;
282+
break;
283+
}
284+
}
285+
}
286+
287+
mon->num_runnable = running_count;
288+
289+
_dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
290+
}
245291
#else
246292
#error must define _dispatch_workq_count_runnable_workers
247293
#endif

src/event/workqueue_internal.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
void _dispatch_workq_worker_register(dispatch_queue_global_t root_q);
3131
void _dispatch_workq_worker_unregister(dispatch_queue_global_t root_q);
3232

33-
#if defined(__linux__) || defined(_WIN32)
33+
#if defined(__linux__) || defined(_WIN32) || defined(__OpenBSD__)
3434
#define HAVE_DISPATCH_WORKQ_MONITORING 1
3535
#else
3636
#define HAVE_DISPATCH_WORKQ_MONITORING 0

tests/CMakeLists.txt

+6
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,12 @@ set_tests_properties(dispatch_io_pipe_close PROPERTIES TIMEOUT 5)
183183
add_unit_test(dispatch_c99 SOURCES dispatch_c99.c)
184184
add_unit_test(dispatch_plusplus SOURCES dispatch_plusplus.cpp)
185185

186+
if (DISPATCH_USE_INTERNAL_WORKQUEUE)
187+
add_unit_test(dispatch_workqueue
188+
SOURCES
189+
dispatch_workqueue.c)
190+
endif()
191+
186192
# test-specific link options
187193
if(WIN32)
188194
target_link_libraries(dispatch_io_muxed PRIVATE WS2_32)

tests/dispatch_workqueue.c

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#include <bsdtests.h>
2+
#include "dispatch_test.h"
3+
4+
struct test_context {
5+
uint32_t ncpu;
6+
int flag;
7+
};
8+
9+
static void
10+
timeout(void *context)
11+
{
12+
struct test_context *ctx = (struct test_context *)context;
13+
sleep(2); // Give the monitor the best chance of firing.
14+
test_int32_format(ctx->flag, 1, "flag");
15+
test_stop();
16+
}
17+
18+
static void
19+
raise_flag(void *context)
20+
{
21+
struct test_context *ctx = (struct test_context *)context;
22+
ctx->flag++;
23+
}
24+
25+
static void
26+
spin(void *context)
27+
{
28+
struct test_context *ctx = (struct test_context *)context;
29+
sleep(ctx->ncpu * 2);
30+
}
31+
32+
static uint32_t
33+
activecpu(void)
34+
{
35+
uint32_t activecpu;
36+
#if defined(__linux__) || defined(__OpenBSD__)
37+
activecpu = (uint32_t)sysconf(_SC_NPROCESSORS_ONLN);
38+
#elif defined(_WIN32)
39+
SYSTEM_INFO si;
40+
GetSystemInfo(&si);
41+
activecpu = si.dwNumberOfProcessors;
42+
#else
43+
size_t s = sizeof(activecpu);
44+
sysctlbyname("hw.activecpu", &activecpu, &s, NULL, 0);
45+
#endif
46+
return activecpu;
47+
}
48+
49+
int
50+
main(void)
51+
{
52+
uint32_t ncpu = activecpu();
53+
54+
dispatch_test_start("Dispatch workqueue");
55+
56+
dispatch_queue_t global = dispatch_get_global_queue(0, 0);
57+
test_ptr_notnull("dispatch_get_global_queue", global);
58+
59+
struct test_context ctx = {ncpu, 0};
60+
dispatch_async_f(global, &ctx, timeout);
61+
62+
for(int i = 0; i < (int)ncpu - 1; i++) {
63+
dispatch_async_f(global, &ctx, spin);
64+
}
65+
66+
// All cpus are tied up at this point. Workqueue
67+
// should execute this function by overcommit.
68+
dispatch_async_f(global, &ctx, raise_flag);
69+
70+
dispatch_main();
71+
return 0;
72+
}

0 commit comments

Comments
 (0)