Skip to content

splice: Implement start_batch #8335

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 124 additions & 37 deletions channeld/channeld.c
Original file line number Diff line number Diff line change
Expand Up @@ -1197,12 +1197,7 @@ static u8 *send_commit_part(const tal_t *ctx,
(int)splice_amnt, (int)remote_splice_amnt,
remote_index);

if (batch_size > 1) {
cs_tlv->splice_info = tal(cs_tlv, struct tlv_commitment_signed_tlvs_splice_info);

cs_tlv->splice_info->batch_size = batch_size;
cs_tlv->splice_info->funding_txid = funding->txid;
}
cs_tlv->splice_info = tal_dup(cs_tlv, struct bitcoin_txid, &funding->txid);
}

txs = channel_txs(tmpctx, funding, funding_sats, &htlc_map,
Expand Down Expand Up @@ -1274,6 +1269,78 @@ static s64 sats_diff(struct amount_sat a, struct amount_sat b)
return (s64)a.satoshis - (s64)b.satoshis; /* Raw: splicing numbers can wrap! */
}

static void send_message_batch(struct peer *peer, u8 **msgs)
{
size_t size;
size_t hdr_size = tal_bytelen(towire_protocol_batch_element(tmpctx,
&peer->channel_id,
0));
u8 *batch_msg, *final_msg, *final_msg_ptr;
struct tlv_start_batch_tlvs *tlvs;

assert(tal_count(msgs) > 0);

/* When sending one message, no batching is required */
if (tal_count(msgs) == 1) {
peer_write(peer->pps, msgs[0]);
return;
}

/* We prefix each message with an interal wire type,
* protocol_batch_element. connectd will eat each message so they don't
* actually go out to the peer. It's just so connectd can chop up the
* message batch back out into individual messages. */

/* We start by calculating the total size */
size = 0;

/* Build the `start_batch` msg now so know it's size */
tlvs = tlv_start_batch_tlvs_new(tmpctx);
tlvs->batch_info = tal(tlvs, u16);
*tlvs->batch_info = WIRE_COMMITMENT_SIGNED;
batch_msg = towire_start_batch(tmpctx, &peer->channel_id,
tal_count(msgs), tlvs);
size += tal_bytelen(batch_msg) + hdr_size;

/* Count the size of all the messages in the batch */
for(u32 i = 0; i < tal_count(msgs); i++)
size += tal_bytelen(msgs[i]) + hdr_size;

/* Now we know the size of our `final_msg` so we allocate */
final_msg = tal_arr(tmpctx, u8, size);
final_msg_ptr = final_msg;

status_debug("proto_batch Building batch with %zu bytes, msgs: %zu",
size, tal_count(msgs));

/* Copy the bytes for `start_batch` prefix */
memcpy(final_msg_ptr,
towire_protocol_batch_element(tmpctx,
&peer->channel_id,
tal_bytelen(batch_msg)),
hdr_size);
final_msg_ptr += hdr_size;

memcpy(final_msg_ptr, batch_msg, tal_bytelen(batch_msg));
final_msg_ptr += tal_bytelen(batch_msg);

/* Now copy the bytes from all messages in `msgs` */
for(u32 i = 0; i < tal_count(msgs); i++) {
memcpy(final_msg_ptr,
towire_protocol_batch_element(tmpctx,
&peer->channel_id,
tal_bytelen(msgs[i])),
hdr_size);
final_msg_ptr += hdr_size;

memcpy(final_msg_ptr, msgs[i], tal_bytelen(msgs[i]));
final_msg_ptr += tal_bytelen(msgs[i]);
}

assert(final_msg + size == final_msg_ptr);
peer_write(peer->pps, take(final_msg));
}

static void send_commit(struct peer *peer)
{
const struct htlc **changed_htlcs;
Expand Down Expand Up @@ -1440,8 +1507,7 @@ static void send_commit(struct peer *peer)

peer->next_index[REMOTE]++;

for(u32 i = 0; i < tal_count(msgs); i++)
peer_write(peer->pps, take(msgs[i]));
send_message_batch(peer, msgs);

maybe_send_shutdown(peer);

Expand Down Expand Up @@ -1963,11 +2029,11 @@ static struct commitsig_info *handle_peer_commit_sig(struct peer *peer,
if (peer->splice_state->await_commitment_succcess
&& !tal_count(peer->splice_state->inflights) && cs_tlv && cs_tlv->splice_info) {
if (!bitcoin_txid_eq(&peer->channel->funding.txid,
&cs_tlv->splice_info->funding_txid)) {
cs_tlv->splice_info)) {
status_info("Ignoring stale commit_sig for channel_id"
" %s, as %s is locked in now.",
fmt_bitcoin_txid(tmpctx,
&cs_tlv->splice_info->funding_txid),
cs_tlv->splice_info),
fmt_bitcoin_txid(tmpctx,
&peer->channel->funding.txid));
return NULL;
Expand Down Expand Up @@ -2017,22 +2083,17 @@ static struct commitsig_info *handle_peer_commit_sig(struct peer *peer,
outpoint = peer->splice_state->inflights[commit_index - 1]->outpoint;
funding_sats = peer->splice_state->inflights[commit_index - 1]->amnt;

if (cs_tlv && cs_tlv->splice_info
&& cs_tlv->splice_info->batch_size == 1)
peer_failed_err(peer->pps, &peer->channel_id,
"batch_size can never be 1");

status_debug("handle_peer_commit_sig for inflight outpoint %s",
fmt_bitcoin_txid(tmpctx, &outpoint.txid));

if (cs_tlv->splice_info
&& !bitcoin_txid_eq(&outpoint.txid,
&cs_tlv->splice_info->funding_txid))
cs_tlv->splice_info))
peer_failed_err(peer->pps, &peer->channel_id,
"Expected commit sig message for %s but"
" got %s",
fmt_bitcoin_txid(tmpctx, &outpoint.txid),
fmt_bitcoin_txid(tmpctx, &cs_tlv->splice_info->funding_txid));
fmt_bitcoin_txid(tmpctx, cs_tlv->splice_info));
}
else {
outpoint = peer->channel->funding;
Expand Down Expand Up @@ -2089,7 +2150,7 @@ static struct commitsig_info *handle_peer_commit_sig(struct peer *peer,
fmt_amount_sat(tmpctx, funding_sats),
cs_tlv && cs_tlv->splice_info
? fmt_bitcoin_txid(tmpctx,
&cs_tlv->splice_info->funding_txid)
cs_tlv->splice_info)
: "N/A",
peer->splice_state->await_commitment_succcess ? "yes"
: "no",
Expand Down Expand Up @@ -2253,7 +2314,7 @@ static int commit_index_from_msg(const u8 *msg, struct peer *peer)
if (!cs_tlv || !cs_tlv->splice_info)
return -1;

funding_txid = cs_tlv->splice_info->funding_txid;
funding_txid = *cs_tlv->splice_info;

if (bitcoin_txid_eq(&funding_txid, &peer->channel->funding.txid))
return 0;
Expand Down Expand Up @@ -2306,28 +2367,29 @@ static struct commitsig_info *handle_peer_commit_sig_batch(struct peer *peer,
s64 remote_splice_amnt,
u64 local_index,
const struct pubkey *local_per_commit,
bool allow_empty_commit)
bool allow_empty_commit,
u16 batch_size)
{
struct channel_id channel_id;
struct bitcoin_signature commit_sig;
secp256k1_ecdsa_signature *raw_sigs;
u16 batch_size;
const u8 **msg_batch;
enum peer_wire type;
struct tlv_commitment_signed_tlvs *cs_tlv
= tlv_commitment_signed_tlvs_new(tmpctx);
status_debug("fromwire_commitment_signed(%p) primary", msg);
check_tx_abort(peer, msg, NULL);
type = fromwire_peektype(msg);
if (type != WIRE_COMMITMENT_SIGNED)
peer_failed_err(peer->pps, &peer->channel_id,
"Expected WIRE_COMMITMENT_SIGNED but got %s.",
peer_wire_name(type));
if (!fromwire_commitment_signed(tmpctx, msg,
&channel_id, &commit_sig.s, &raw_sigs,
&cs_tlv))
peer_failed_warn(peer->pps, &peer->channel_id,
"Bad commit_sig %s", tal_hex(msg, msg));

/* Default batch_size is 1 */
batch_size = 1;
if (cs_tlv->splice_info && cs_tlv->splice_info->batch_size)
batch_size = cs_tlv->splice_info->batch_size;

msg_batch = tal_arr(tmpctx, const u8*, batch_size);
msg_batch[0] = msg;
status_debug("msg_batch[0]: %p", msg_batch[0]);
Expand Down Expand Up @@ -2363,14 +2425,6 @@ static struct commitsig_info *handle_peer_commit_sig_batch(struct peer *peer,
" splice_info",
tal_hex(sub_msg, sub_msg), i, batch_size);

if (!sub_cs_tlv->splice_info
|| sub_cs_tlv->splice_info->batch_size != batch_size)
peer_failed_err(peer->pps, &peer->channel_id,
"batch_size value mismatch in"
" commit_sig bundle, item [%"PRIu16
"/%"PRIu16"] %s", i, batch_size,
tal_hex(sub_msg, sub_msg));

msg_batch[i] = sub_msg;
status_debug("msg_batch[%d]: %p", (int)i, msg_batch[i]);
}
Expand All @@ -2385,6 +2439,35 @@ static struct commitsig_info *handle_peer_commit_sig_batch(struct peer *peer,
allow_empty_commit, msg_batch);
}

static void handle_peer_start_batch(struct peer *peer, const u8 *msg)
{
u16 batch_size;
struct channel_id channel_id;
struct tlv_start_batch_tlvs *tlvs;
if (!fromwire_start_batch(tmpctx, msg, &channel_id, &batch_size, &tlvs))
peer_failed_warn(peer->pps, &peer->channel_id,
"Bad start_batch %s", tal_hex(msg, msg));

if (!tlvs || !tlvs->batch_info
|| *tlvs->batch_info != WIRE_COMMITMENT_SIGNED) {
status_unusual("Ignoring Unrecognized start_batch message type"
" %s, expected WIRE_COMMITMENT_SIGNED.",
tlvs && tlvs->batch_info
? peer_wire_name(*tlvs->batch_info)
: "N/A");
return;
}

handle_peer_commit_sig_batch(peer, peer_read(tmpctx, peer->pps), 0,
peer->channel->funding_pubkey[REMOTE],
NULL, 0, 0,
peer->next_index[LOCAL],
&peer->next_local_per_commit,
false,
batch_size);
}


/* Pops the penalty base for the given commitnum from our internal list. There
* may not be one, in which case we return NULL and leave the list
* unmodified. */
Expand Down Expand Up @@ -4850,13 +4933,17 @@ static void peer_in(struct peer *peer, const u8 *msg)
case WIRE_UPDATE_ADD_HTLC:
handle_peer_add_htlc(peer, msg);
return;
case WIRE_START_BATCH:
handle_peer_start_batch(peer, msg);
return;
case WIRE_COMMITMENT_SIGNED:
handle_peer_commit_sig_batch(peer, msg, 0,
peer->channel->funding_pubkey[REMOTE],
NULL, 0, 0,
peer->next_index[LOCAL],
&peer->next_local_per_commit,
false);
false,
1);
return;
case WIRE_UPDATE_FEE:
handle_peer_feechange(peer, msg);
Expand Down Expand Up @@ -4920,6 +5007,7 @@ static void peer_in(struct peer *peer, const u8 *msg)
return;

/* These are all swallowed by connectd */
case WIRE_PROTOCOL_BATCH_ELEMENT:
case WIRE_CHANNEL_ANNOUNCEMENT:
case WIRE_CHANNEL_UPDATE:
case WIRE_NODE_ANNOUNCEMENT:
Expand Down Expand Up @@ -5122,8 +5210,7 @@ static void resend_commitment(struct peer *peer, struct changed_htlc *last)
peer->splice_state->inflights[i]->remote_funding));
}

for(i = 0; i < tal_count(msgs); i++)
peer_write(peer->pps, take(msgs[i]));
send_message_batch(peer, msgs);

/* If we have already received the revocation for the previous, the
* other side shouldn't be asking for a retransmit! */
Expand Down
2 changes: 2 additions & 0 deletions common/gossmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -1799,6 +1799,8 @@ const void *gossmap_stream_next(const tal_t *ctx,
case WIRE_UPDATE_FULFILL_HTLC:
case WIRE_UPDATE_FAIL_HTLC:
case WIRE_UPDATE_FAIL_MALFORMED_HTLC:
case WIRE_PROTOCOL_BATCH_ELEMENT:
case WIRE_START_BATCH:
case WIRE_COMMITMENT_SIGNED:
case WIRE_REVOKE_AND_ACK:
case WIRE_UPDATE_FEE:
Expand Down
4 changes: 4 additions & 0 deletions common/interactivetx.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ static u8 *read_next_msg(const tal_t *ctx,
case WIRE_UPDATE_FULFILL_HTLC:
case WIRE_UPDATE_FAIL_HTLC:
case WIRE_UPDATE_FAIL_MALFORMED_HTLC:
case WIRE_PROTOCOL_BATCH_ELEMENT:
case WIRE_START_BATCH:
case WIRE_COMMITMENT_SIGNED:
case WIRE_REVOKE_AND_ACK:
case WIRE_UPDATE_FEE:
Expand Down Expand Up @@ -771,6 +773,8 @@ char *process_interactivetx_updates(const tal_t *ctx,
case WIRE_UPDATE_FULFILL_HTLC:
case WIRE_UPDATE_FAIL_HTLC:
case WIRE_UPDATE_FAIL_MALFORMED_HTLC:
case WIRE_PROTOCOL_BATCH_ELEMENT:
case WIRE_START_BATCH:
case WIRE_COMMITMENT_SIGNED:
case WIRE_REVOKE_AND_ACK:
case WIRE_UPDATE_FEE:
Expand Down
3 changes: 3 additions & 0 deletions common/status_levels.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ enum status_failreason {
/* Gossipd sent unknown/malformed command, or fd failed */
STATUS_FAIL_GOSSIP_IO,

/* Connect daemon received a malformed protocol_batch_element */
STATUS_FAIL_PROTO_BATCH,

/* Other internal error. */
STATUS_FAIL_INTERNAL_ERROR,
};
Expand Down
2 changes: 2 additions & 0 deletions connectd/gossip_rcvd_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ static bool is_msg_gossip_broadcast(const u8 *cursor)
case WIRE_UPDATE_FULFILL_HTLC:
case WIRE_UPDATE_FAIL_HTLC:
case WIRE_UPDATE_FAIL_MALFORMED_HTLC:
case WIRE_PROTOCOL_BATCH_ELEMENT:
case WIRE_START_BATCH:
case WIRE_COMMITMENT_SIGNED:
case WIRE_REVOKE_AND_ACK:
case WIRE_UPDATE_FEE:
Expand Down
2 changes: 2 additions & 0 deletions connectd/gossip_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ static bool public_msg_type(enum peer_wire type)
case WIRE_UPDATE_FULFILL_HTLC:
case WIRE_UPDATE_FAIL_HTLC:
case WIRE_UPDATE_FAIL_MALFORMED_HTLC:
case WIRE_PROTOCOL_BATCH_ELEMENT:
case WIRE_START_BATCH:
case WIRE_COMMITMENT_SIGNED:
case WIRE_REVOKE_AND_ACK:
case WIRE_UPDATE_FEE:
Expand Down
Loading