Skip to content

Commit 136e08f

Browse files
committed
Implement Flow for OffersMessageFlow
1 parent 5bf464c commit 136e08f

File tree

3 files changed

+254
-7
lines changed

3 files changed

+254
-7
lines changed

lightning/src/ln/channel.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1807,7 +1807,7 @@ pub(super) struct ChannelContext<SP: Deref> where SP::Target: SignerProvider {
18071807
/// Either the height at which this channel was created or the height at which it was last
18081808
/// serialized if it was serialized by versions prior to 0.0.103.
18091809
/// We use this to close if funding is never broadcasted.
1810-
pub(super) channel_creation_height: u32,
1810+
pub(crate) channel_creation_height: u32,
18111811

18121812
counterparty_dust_limit_satoshis: u64,
18131813

lightning/src/ln/channelmanager.rs

+21-4
Original file line numberDiff line numberDiff line change
@@ -1349,11 +1349,11 @@ impl Readable for Option<RAAMonitorUpdateBlockingAction> {
13491349
}
13501350

13511351
/// State we hold per-peer.
1352-
pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
1352+
pub(crate) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
13531353
/// `channel_id` -> `Channel`
13541354
///
13551355
/// Holds all channels where the peer is the counterparty.
1356-
pub(super) channel_by_id: HashMap<ChannelId, Channel<SP>>,
1356+
pub(crate) channel_by_id: HashMap<ChannelId, Channel<SP>>,
13571357
/// `temporary_channel_id` -> `InboundChannelRequest`.
13581358
///
13591359
/// When manual channel acceptance is enabled, this holds all unaccepted inbound channels where
@@ -1362,7 +1362,7 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
13621362
/// the channel is rejected, then the entry is simply removed.
13631363
pub(super) inbound_channel_request_by_id: HashMap<ChannelId, InboundChannelRequest>,
13641364
/// The latest `InitFeatures` we heard from the peer.
1365-
latest_features: InitFeatures,
1365+
pub(crate) latest_features: InitFeatures,
13661366
/// Messages to send to the peer - pushed to in the same lock that they are generated in (except
13671367
/// for broadcast messages, where ordering isn't as strict).
13681368
pub(super) pending_msg_events: Vec<MessageSendEvent>,
@@ -10202,7 +10202,7 @@ macro_rules! create_refund_builder { ($self: ident, $builder: ty) => {
1020210202
/// Sending multiple requests increases the chances of successful delivery in case some
1020310203
/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid,
1020410204
/// even if multiple invoices are received.
10205-
const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
10205+
pub(crate) const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
1020610206

1020710207
impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, MR: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, MR, L>
1020810208
where
@@ -10812,6 +10812,23 @@ where
1081210812
.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
1081310813
}
1081410814

10815+
fn peer_for_blinded_path(&self) -> Vec<MessageForwardNode> {
10816+
self.per_peer_state.read().unwrap()
10817+
.iter()
10818+
.map(|(node_id, peer_state)| (node_id, peer_state.lock().unwrap()))
10819+
.filter(|(_, peer)| peer.is_connected)
10820+
.filter(|(_, peer)| peer.latest_features.supports_onion_messages())
10821+
.map(|(node_id, peer)| MessageForwardNode {
10822+
node_id: *node_id,
10823+
short_channel_id: peer.channel_by_id
10824+
.iter()
10825+
.filter(|(_, channel)| channel.context().is_usable())
10826+
.min_by_key(|(_, channel)| channel.context().channel_creation_height)
10827+
.and_then(|(_, channel)| channel.context().get_short_channel_id()),
10828+
})
10829+
.collect::<Vec<_>>()
10830+
}
10831+
1081510832
/// Creates multi-hop blinded payment paths for the given `amount_msats` by delegating to
1081610833
/// [`Router::create_blinded_payment_paths`].
1081710834
fn create_blinded_payment_paths(

lightning/src/offers/flow.rs

+232-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use bitcoin::{secp256k1, Network};
1616
use types::payment::PaymentHash;
1717
use crate::blinded_path::message::{BlindedMessagePath, MessageContext, MessageForwardNode, OffersContext};
1818
use crate::blinded_path::payment::BlindedPaymentPath;
19-
use crate::ln::channelmanager::PaymentId;
19+
use crate::ln::channelmanager::{PaymentId, MAX_SHORT_LIVED_RELATIVE_EXPIRY, OFFERS_MESSAGE_REQUEST_LIMIT};
2020
use crate::ln::inbound_payment;
2121
use crate::onion_message::dns_resolution::{DNSSECQuery, HumanReadableName};
2222
use crate::sign::EntropySource;
@@ -142,4 +142,234 @@ where
142142
pub fn get_our_node_id(&self) -> PublicKey {
143143
self.our_network_pubkey
144144
}
145-
}
145+
146+
fn duration_since_epoch(&self) -> Duration {
147+
#[cfg(not(feature = "std"))]
148+
let now = Duration::from_secs(
149+
self.highest_seen_timestamp.load(Ordering::Acquire) as u64
150+
);
151+
#[cfg(feature = "std")]
152+
let now = std::time::SystemTime::now()
153+
.duration_since(std::time::SystemTime::UNIX_EPOCH)
154+
.expect("SystemTime::now() should come after SystemTime::UNIX_EPOCH");
155+
156+
now
157+
}
158+
}
159+
160+
impl<ES: Deref, MR: Deref> Flow for OffersMessageFlow<ES, MR>
161+
where
162+
ES::Target: EntropySource,
163+
MR::Target: MessageRouter,
164+
{
165+
fn create_offer_builder(&self, nonce: Nonce) -> Result<OfferBuilder<DerivedMetadata, secp256k1::All>, Bolt12SemanticError> {
166+
let node_id = self.get_our_node_id();
167+
let expanded_key = &self.inbound_payment_key;
168+
let secp_ctx = &self.secp_ctx;
169+
170+
let builder = OfferBuilder::deriving_signing_pubkey(node_id, expanded_key, nonce, secp_ctx)
171+
.chain_hash(self.chain_hash);
172+
173+
Ok(builder)
174+
}
175+
176+
fn create_refund_builder(&self, amount_msats: u64, absolute_expiry: Duration, payment_id: PaymentId, nonce: Nonce) -> Result<RefundBuilder<secp256k1::All>, Bolt12SemanticError> {
177+
let node_id = self.get_our_node_id();
178+
let expanded_key = &self.inbound_payment_key;
179+
let secp_ctx = &self.secp_ctx;
180+
181+
let builder = RefundBuilder::deriving_signing_pubkey(
182+
node_id, expanded_key, nonce, secp_ctx, amount_msats, payment_id
183+
)?
184+
.chain_hash(self.chain_hash)
185+
.absolute_expiry(absolute_expiry);
186+
187+
Ok(builder)
188+
}
189+
190+
fn create_invoice_request_builder<'a>(
191+
&'a self, offer: &'a Offer, nonce: Nonce, quantity: Option<u64>, amount_msats: Option<u64>,
192+
payer_note: Option<String>, human_readable_name: Option<HumanReadableName>, payment_id: PaymentId
193+
) -> Result<InvoiceRequestBuilder<'a, 'a, secp256k1::All>, Bolt12SemanticError> {
194+
let expanded_key = &self.inbound_payment_key;
195+
let secp_ctx = &self.secp_ctx;
196+
197+
let builder = offer
198+
.request_invoice(expanded_key, nonce, secp_ctx, payment_id)?
199+
.chain_hash(self.chain_hash)?;
200+
201+
let builder = match quantity {
202+
None => builder,
203+
Some(quantity) => builder.quantity(quantity)?,
204+
};
205+
let builder = match amount_msats {
206+
None => builder,
207+
Some(amount_msats) => builder.amount_msats(amount_msats)?,
208+
};
209+
let builder = match payer_note {
210+
None => builder,
211+
Some(payer_note) => builder.payer_note(payer_note),
212+
};
213+
let builder = match human_readable_name {
214+
None => builder,
215+
Some(hrn) => builder.sourced_from_human_readable_name(hrn),
216+
};
217+
218+
Ok(builder.into())
219+
}
220+
221+
fn create_invoice_builder<'a>(
222+
&'a self, refund: &'a Refund, payment_paths: Vec<BlindedPaymentPath>, payment_hash: PaymentHash
223+
) -> Result<InvoiceBuilder<'a, DerivedSigningPubkey>, Bolt12SemanticError> {
224+
let expanded_key = &self.inbound_payment_key;
225+
let entropy = &*self.entropy_source;
226+
227+
#[cfg(feature = "std")]
228+
let builder = refund.respond_using_derived_keys(
229+
payment_paths, payment_hash, expanded_key, entropy
230+
)?;
231+
#[cfg(not(feature = "std"))]
232+
let created_at = Duration::from_secs(
233+
self.highest_seen_timestamp.load(Ordering::Acquire) as u64
234+
);
235+
#[cfg(not(feature = "std"))]
236+
let builder = refund.respond_using_derived_keys_no_std(
237+
payment_paths, payment_hash, created_at, expanded_key, entropy
238+
)?;
239+
let builder: InvoiceBuilder<DerivedSigningPubkey> = builder.into();
240+
241+
Ok(builder)
242+
}
243+
244+
fn create_blinded_paths(&self, peers: Vec<MessageForwardNode>, context: MessageContext) -> Result<Vec<BlindedMessagePath>, ()> {
245+
let recipient = self.get_our_node_id();
246+
let secp_ctx = &self.secp_ctx;
247+
248+
let peers = peers
249+
.iter()
250+
.map(|node| node.node_id)
251+
.collect::<Vec<_>>();
252+
253+
self.message_router
254+
.create_blinded_paths(recipient, context, peers, secp_ctx)
255+
.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
256+
}
257+
258+
fn create_compact_blinded_paths(&self, peers: Vec<MessageForwardNode>, context: OffersContext) -> Result<Vec<BlindedMessagePath>, ()> {
259+
let recipient = self.get_our_node_id();
260+
let secp_ctx = &self.secp_ctx;
261+
262+
self.message_router
263+
.create_compact_blinded_paths(recipient, MessageContext::Offers(context), peers, secp_ctx)
264+
.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
265+
}
266+
267+
fn create_blinded_paths_using_absolute_expiry(&self, peers: Vec<MessageForwardNode>, context: OffersContext, absolute_expiry: Option<Duration>) -> Result<Vec<BlindedMessagePath>, ()> {
268+
let now = self.duration_since_epoch();
269+
let max_short_lived_absolute_expiry = now.saturating_add(MAX_SHORT_LIVED_RELATIVE_EXPIRY);
270+
271+
if absolute_expiry.unwrap_or(Duration::MAX) <= max_short_lived_absolute_expiry {
272+
self.create_compact_blinded_paths(peers, context)
273+
} else {
274+
self.create_blinded_paths(peers, MessageContext::Offers(context))
275+
}
276+
}
277+
278+
fn enqueue_invoice_request(
279+
&self, invoice_request: InvoiceRequest, reply_paths: Vec<BlindedMessagePath>
280+
) -> Result<(), Bolt12SemanticError> {
281+
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
282+
if !invoice_request.paths().is_empty() {
283+
reply_paths
284+
.iter()
285+
.flat_map(|reply_path| invoice_request.paths().iter().map(move |path| (path, reply_path)))
286+
.take(OFFERS_MESSAGE_REQUEST_LIMIT)
287+
.for_each(|(path, reply_path)| {
288+
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
289+
destination: Destination::BlindedPath(path.clone()),
290+
reply_path: reply_path.clone(),
291+
};
292+
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
293+
pending_offers_messages.push((message, instructions));
294+
});
295+
} else if let Some(node_id) = invoice_request.issuer_signing_pubkey() {
296+
for reply_path in reply_paths {
297+
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
298+
destination: Destination::Node(node_id),
299+
reply_path,
300+
};
301+
let message = OffersMessage::InvoiceRequest(invoice_request.clone());
302+
pending_offers_messages.push((message, instructions));
303+
}
304+
} else {
305+
debug_assert!(false);
306+
return Err(Bolt12SemanticError::MissingIssuerSigningPubkey);
307+
}
308+
309+
Ok(())
310+
}
311+
312+
fn enqueue_invoice(
313+
&self, invoice: Bolt12Invoice, refund: &Refund, reply_paths: Vec<BlindedMessagePath>
314+
) -> Result<(), Bolt12SemanticError> {
315+
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
316+
if refund.paths().is_empty() {
317+
for reply_path in reply_paths {
318+
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
319+
destination: Destination::Node(refund.payer_signing_pubkey()),
320+
reply_path,
321+
};
322+
let message = OffersMessage::Invoice(invoice.clone());
323+
pending_offers_messages.push((message, instructions));
324+
}
325+
} else {
326+
reply_paths
327+
.iter()
328+
.flat_map(|reply_path| refund.paths().iter().map(move |path| (path, reply_path)))
329+
.take(OFFERS_MESSAGE_REQUEST_LIMIT)
330+
.for_each(|(path, reply_path)| {
331+
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
332+
destination: Destination::BlindedPath(path.clone()),
333+
reply_path: reply_path.clone(),
334+
};
335+
let message = OffersMessage::Invoice(invoice.clone());
336+
pending_offers_messages.push((message, instructions));
337+
});
338+
}
339+
340+
Ok(())
341+
}
342+
343+
fn enqueue_dns_onion_message(
344+
&self, message: DNSSECQuery, dns_resolvers: Vec<Destination>, reply_paths: Vec<BlindedMessagePath>
345+
) -> Result<(), Bolt12SemanticError> {
346+
let message_params = dns_resolvers
347+
.iter()
348+
.flat_map(|destination| reply_paths.iter().map(move |path| (path, destination)))
349+
.take(OFFERS_MESSAGE_REQUEST_LIMIT);
350+
for (reply_path, destination) in message_params {
351+
self.pending_dns_onion_messages.lock().unwrap().push((
352+
DNSResolverMessage::DNSSECQuery(message.clone()),
353+
MessageSendInstructions::WithSpecifiedReplyPath {
354+
destination: destination.clone(),
355+
reply_path: reply_path.clone(),
356+
},
357+
));
358+
}
359+
360+
Ok(())
361+
}
362+
363+
fn get_and_clear_pending_offers_messages(&self) -> Vec<(OffersMessage, MessageSendInstructions)> {
364+
core::mem::take(&mut self.pending_offers_messages.lock().unwrap())
365+
}
366+
367+
fn get_and_clear_pending_async_messages(&self) -> Vec<(AsyncPaymentsMessage, MessageSendInstructions)> {
368+
core::mem::take(&mut self.pending_async_payments_messages.lock().unwrap())
369+
}
370+
371+
#[cfg(feature = "dnssec")]
372+
fn get_and_clear_pending_dns_messages(&self) -> Vec<(DNSResolverMessage, MessageSendInstructions)> {
373+
core::mem::take(&mut self.pending_dns_onion_messages.lock().unwrap())
374+
}
375+
}

0 commit comments

Comments
 (0)