Skip to content

Commit 35aec34

Browse files
committed
Make select/poll interruptible with a dummy wake socket
1 parent 638e8ab commit 35aec34

File tree

6 files changed

+45
-11
lines changed

6 files changed

+45
-11
lines changed

src/common/sockman.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ void SockMan::ThreadSocketHandler()
332332
if (io_readiness.events_per_sock.empty() ||
333333
// WaitMany() may as well be a static method, the context of the first Sock in the vector is not relevant.
334334
!io_readiness.events_per_sock.begin()->first->WaitMany(SELECT_TIMEOUT,
335-
io_readiness.events_per_sock)) {
335+
io_readiness.events_per_sock, nullptr)) {
336336
interruptNet.sleep_for(SELECT_TIMEOUT);
337337
}
338338

src/test/pcp_tests.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ class PCPTestSock final : public Sock
206206
return true;
207207
}
208208

209-
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override
209+
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, std::shared_ptr<const Sock> wakesock) const override
210210
{
211211
return false;
212212
}

src/test/util/net.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* o
208208
return true;
209209
}
210210

211-
bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
211+
bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, std::shared_ptr<const Sock> wakesock) const
212212
{
213213
for (auto& [sock, events] : events_per_sock) {
214214
(void)sock;
@@ -362,14 +362,14 @@ bool DynSock::Wait(std::chrono::milliseconds timeout,
362362
{
363363
EventsPerSock ev;
364364
ev.emplace(this, Events{requested});
365-
const bool ret{WaitMany(timeout, ev)};
365+
const bool ret{WaitMany(timeout, ev, nullptr)};
366366
if (occurred != nullptr) {
367367
*occurred = ev.begin()->second.occurred;
368368
}
369369
return ret;
370370
}
371371

372-
bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
372+
bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, std::shared_ptr<const Sock> wakesock) const
373373
{
374374
const auto deadline = std::chrono::steady_clock::now() + timeout;
375375
bool at_least_one_event_occurred{false};
@@ -392,6 +392,10 @@ bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_
392392
}
393393
}
394394
}
395+
if (wakesock) {
396+
std::array<std::byte, 1024> bytes;
397+
[[maybe_unused]] size_t read_bytes = wakesock->Recv(bytes.data(), bytes.size(), MSG_DONTWAIT);
398+
}
395399

396400
if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
397401
break;

src/test/util/net.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ class ZeroSock : public Sock
180180
Event requested,
181181
Event* occurred = nullptr) const override;
182182

183-
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override;
183+
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, std::shared_ptr<const Sock> wakesock) const override;
184184

185185
private:
186186
ZeroSock& operator=(Sock&& other) override;
@@ -333,7 +333,7 @@ class DynSock : public ZeroSock
333333
Event requested,
334334
Event* occurred = nullptr) const override;
335335

336-
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const override;
336+
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, std::shared_ptr<const Sock> wakesock) const override;
337337

338338
private:
339339
DynSock& operator=(Sock&&) override;

src/util/sock.cpp

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occur
145145

146146
EventsPerSock events_per_sock{std::make_pair(shared, Events{requested})};
147147

148-
if (!WaitMany(timeout, events_per_sock)) {
148+
if (!WaitMany(timeout, events_per_sock, nullptr)) {
149149
return false;
150150
}
151151

@@ -156,7 +156,7 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occur
156156
return true;
157157
}
158158

159-
bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
159+
bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, std::shared_ptr<const Sock> wakesock) const
160160
{
161161
#ifdef USE_POLL
162162
std::vector<pollfd> pfds;
@@ -172,11 +172,27 @@ bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per
172172
}
173173
}
174174

175+
if (wakesock) {
176+
pfds.emplace_back();
177+
auto& pfd = pfds.back();
178+
pfd.fd = wakesock->m_socket;
179+
pfd.events |= POLLIN;
180+
}
181+
175182
if (poll(pfds.data(), pfds.size(), count_milliseconds(timeout)) == SOCKET_ERROR) {
176183
return false;
177184
}
178185

179-
assert(pfds.size() == events_per_sock.size());
186+
if (wakesock) {
187+
assert(pfds.size() == events_per_sock.size() + 1);
188+
if (pfds.back().revents & POLLIN) {
189+
std::array<std::byte, 1024> bytes;
190+
[[maybe_unused]] size_t read_bytes = wakesock->Recv(bytes.data(), bytes.size(), MSG_DONTWAIT);
191+
}
192+
} else {
193+
assert(pfds.size() == events_per_sock.size());
194+
}
195+
180196
size_t i{0};
181197
for (auto& [sock, events] : events_per_sock) {
182198
assert(sock->m_socket == static_cast<SOCKET>(pfds[i].fd));
@@ -218,12 +234,26 @@ bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per
218234
socket_max = std::max(socket_max, s);
219235
}
220236

237+
if (wakesock) {
238+
if (!wakesock->IsSelectable()) {
239+
return false;
240+
}
241+
const auto& s = wakesock->m_socket;
242+
FD_SET(s, &recv);
243+
socket_max = std::max(socket_max, s);
244+
}
245+
221246
timeval tv = MillisToTimeval(timeout);
222247

223248
if (select(socket_max + 1, &recv, &send, &err, &tv) == SOCKET_ERROR) {
224249
return false;
225250
}
226251

252+
if (wakesock && FD_ISSET(wakesock->m_socket, &recv)) {
253+
std::array<std::byte, 1024> bytes;
254+
[[maybe_unused]] size_t read_bytes = wakesock->Recv(bytes.data(), bytes.size(), MSG_DONTWAIT);
255+
}
256+
227257
for (auto& [sock, events] : events_per_sock) {
228258
const auto& s = sock->m_socket;
229259
events.occurred = 0;

src/util/sock.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ class Sock
216216
* false otherwise
217217
*/
218218
[[nodiscard]] virtual bool WaitMany(std::chrono::milliseconds timeout,
219-
EventsPerSock& events_per_sock) const;
219+
EventsPerSock& events_per_sock, std::shared_ptr<const Sock> wakesock) const;
220220

221221
/* Higher level, convenience, methods. These may throw. */
222222

0 commit comments

Comments
 (0)