Skip to content

Commit f3acf0a

Browse files
committed
connectd: Implement sending of start_batch
Implement the sending of `start_batch` and `protocol_batch_element` from `channeld` to `connectd`. Each real peer wire message is prefixed with `protocol_batch_element` so connectd can know the size of the message that were batched together. `connectd` intercepts `protocol_batch_element` messages and eats them (doesn’t forward them to peer) to get individual messages out of the batch. It needs this to be able to encrypt them individiaully. Afterwards it recombines the now encrypted messages into a single message to send over the wire to the peer. `channeld` remains responsible for making `start_batch` the first message of the message bundle.
1 parent b910086 commit f3acf0a

File tree

2 files changed

+140
-5
lines changed

2 files changed

+140
-5
lines changed

channeld/channeld.c

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1269,6 +1269,78 @@ static s64 sats_diff(struct amount_sat a, struct amount_sat b)
12691269
return (s64)a.satoshis - (s64)b.satoshis; /* Raw: splicing numbers can wrap! */
12701270
}
12711271

1272+
static void send_message_batch(struct peer *peer, u8 **msgs)
1273+
{
1274+
size_t size;
1275+
size_t hdr_size = tal_bytelen(towire_protocol_batch_element(tmpctx,
1276+
&peer->channel_id,
1277+
0));
1278+
u8 *batch_msg, *final_msg, *final_msg_ptr;
1279+
struct tlv_start_batch_tlvs *tlvs;
1280+
1281+
assert(tal_count(msgs) > 0);
1282+
1283+
/* When sending one message, no batching is required */
1284+
if (tal_count(msgs) == 1) {
1285+
peer_write(peer->pps, msgs[0]);
1286+
return;
1287+
}
1288+
1289+
/* We prefix each message with an interal wire type,
1290+
* protocol_batch_element. connectd will eat each message so they don't
1291+
* actually go out to the peer. It's just so connectd can chop up the
1292+
* message batch back out into individual messages. */
1293+
1294+
/* We start by calculating the total size */
1295+
size = 0;
1296+
1297+
/* Build the `start_batch` msg now so know it's size */
1298+
tlvs = tlv_start_batch_tlvs_new(tmpctx);
1299+
tlvs->batch_info = tal(tlvs, u16);
1300+
*tlvs->batch_info = WIRE_COMMITMENT_SIGNED;
1301+
batch_msg = towire_start_batch(tmpctx, &peer->channel_id,
1302+
tal_count(msgs), tlvs);
1303+
size += tal_bytelen(batch_msg) + hdr_size;
1304+
1305+
/* Count the size of all the messages in the batch */
1306+
for(u32 i = 0; i < tal_count(msgs); i++)
1307+
size += tal_bytelen(msgs[i]) + hdr_size;
1308+
1309+
/* Now we know the size of our `final_msg` so we allocate */
1310+
final_msg = tal_arr(tmpctx, u8, size);
1311+
final_msg_ptr = final_msg;
1312+
1313+
status_debug("proto_batch Building batch with %zu bytes, msgs: %zu",
1314+
size, tal_count(msgs));
1315+
1316+
/* Copy the bytes for `start_batch` prefix */
1317+
memcpy(final_msg_ptr,
1318+
towire_protocol_batch_element(tmpctx,
1319+
&peer->channel_id,
1320+
tal_bytelen(batch_msg)),
1321+
hdr_size);
1322+
final_msg_ptr += hdr_size;
1323+
1324+
memcpy(final_msg_ptr, batch_msg, tal_bytelen(batch_msg));
1325+
final_msg_ptr += tal_bytelen(batch_msg);
1326+
1327+
/* Now copy the bytes from all messages in `msgs` */
1328+
for(u32 i = 0; i < tal_count(msgs); i++) {
1329+
memcpy(final_msg_ptr,
1330+
towire_protocol_batch_element(tmpctx,
1331+
&peer->channel_id,
1332+
tal_bytelen(msgs[i])),
1333+
hdr_size);
1334+
final_msg_ptr += hdr_size;
1335+
1336+
memcpy(final_msg_ptr, msgs[i], tal_bytelen(msgs[i]));
1337+
final_msg_ptr += tal_bytelen(msgs[i]);
1338+
}
1339+
1340+
assert(final_msg + size == final_msg_ptr);
1341+
peer_write(peer->pps, take(final_msg));
1342+
}
1343+
12721344
static void send_commit(struct peer *peer)
12731345
{
12741346
const struct htlc **changed_htlcs;
@@ -1435,8 +1507,7 @@ static void send_commit(struct peer *peer)
14351507

14361508
peer->next_index[REMOTE]++;
14371509

1438-
for(u32 i = 0; i < tal_count(msgs); i++)
1439-
peer_write(peer->pps, take(msgs[i]));
1510+
send_message_batch(peer, msgs);
14401511

14411512
maybe_send_shutdown(peer);
14421513

@@ -5139,8 +5210,7 @@ static void resend_commitment(struct peer *peer, struct changed_htlc *last)
51395210
peer->splice_state->inflights[i]->remote_funding));
51405211
}
51415212

5142-
for(i = 0; i < tal_count(msgs); i++)
5143-
peer_write(peer->pps, take(msgs[i]));
5213+
send_message_batch(peer, msgs);
51445214

51455215
/* If we have already received the revocation for the previous, the
51465216
* other side shouldn't be asking for a retransmit! */

connectd/multiplex.c

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,66 @@ static struct io_plan *io_sock_shutdown_cb(struct io_conn *conn, struct peer *un
399399
return io_sock_shutdown(conn);
400400
}
401401

402+
/* Process and eat protocol_batch_element messages, encrypt each element message
403+
* and return the encrypted messages as one long byte array. */
404+
static u8 *process_batch_elements(struct peer *peer, const u8 *msg TAKES)
405+
{
406+
u8 *ret = tal_arr(peer, u8, 0);
407+
size_t ret_size = 0;
408+
const u8 *cursor = msg;
409+
size_t plen = tal_count(msg);
410+
411+
status_debug("Processing batch elements of %zu bytes. %s", plen,
412+
tal_hex(tmpctx, msg));
413+
414+
do {
415+
u8 *element_bytes;
416+
u16 element_size;
417+
struct channel_id channel_id;
418+
u8 *enc_msg;
419+
420+
if (fromwire_u16(&cursor, &plen) != WIRE_PROTOCOL_BATCH_ELEMENT)
421+
status_failed(STATUS_FAIL_PROTO_BATCH,
422+
"process_batch_elements on msg that is"
423+
" not WIRE_PROTOCOL_BATCH_ELEMENT. %s",
424+
tal_hexstr(tmpctx, cursor, plen));
425+
426+
fromwire_channel_id(&cursor, &plen, &channel_id);
427+
428+
element_size = fromwire_u16(&cursor, &plen);
429+
if (!element_size)
430+
status_failed(STATUS_FAIL_PROTO_BATCH,
431+
"process_batch_elements cannot have zero"
432+
" length elements. %s",
433+
tal_hexstr(tmpctx, cursor, plen));
434+
435+
element_bytes = fromwire_tal_arrn(NULL, &cursor, &plen,
436+
element_size);
437+
if (!element_bytes)
438+
status_failed(STATUS_FAIL_PROTO_BATCH,
439+
"process_batch_elements fromwire_tal_arrn"
440+
" %s",
441+
tal_hexstr(tmpctx, cursor, plen));
442+
443+
status_debug("Processing batch extracted item %s. %s",
444+
peer_wire_name(fromwire_peektype(element_bytes)),
445+
tal_hex(tmpctx, element_bytes));
446+
447+
enc_msg = cryptomsg_encrypt_msg(tmpctx, &peer->cs,
448+
take(element_bytes));
449+
450+
tal_resize(&ret, ret_size + tal_bytelen(enc_msg));
451+
memcpy(&ret[ret_size], enc_msg, tal_bytelen(enc_msg));
452+
ret_size += tal_bytelen(enc_msg);
453+
454+
} while(plen);
455+
456+
if (taken(msg))
457+
tal_free(msg);
458+
459+
return ret;
460+
}
461+
402462
static struct io_plan *encrypt_and_send(struct peer *peer,
403463
const u8 *msg TAKES,
404464
struct io_plan *(*next)
@@ -442,8 +502,13 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
442502

443503
set_urgent_flag(peer, is_urgent(type));
444504

505+
/* Special message type directing us to process batch items. */
506+
if (type == WIRE_PROTOCOL_BATCH_ELEMENT)
507+
peer->sent_to_peer = process_batch_elements(peer, msg);
508+
else
509+
peer->sent_to_peer = cryptomsg_encrypt_msg(peer, &peer->cs, msg);
445510
/* We free this and the encrypted version in next write_to_peer */
446-
peer->sent_to_peer = cryptomsg_encrypt_msg(peer, &peer->cs, msg);
511+
447512
return io_write(peer->to_peer,
448513
peer->sent_to_peer,
449514
tal_bytelen(peer->sent_to_peer),

0 commit comments

Comments
 (0)