diff --git a/connectd/connectd.c b/connectd/connectd.c index 69fe522a1e7c..068e4ab56986 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -147,6 +147,7 @@ static struct peer *new_peer(struct daemon *daemon, peer->sent_to_peer = NULL; peer->urgent = false; peer->draining = false; + peer->peer_in_lastmsg = -1; peer->peer_outq = msg_queue_new(peer, false); peer->last_recv_time = time_now(); peer->is_websocket = is_websocket; diff --git a/connectd/connectd.h b/connectd/connectd.h index d43179d966dd..8b4a224b3acb 100644 --- a/connectd/connectd.h +++ b/connectd/connectd.h @@ -96,6 +96,10 @@ struct peer { /* Last time we received traffic */ struct timeabs last_recv_time; + /* How long have we been ignoring peer input? */ + struct timemono peer_in_lasttime; + int peer_in_lastmsg; + /* Ratelimits for onion messages. One token per msec. */ size_t onionmsg_incoming_tokens; struct timemono onionmsg_last_incoming; diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 49eccbb3d4cc..6ca9dc17dda7 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -1073,6 +1073,16 @@ static struct io_plan *write_to_subd(struct io_conn *subd_conn, /* Tell them to read again. */ io_wake(&subd->peer->peer_in); + if (subd->peer->peer_in_lastmsg != -1) { + u64 msec = time_to_msec(timemono_between(time_mono(), + subd->peer->peer_in_lasttime)); + if (msec > 1000) + status_peer_broken(&subd->peer->id, + "wake delay for %s: %"PRIu64"msec", + peer_wire_name(subd->peer->peer_in_lastmsg), + msec); + subd->peer->peer_in_lastmsg = -1; + } /* Wait for them to wake us */ return msg_queue_wait(subd_conn, subd->outq, @@ -1242,8 +1252,20 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn, close_subd_timeout, subd)); } - /* Wait for them to wake us */ - return io_wait(peer_conn, &peer->peer_in, next_read, peer); + /* We used to io_wait after every message, but that means we don't read + * *non-channel* messages (gossip, pings) either. So as a compromise, + * we allow a handful of messages to be queued before we ignore the + * peer until we've drained the outgoing queue. */ + if (msg_queue_length(subd->outq) > 5) { + /* Wait for them to wake us (oldest packet) */ + if (peer->peer_in_lastmsg == -1) { + peer->peer_in_lastmsg = type; + peer->peer_in_lasttime = time_mono(); + } + + return io_wait(peer_conn, &peer->peer_in, next_read, peer); + } + return next_read(peer_conn, peer); } static struct io_plan *read_body_from_peer(struct io_conn *peer_conn, diff --git a/devtools/gossipwith.c b/devtools/gossipwith.c index c6f427f544e2..f506787cd4e3 100644 --- a/devtools/gossipwith.c +++ b/devtools/gossipwith.c @@ -30,6 +30,7 @@ static bool no_init = false; static bool handle_pings = false; static bool hex = false; static bool explicit_network = false; +static bool no_early_close = false; static int timeout_after = -1; static u8 *features; @@ -154,6 +155,8 @@ static u8 *sync_crypto_read(const tal_t *ctx, int peer_fd, struct crypto_state * if (!read_all(peer_fd, hdr, sizeof(hdr))) { status_debug("Failed reading header: %s", strerror(errno)); + if (no_early_close) + exit(1); exit(0); } @@ -237,8 +240,12 @@ static struct io_plan *handshake_success(struct io_conn *conn, u8 *msg; if (poll(pollfd, ARRAY_SIZE(pollfd), - timeout_after < 0 ? -1 : timeout_after * 1000) == 0) - return 0; + timeout_after < 0 ? -1 : timeout_after * 1000) == 0) { + /* Timeout */ + if (no_early_close) + exit(1); + exit(0); + } /* We always to stdin first if we can */ if (pollfd[0].revents & POLLIN) { @@ -288,6 +295,8 @@ static struct io_plan *handshake_success(struct io_conn *conn, err(1, "failed to shutdown write to peer: %s", strerror(errno)); while (sync_crypto_read(NULL, peer_fd, cs)); + if (max_messages != 0 && no_early_close) + exit(1); exit(0); } @@ -368,6 +377,8 @@ int main(int argc, char *argv[]) "Select the network parameters (bitcoin, testnet, signet," " regtest, liquid, liquid-regtest, litecoin or" " litecoin-testnet)"); + opt_register_noarg("--must-get-max-messages", opt_set_bool, &no_early_close, + "Fail with exit code 1 unless we reach maximum messages"); opt_register_noarg("--help|-h", opt_usage_and_exit, "id@addr[:port] [hex-msg-tosend...]\n" "Connect to a lightning peer and relay gossip messages from it", @@ -436,5 +447,6 @@ int main(int argc, char *argv[]) initiator_handshake(conn, &us, &them, &addr, NULL, NORMAL_SOCKET, handshake_success, argv+2); - exit(0); + /* Unreachable */ + abort(); }