From 7ec624a4dc3ed13e5242512d0ed6b16551dbdddb Mon Sep 17 00:00:00 2001 From: Yuri Victorovich Date: Sat, 31 Jan 2026 01:09:43 -0800 Subject: [PATCH] Re-run system calls when EINTR occurs This prevents "Interrupted system call" failures when signals are delivered to the process. --- src/devpoll.cpp | 5 ++++- src/ip.cpp | 22 ++++++++++++++++------ src/ipc_listener.cpp | 11 ++++++++--- src/norm_engine.cpp | 5 ++++- src/poll.cpp | 5 ++++- src/select.cpp | 5 ++++- src/signaler.cpp | 17 ++++++++++++----- src/socket_poller.cpp | 12 +++++++++--- src/tcp.cpp | 15 ++++++++++++--- src/tcp_listener.cpp | 13 +++++++++---- src/tipc_listener.cpp | 11 ++++++++--- src/udp_engine.cpp | 24 ++++++++++++++++-------- src/vmci_listener.cpp | 5 ++++- src/ws_listener.cpp | 13 +++++++++---- src/zmq.cpp | 5 ++++- 15 files changed, 123 insertions(+), 45 deletions(-) diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 6b808db1c2..7fe74072f6 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -37,7 +37,10 @@ zmq::devpoll_t::~devpoll_t () void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_) { struct pollfd pfd = {fd_, events_, 0}; - ssize_t rc = write (devpoll_fd, &pfd, sizeof pfd); + ssize_t rc; + do { + rc = write (devpoll_fd, &pfd, sizeof pfd); + } while (rc == -1 && errno == EINTR); zmq_assert (rc == sizeof pfd); } diff --git a/src/ip.cpp b/src/ip.cpp index 721acb1de7..f25612d47f 100644 --- a/src/ip.cpp +++ b/src/ip.cpp @@ -447,8 +447,10 @@ static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_) // Connect writer to the listener. if (rc != SOCKET_ERROR) { - rc = connect (*w_, reinterpret_cast (&addr), - sizeof addr); + do { + rc = connect (*w_, reinterpret_cast (&addr), + sizeof addr); + } while (rc == SOCKET_ERROR && errno == EINTR); } // Accept connection from writer. @@ -730,10 +732,14 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_) rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on); errno_assert (rc != -1); - rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr); + do { + rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr); + } while (rc == -1 && errno == EINTR); errno_assert (rc != -1); - *r_ = accept (listener, NULL, NULL); + do { + *r_ = accept (listener, NULL, NULL); + } while (*r_ == -1 && errno == EINTR); errno_assert (*r_ != -1); close (listener); @@ -772,10 +778,14 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_) rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on); errno_assert (rc != -1); - rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr); + do { + rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr); + } while (rc == -1 && errno == EINTR); errno_assert (rc != -1); - *r_ = accept (listener, NULL, NULL); + do { + *r_ = accept (listener, NULL, NULL); + } while (*r_ == -1 && errno == EINTR); errno_assert (*r_ != -1); close (listener); diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index 92293da792..67b5c4cee1 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -275,7 +275,10 @@ zmq::fd_t zmq::ipc_listener_t::accept () // resources is considered valid and treated by ignoring the connection. zmq_assert (_s != retired_fd); #if defined ZMQ_HAVE_SOCK_CLOEXEC && defined HAVE_ACCEPT4 - fd_t sock = ::accept4 (_s, NULL, NULL, SOCK_CLOEXEC); + fd_t sock; + do { + sock = ::accept4 (_s, NULL, NULL, SOCK_CLOEXEC); + } while (sock == retired_fd && errno == EINTR); #else struct sockaddr_storage ss; memset (&ss, 0, sizeof (ss)); @@ -285,8 +288,10 @@ zmq::fd_t zmq::ipc_listener_t::accept () socklen_t ss_len = sizeof (ss); #endif - const fd_t sock = - ::accept (_s, reinterpret_cast (&ss), &ss_len); + fd_t sock; + do { + sock = ::accept (_s, reinterpret_cast (&ss), &ss_len); + } while (sock == retired_fd && errno == EINTR); #endif if (sock == retired_fd) { #if defined ZMQ_HAVE_WINDOWS diff --git a/src/norm_engine.cpp b/src/norm_engine.cpp index 8bb3da7d95..668790aba2 100644 --- a/src/norm_engine.cpp +++ b/src/norm_engine.cpp @@ -414,8 +414,11 @@ void zmq::norm_engine_t::in_event () // This means a NormEvent is pending, so call NormGetNextEvent() and handle NormEvent event; #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER - int rc = recv (wrapper_read_fd, reinterpret_cast (&event), + int rc; + do { + rc = recv (wrapper_read_fd, reinterpret_cast (&event), sizeof (event), 0); + } while (rc == -1 && errno == EINTR); errno_assert (rc == sizeof (event)); #else if (!NormGetNextEvent (norm_instance, &event)) { diff --git a/src/poll.cpp b/src/poll.cpp index dd230ccfda..24d6c35238 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -125,8 +125,11 @@ void zmq::poll_t::loop () } // Wait for events. - int rc = poll (&pollset[0], static_cast (pollset.size ()), + int rc; + do { + rc = poll (&pollset[0], static_cast (pollset.size ()), timeout ? timeout : -1); + } while (rc == -1 && errno == EINTR); if (rc == -1) { errno_assert (errno == EINTR); continue; diff --git a/src/select.cpp b/src/select.cpp index 8673215bf9..b427e74093 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -403,8 +403,11 @@ void zmq::select_t::select_family_entry (family_entry_t &family_entry_, return; fds_set_t local_fds_set = family_entry_.fds_set; - int rc = select (max_fd_, &local_fds_set.read, &local_fds_set.write, + int rc; + do { + rc = select (max_fd_, &local_fds_set.read, &local_fds_set.write, &local_fds_set.error, use_timeout_ ? &tv_ : NULL); + } while (rc == -1 && errno == EINTR); #if defined ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); diff --git a/src/signaler.cpp b/src/signaler.cpp index 64692db6b3..8a8fd98f83 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -216,7 +216,10 @@ int zmq::signaler_t::wait (int timeout_) const struct pollfd pfd; pfd.fd = _r; pfd.events = POLLIN; - const int rc = poll (&pfd, 1, timeout_); + int rc; + do { + rc = poll (&pfd, 1, timeout_); + } while (rc < 0 && errno == EINTR); if (unlikely (rc < 0)) { errno_assert (errno == EINTR); return -1; @@ -249,12 +252,16 @@ int zmq::signaler_t::wait (int timeout_) const timeout.tv_usec = timeout_ % 1000 * 1000; } #ifdef ZMQ_HAVE_WINDOWS - int rc = - select (0, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL); + int rc; + do { + rc = select (0, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL); + } while (rc == SOCKET_ERROR && WSAGetLastError () == WSAEINTR); wsa_assert (rc != SOCKET_ERROR); #else - int rc = - select (_r + 1, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL); + int rc; + do { + rc = select (_r + 1, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL); + } while (rc < 0 && errno == EINTR); if (unlikely (rc < 0)) { errno_assert (errno == EINTR); return -1; diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp index b9eab821a8..7bfb6aa0f6 100644 --- a/src/socket_poller.cpp +++ b/src/socket_poller.cpp @@ -555,7 +555,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, static_cast (std::min (end - now, INT_MAX)); // Wait for events. - const int rc = poll (_pollfds, _pollset_size, timeout); + int rc; + do { + rc = poll (_pollfds, _pollset_size, timeout); + } while (rc == -1 && errno == EINTR); if (rc == -1 && errno == EINTR) { return -1; } @@ -615,8 +618,11 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, valid_pollset_bytes (*_pollset_out.get ())); memcpy (errset.get (), _pollset_err.get (), valid_pollset_bytes (*_pollset_err.get ())); - const int rc = select (static_cast (_max_fd + 1), inset.get (), - outset.get (), errset.get (), ptimeout); + int rc; + do { + rc = select (static_cast (_max_fd + 1), inset.get (), + outset.get (), errset.get (), ptimeout); + } while (rc == -1 && errno == EINTR); #if defined ZMQ_HAVE_WINDOWS if (unlikely (rc == SOCKET_ERROR)) { errno = wsa_error_to_errno (WSAGetLastError ()); diff --git a/src/tcp.cpp b/src/tcp.cpp index adebea62d5..ebe9619465 100644 --- a/src/tcp.cpp +++ b/src/tcp.cpp @@ -187,7 +187,10 @@ int zmq::tcp_write (fd_t s_, const void *data_, size_t size_) { #ifdef ZMQ_HAVE_WINDOWS - const int nbytes = send (s_, (char *) data_, static_cast (size_), 0); + int nbytes; + do { + nbytes = send (s_, (char *) data_, static_cast (size_), 0); + } while (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEINTR); // If not a single byte can be written to the socket in non-blocking mode // we'll get an error (this may happen during the speculative write). @@ -212,7 +215,10 @@ int zmq::tcp_write (fd_t s_, const void *data_, size_t size_) return nbytes; #else - ssize_t nbytes = send (s_, static_cast (data_), size_, 0); + ssize_t nbytes; + do { + nbytes = send (s_, static_cast (data_), size_, 0); + } while (nbytes == -1 && errno == EINTR); // Several errors are OK. When speculative write is being done we may not // be able to write a single byte from the socket. Also, SIGSTOP issued @@ -269,7 +275,10 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_) #else - const ssize_t rc = recv (s_, static_cast (data_), size_, 0); + ssize_t rc; + do { + rc = recv (s_, static_cast (data_), size_, 0); + } while (rc == -1 && errno == EINTR); // Several errors are OK. When speculative read is being done we may not // be able to read a single byte from the socket. Also, SIGSTOP issued diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index bb9804b08b..3d47a6e198 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -175,11 +175,16 @@ zmq::fd_t zmq::tcp_listener_t::accept () socklen_t ss_len = sizeof (ss); #endif #if defined ZMQ_HAVE_SOCK_CLOEXEC && defined HAVE_ACCEPT4 - fd_t sock = ::accept4 (_s, reinterpret_cast (&ss), - &ss_len, SOCK_CLOEXEC); + fd_t sock; + do { + sock = ::accept4 (_s, reinterpret_cast (&ss), + &ss_len, SOCK_CLOEXEC); + } while (sock == retired_fd && errno == EINTR); #else - const fd_t sock = - ::accept (_s, reinterpret_cast (&ss), &ss_len); + fd_t sock; + do { + sock = ::accept (_s, reinterpret_cast (&ss), &ss_len); + } while (sock == retired_fd && errno == EINTR); #endif if (sock == retired_fd) { diff --git a/src/tipc_listener.cpp b/src/tipc_listener.cpp index c0cef6b43a..85780f81bc 100644 --- a/src/tipc_listener.cpp +++ b/src/tipc_listener.cpp @@ -129,10 +129,15 @@ zmq::fd_t zmq::tipc_listener_t::accept () zmq_assert (_s != retired_fd); #ifdef ZMQ_HAVE_VXWORKS - fd_t sock = ::accept (_s, (struct sockaddr *) &ss, (int *) &ss_len); + fd_t sock; + do { + sock = ::accept (_s, (struct sockaddr *) &ss, (int *) &ss_len); + } while (sock == -1 && errno == EINTR); #else - fd_t sock = - ::accept (_s, reinterpret_cast (&ss), &ss_len); + fd_t sock; + do { + sock = ::accept (_s, reinterpret_cast (&ss), &ss_len); + } while (sock == -1 && errno == EINTR); #endif if (sock == -1) { errno_assert (errno == EAGAIN || errno == EWOULDBLOCK diff --git a/src/udp_engine.cpp b/src/udp_engine.cpp index fe0a536a6e..6c88727ba6 100644 --- a/src/udp_engine.cpp +++ b/src/udp_engine.cpp @@ -457,13 +457,19 @@ void zmq::udp_engine_t::out_event () errno_assert (rc == 0); #ifdef ZMQ_HAVE_WINDOWS - rc = sendto (_fd, _out_buffer, static_cast (size), 0, _out_address, - _out_address_len); + do { + rc = sendto (_fd, _out_buffer, static_cast (size), 0, _out_address, + _out_address_len); + } while (rc < 0 && WSAGetLastError () == WSAEINTR); #elif defined ZMQ_HAVE_VXWORKS - rc = sendto (_fd, reinterpret_cast (_out_buffer), size, 0, - (sockaddr *) _out_address, _out_address_len); + do { + rc = sendto (_fd, reinterpret_cast (_out_buffer), size, 0, + (sockaddr *) _out_address, _out_address_len); + } while (rc < 0 && errno == EINTR); #else - rc = sendto (_fd, _out_buffer, size, 0, _out_address, _out_address_len); + do { + rc = sendto (_fd, _out_buffer, size, 0, _out_address, _out_address_len); + } while (rc < 0 && errno == EINTR); #endif if (rc < 0) { #ifdef ZMQ_HAVE_WINDOWS @@ -507,9 +513,11 @@ void zmq::udp_engine_t::in_event () zmq_socklen_t in_addrlen = static_cast (sizeof (sockaddr_storage)); - const int nbytes = - recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0, - reinterpret_cast (&in_address), &in_addrlen); + int nbytes; + do { + nbytes = recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0, + reinterpret_cast (&in_address), &in_addrlen); + } while (nbytes < 0 && errno == EINTR); if (nbytes < 0) { #ifdef ZMQ_HAVE_WINDOWS diff --git a/src/vmci_listener.cpp b/src/vmci_listener.cpp index 94eabce1e9..939fc210a0 100644 --- a/src/vmci_listener.cpp +++ b/src/vmci_listener.cpp @@ -150,7 +150,10 @@ zmq::fd_t zmq::vmci_listener_t::accept () // The situation where connection cannot be accepted due to insufficient // resources is considered valid and treated by ignoring the connection. zmq_assert (_s != retired_fd); - fd_t sock = ::accept (_s, NULL, NULL); + fd_t sock; + do { + sock = ::accept (_s, NULL, NULL); + } while (sock == retired_fd && errno == EINTR); #ifdef ZMQ_HAVE_WINDOWS if (sock == INVALID_SOCKET) { diff --git a/src/ws_listener.cpp b/src/ws_listener.cpp index 1516813742..dc0db48482 100644 --- a/src/ws_listener.cpp +++ b/src/ws_listener.cpp @@ -223,11 +223,16 @@ zmq::fd_t zmq::ws_listener_t::accept () socklen_t ss_len = sizeof (ss); #endif #if defined ZMQ_HAVE_SOCK_CLOEXEC && defined HAVE_ACCEPT4 - fd_t sock = ::accept4 (_s, reinterpret_cast (&ss), - &ss_len, SOCK_CLOEXEC); + fd_t sock; + do { + sock = ::accept4 (_s, reinterpret_cast (&ss), + &ss_len, SOCK_CLOEXEC); + } while (sock == retired_fd && errno == EINTR); #else - const fd_t sock = - ::accept (_s, reinterpret_cast (&ss), &ss_len); + fd_t sock; + do { + sock = ::accept (_s, reinterpret_cast (&ss), &ss_len); + } while (sock == retired_fd && errno == EINTR); #endif if (sock == retired_fd) { diff --git a/src/zmq.cpp b/src/zmq.cpp index 2919679eb9..459eda4074 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1045,8 +1045,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) return -1; } #else - int rc = select (maxfd + 1, inset.get (), outset.get (), + int rc; + do { + rc = select (maxfd + 1, inset.get (), outset.get (), errset.get (), ptimeout); + } while (rc == -1 && errno == EINTR); if (unlikely (rc == -1)) { errno_assert (errno == EINTR || errno == EBADF); return -1;