Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/devpoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ zmq::devpoll_t::~devpoll_t ()
void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_)
{
struct pollfd pfd = {fd_, events_, 0};
ssize_t rc = write (devpoll_fd, &pfd, sizeof pfd);
ssize_t rc;
do {
rc = write (devpoll_fd, &pfd, sizeof pfd);
} while (rc == -1 && errno == EINTR);
zmq_assert (rc == sizeof pfd);
}

Expand Down
22 changes: 16 additions & 6 deletions src/ip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,10 @@ static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)

// Connect writer to the listener.
if (rc != SOCKET_ERROR) {
rc = connect (*w_, reinterpret_cast<struct sockaddr *> (&addr),
sizeof addr);
do {
rc = connect (*w_, reinterpret_cast<struct sockaddr *> (&addr),
sizeof addr);
} while (rc == SOCKET_ERROR && errno == EINTR);
}

// Accept connection from writer.
Expand Down Expand Up @@ -730,10 +732,14 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
errno_assert (rc != -1);

rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
do {
rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
} while (rc == -1 && errno == EINTR);
errno_assert (rc != -1);

*r_ = accept (listener, NULL, NULL);
do {
*r_ = accept (listener, NULL, NULL);
} while (*r_ == -1 && errno == EINTR);
errno_assert (*r_ != -1);

close (listener);
Expand Down Expand Up @@ -772,10 +778,14 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on);
errno_assert (rc != -1);

rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
do {
rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
} while (rc == -1 && errno == EINTR);
errno_assert (rc != -1);

*r_ = accept (listener, NULL, NULL);
do {
*r_ = accept (listener, NULL, NULL);
} while (*r_ == -1 && errno == EINTR);
errno_assert (*r_ != -1);

close (listener);
Expand Down
11 changes: 8 additions & 3 deletions src/ipc_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@ zmq::fd_t zmq::ipc_listener_t::accept ()
// resources is considered valid and treated by ignoring the connection.
zmq_assert (_s != retired_fd);
#if defined ZMQ_HAVE_SOCK_CLOEXEC && defined HAVE_ACCEPT4
fd_t sock = ::accept4 (_s, NULL, NULL, SOCK_CLOEXEC);
fd_t sock;
do {
sock = ::accept4 (_s, NULL, NULL, SOCK_CLOEXEC);
} while (sock == retired_fd && errno == EINTR);
#else
struct sockaddr_storage ss;
memset (&ss, 0, sizeof (ss));
Expand All @@ -285,8 +288,10 @@ zmq::fd_t zmq::ipc_listener_t::accept ()
socklen_t ss_len = sizeof (ss);
#endif

const fd_t sock =
::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
fd_t sock;
do {
sock = ::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
} while (sock == retired_fd && errno == EINTR);
#endif
if (sock == retired_fd) {
#if defined ZMQ_HAVE_WINDOWS
Expand Down
5 changes: 4 additions & 1 deletion src/norm_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,11 @@ void zmq::norm_engine_t::in_event ()
// This means a NormEvent is pending, so call NormGetNextEvent() and handle
NormEvent event;
#ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
int rc = recv (wrapper_read_fd, reinterpret_cast<char *> (&event),
int rc;
do {
rc = recv (wrapper_read_fd, reinterpret_cast<char *> (&event),
sizeof (event), 0);
} while (rc == -1 && errno == EINTR);
errno_assert (rc == sizeof (event));
#else
if (!NormGetNextEvent (norm_instance, &event)) {
Expand Down
5 changes: 4 additions & 1 deletion src/poll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,11 @@ void zmq::poll_t::loop ()
}

// Wait for events.
int rc = poll (&pollset[0], static_cast<nfds_t> (pollset.size ()),
int rc;
do {
rc = poll (&pollset[0], static_cast<nfds_t> (pollset.size ()),
timeout ? timeout : -1);
} while (rc == -1 && errno == EINTR);
if (rc == -1) {
errno_assert (errno == EINTR);
continue;
Expand Down
5 changes: 4 additions & 1 deletion src/select.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,11 @@ void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
return;

fds_set_t local_fds_set = family_entry_.fds_set;
int rc = select (max_fd_, &local_fds_set.read, &local_fds_set.write,
int rc;
do {
rc = select (max_fd_, &local_fds_set.read, &local_fds_set.write,
&local_fds_set.error, use_timeout_ ? &tv_ : NULL);
} while (rc == -1 && errno == EINTR);

#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
Expand Down
17 changes: 12 additions & 5 deletions src/signaler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ int zmq::signaler_t::wait (int timeout_) const
struct pollfd pfd;
pfd.fd = _r;
pfd.events = POLLIN;
const int rc = poll (&pfd, 1, timeout_);
int rc;
do {
rc = poll (&pfd, 1, timeout_);
} while (rc < 0 && errno == EINTR);
if (unlikely (rc < 0)) {
errno_assert (errno == EINTR);
return -1;
Expand Down Expand Up @@ -249,12 +252,16 @@ int zmq::signaler_t::wait (int timeout_) const
timeout.tv_usec = timeout_ % 1000 * 1000;
}
#ifdef ZMQ_HAVE_WINDOWS
int rc =
select (0, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
int rc;
do {
rc = select (0, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
} while (rc == SOCKET_ERROR && WSAGetLastError () == WSAEINTR);
wsa_assert (rc != SOCKET_ERROR);
#else
int rc =
select (_r + 1, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
int rc;
do {
rc = select (_r + 1, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
} while (rc < 0 && errno == EINTR);
if (unlikely (rc < 0)) {
errno_assert (errno == EINTR);
return -1;
Expand Down
12 changes: 9 additions & 3 deletions src/socket_poller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
static_cast<int> (std::min<uint64_t> (end - now, INT_MAX));

// Wait for events.
const int rc = poll (_pollfds, _pollset_size, timeout);
int rc;
do {
rc = poll (_pollfds, _pollset_size, timeout);
} while (rc == -1 && errno == EINTR);
if (rc == -1 && errno == EINTR) {
return -1;
}
Expand Down Expand Up @@ -615,8 +618,11 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
valid_pollset_bytes (*_pollset_out.get ()));
memcpy (errset.get (), _pollset_err.get (),
valid_pollset_bytes (*_pollset_err.get ()));
const int rc = select (static_cast<int> (_max_fd + 1), inset.get (),
outset.get (), errset.get (), ptimeout);
int rc;
do {
rc = select (static_cast<int> (_max_fd + 1), inset.get (),
outset.get (), errset.get (), ptimeout);
} while (rc == -1 && errno == EINTR);
#if defined ZMQ_HAVE_WINDOWS
if (unlikely (rc == SOCKET_ERROR)) {
errno = wsa_error_to_errno (WSAGetLastError ());
Expand Down
15 changes: 12 additions & 3 deletions src/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
{
#ifdef ZMQ_HAVE_WINDOWS

const int nbytes = send (s_, (char *) data_, static_cast<int> (size_), 0);
int nbytes;
do {
nbytes = send (s_, (char *) data_, static_cast<int> (size_), 0);
} while (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEINTR);

// If not a single byte can be written to the socket in non-blocking mode
// we'll get an error (this may happen during the speculative write).
Expand All @@ -212,7 +215,10 @@ int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
return nbytes;

#else
ssize_t nbytes = send (s_, static_cast<const char *> (data_), size_, 0);
ssize_t nbytes;
do {
nbytes = send (s_, static_cast<const char *> (data_), size_, 0);
} while (nbytes == -1 && errno == EINTR);

// Several errors are OK. When speculative write is being done we may not
// be able to write a single byte from the socket. Also, SIGSTOP issued
Expand Down Expand Up @@ -269,7 +275,10 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_)

#else

const ssize_t rc = recv (s_, static_cast<char *> (data_), size_, 0);
ssize_t rc;
do {
rc = recv (s_, static_cast<char *> (data_), size_, 0);
} while (rc == -1 && errno == EINTR);

// Several errors are OK. When speculative read is being done we may not
// be able to read a single byte from the socket. Also, SIGSTOP issued
Expand Down
13 changes: 9 additions & 4 deletions src/tcp_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,16 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
socklen_t ss_len = sizeof (ss);
#endif
#if defined ZMQ_HAVE_SOCK_CLOEXEC && defined HAVE_ACCEPT4
fd_t sock = ::accept4 (_s, reinterpret_cast<struct sockaddr *> (&ss),
&ss_len, SOCK_CLOEXEC);
fd_t sock;
do {
sock = ::accept4 (_s, reinterpret_cast<struct sockaddr *> (&ss),
&ss_len, SOCK_CLOEXEC);
} while (sock == retired_fd && errno == EINTR);
#else
const fd_t sock =
::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
fd_t sock;
do {
sock = ::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
} while (sock == retired_fd && errno == EINTR);
#endif

if (sock == retired_fd) {
Expand Down
11 changes: 8 additions & 3 deletions src/tipc_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,15 @@ zmq::fd_t zmq::tipc_listener_t::accept ()

zmq_assert (_s != retired_fd);
#ifdef ZMQ_HAVE_VXWORKS
fd_t sock = ::accept (_s, (struct sockaddr *) &ss, (int *) &ss_len);
fd_t sock;
do {
sock = ::accept (_s, (struct sockaddr *) &ss, (int *) &ss_len);
} while (sock == -1 && errno == EINTR);
#else
fd_t sock =
::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
fd_t sock;
do {
sock = ::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
} while (sock == -1 && errno == EINTR);
#endif
if (sock == -1) {
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
Expand Down
24 changes: 16 additions & 8 deletions src/udp_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,13 +457,19 @@ void zmq::udp_engine_t::out_event ()
errno_assert (rc == 0);

#ifdef ZMQ_HAVE_WINDOWS
rc = sendto (_fd, _out_buffer, static_cast<int> (size), 0, _out_address,
_out_address_len);
do {
rc = sendto (_fd, _out_buffer, static_cast<int> (size), 0, _out_address,
_out_address_len);
} while (rc < 0 && WSAGetLastError () == WSAEINTR);
#elif defined ZMQ_HAVE_VXWORKS
rc = sendto (_fd, reinterpret_cast<caddr_t> (_out_buffer), size, 0,
(sockaddr *) _out_address, _out_address_len);
do {
rc = sendto (_fd, reinterpret_cast<caddr_t> (_out_buffer), size, 0,
(sockaddr *) _out_address, _out_address_len);
} while (rc < 0 && errno == EINTR);
#else
rc = sendto (_fd, _out_buffer, size, 0, _out_address, _out_address_len);
do {
rc = sendto (_fd, _out_buffer, size, 0, _out_address, _out_address_len);
} while (rc < 0 && errno == EINTR);
#endif
if (rc < 0) {
#ifdef ZMQ_HAVE_WINDOWS
Expand Down Expand Up @@ -507,9 +513,11 @@ void zmq::udp_engine_t::in_event ()
zmq_socklen_t in_addrlen =
static_cast<zmq_socklen_t> (sizeof (sockaddr_storage));

const int nbytes =
recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0,
reinterpret_cast<sockaddr *> (&in_address), &in_addrlen);
int nbytes;
do {
nbytes = recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0,
reinterpret_cast<sockaddr *> (&in_address), &in_addrlen);
} while (nbytes < 0 && errno == EINTR);

if (nbytes < 0) {
#ifdef ZMQ_HAVE_WINDOWS
Expand Down
5 changes: 4 additions & 1 deletion src/vmci_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ zmq::fd_t zmq::vmci_listener_t::accept ()
// The situation where connection cannot be accepted due to insufficient
// resources is considered valid and treated by ignoring the connection.
zmq_assert (_s != retired_fd);
fd_t sock = ::accept (_s, NULL, NULL);
fd_t sock;
do {
sock = ::accept (_s, NULL, NULL);
} while (sock == retired_fd && errno == EINTR);

#ifdef ZMQ_HAVE_WINDOWS
if (sock == INVALID_SOCKET) {
Expand Down
13 changes: 9 additions & 4 deletions src/ws_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,16 @@ zmq::fd_t zmq::ws_listener_t::accept ()
socklen_t ss_len = sizeof (ss);
#endif
#if defined ZMQ_HAVE_SOCK_CLOEXEC && defined HAVE_ACCEPT4
fd_t sock = ::accept4 (_s, reinterpret_cast<struct sockaddr *> (&ss),
&ss_len, SOCK_CLOEXEC);
fd_t sock;
do {
sock = ::accept4 (_s, reinterpret_cast<struct sockaddr *> (&ss),
&ss_len, SOCK_CLOEXEC);
} while (sock == retired_fd && errno == EINTR);
#else
const fd_t sock =
::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
fd_t sock;
do {
sock = ::accept (_s, reinterpret_cast<struct sockaddr *> (&ss), &ss_len);
} while (sock == retired_fd && errno == EINTR);
#endif

if (sock == retired_fd) {
Expand Down
5 changes: 4 additions & 1 deletion src/zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1045,8 +1045,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
return -1;
}
#else
int rc = select (maxfd + 1, inset.get (), outset.get (),
int rc;
do {
rc = select (maxfd + 1, inset.get (), outset.get (),
errset.get (), ptimeout);
} while (rc == -1 && errno == EINTR);
if (unlikely (rc == -1)) {
errno_assert (errno == EINTR || errno == EBADF);
return -1;
Expand Down
Loading