Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi rfq receive (AddInvoice multiple hop hints) #1457

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6665348
rfqmsg: Htlc.SumAssetBalance requires specifier checker
GeorgeTsagk Feb 27, 2025
4ddf460
rfq: policies use specifier checker
GeorgeTsagk Mar 26, 2025
08752b8
rfq: populate assetID field in interceptor response
GeorgeTsagk Mar 26, 2025
8a22cc0
multi: move rfq marshal code into rfq package
GeorgeTsagk Mar 26, 2025
3ebcce5
tapchannel: aux invoice manager uses specifier checker
GeorgeTsagk Feb 27, 2025
b61c791
tapchannel: add group key tests to AuxInvoiceManager
GeorgeTsagk Feb 27, 2025
3d704c2
rfq: don't return error on empty groupkey lookup
GeorgeTsagk Mar 5, 2025
33f55b1
rfq: asset specifier matcher checks against groupkey hash
GeorgeTsagk Mar 5, 2025
32f4184
tapchannel: make ProduceHtlcExtraData groupkey aware
GeorgeTsagk Mar 5, 2025
49a0de8
taprpc: add groupkey to SendPaymentRequest
GeorgeTsagk Mar 5, 2025
5a1ab5f
rpcserver: add asset specifier marshaller
GeorgeTsagk Mar 26, 2025
a8ebcaf
rpcserver: SendPayment uses groupkey
GeorgeTsagk Mar 5, 2025
a486a7b
taprpc: add groupkey to AddInvoice
GeorgeTsagk Mar 5, 2025
99e85a7
rpcserver: AddInvoice uses groupkey
GeorgeTsagk Mar 5, 2025
46a2f28
multi: move rfq channel helpers to rfq manager
GeorgeTsagk Mar 27, 2025
f5f7ae4
rfq+rpcserver: RfqChannel returns map of peers to channels
GeorgeTsagk Mar 27, 2025
b4eeae1
rfq: add RfqToHopHint helper in manager
GeorgeTsagk Mar 27, 2025
ea6446a
rpcserver: add AcquireBuyOrder helper
GeorgeTsagk Mar 27, 2025
f97fef1
rpcserver: AddInvoices supports multi-rfq
GeorgeTsagk Mar 27, 2025
c5d02b1
taprpc: update AddInvoice documentation
GeorgeTsagk Mar 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions rfq/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcec/v2/schnorr"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/taproot-assets/address"
"github.com/lightninglabs/taproot-assets/asset"
"github.com/lightninglabs/taproot-assets/fn"
"github.com/lightninglabs/taproot-assets/rfqmsg"
Expand Down Expand Up @@ -232,6 +235,7 @@ func (m *Manager) startSubsystems(ctx context.Context) error {
HtlcInterceptor: m.cfg.HtlcInterceptor,
HtlcSubscriber: m.cfg.HtlcSubscriber,
AcceptHtlcEvents: m.acceptHtlcEvents,
SpecifierChecker: m.AssetMatchesSpecifier,
})
if err != nil {
return fmt.Errorf("error initializing RFQ order handler: %w",
Expand Down Expand Up @@ -948,6 +952,10 @@ func (m *Manager) getAssetGroupKey(ctx context.Context,
// Perform the DB query.
group, err := m.cfg.GroupLookup.QueryAssetGroup(ctx, id)
if err != nil {
if errors.Is(err, address.ErrAssetGroupUnknown) {
return fn.None[btcec.PublicKey](), nil
}

return fn.None[btcec.PublicKey](), err
}

Expand All @@ -971,6 +979,18 @@ func (m *Manager) AssetMatchesSpecifier(ctx context.Context,

switch {
case specifier.HasGroupPubKey():
specifierGK := specifier.UnwrapGroupKeyToPtr()

// Let's directly check if the ID is equal to the hash of the
// group key. This is used by the sender to indicate that any
// asset that belongs to this group may be used.
groupKeyX := schnorr.SerializePubKey(specifierGK)
if asset.ID(groupKeyX) == id {
return true, nil
}

// Now let's make an actual query to find this assetID's group,
// if it exists.
group, err := m.getAssetGroupKey(ctx, id)
if err != nil {
return false, err
Expand All @@ -980,8 +1000,6 @@ func (m *Manager) AssetMatchesSpecifier(ctx context.Context,
return false, nil
}

specifierGK := specifier.UnwrapGroupKeyToPtr()

return group.UnwrapToPtr().IsEqual(specifierGK), nil

case specifier.HasId():
Expand Down
173 changes: 173 additions & 0 deletions rfq/marshal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package rfq

import (
"fmt"

"github.com/lightninglabs/taproot-assets/fn"
"github.com/lightninglabs/taproot-assets/rfqmath"
"github.com/lightninglabs/taproot-assets/rfqmsg"
"github.com/lightninglabs/taproot-assets/taprpc/rfqrpc"
)

// MarshalAcceptedSellQuoteEvent marshals a peer accepted sell quote event to
// its RPC representation.
func MarshalAcceptedSellQuoteEvent(
event *PeerAcceptedSellQuoteEvent) *rfqrpc.PeerAcceptedSellQuote {

return MarshalAcceptedSellQuote(event.SellAccept)
}

// MarshalAcceptedSellQuote marshals a peer accepted sell quote to its RPC
// representation.
func MarshalAcceptedSellQuote(
accept rfqmsg.SellAccept) *rfqrpc.PeerAcceptedSellQuote {

rpcAssetRate := &rfqrpc.FixedPoint{
Coefficient: accept.AssetRate.Rate.Coefficient.String(),
Scale: uint32(accept.AssetRate.Rate.Scale),
}

// Calculate the equivalent asset units for the given total BTC amount
// based on the asset-to-BTC conversion rate.
numAssetUnits := rfqmath.MilliSatoshiToUnits(
accept.Request.PaymentMaxAmt, accept.AssetRate.Rate,
)

minTransportableMSat := rfqmath.MinTransportableMSat(
rfqmath.DefaultOnChainHtlcMSat, accept.AssetRate.Rate,
)

return &rfqrpc.PeerAcceptedSellQuote{
Peer: accept.Peer.String(),
Id: accept.ID[:],
Scid: uint64(accept.ShortChannelId()),
BidAssetRate: rpcAssetRate,
Expiry: uint64(accept.AssetRate.Expiry.Unix()),
AssetAmount: numAssetUnits.ScaleTo(0).ToUint64(),
MinTransportableMsat: uint64(minTransportableMSat),
}
}

// MarshalAcceptedBuyQuoteEvent marshals a peer accepted buy quote event to
// its rpc representation.
func MarshalAcceptedBuyQuoteEvent(
event *PeerAcceptedBuyQuoteEvent) (*rfqrpc.PeerAcceptedBuyQuote,
error) {

// We now calculate the minimum amount of asset units that can be
// transported within a single HTLC for this asset at the given rate.
// This corresponds to the 354 satoshi minimum non-dust HTLC value.
minTransportableUnits := rfqmath.MinTransportableUnits(
rfqmath.DefaultOnChainHtlcMSat, event.AssetRate.Rate,
).ScaleTo(0).ToUint64()

return &rfqrpc.PeerAcceptedBuyQuote{
Peer: event.Peer.String(),
Id: event.ID[:],
Scid: uint64(event.ShortChannelId()),
AssetMaxAmount: event.Request.AssetMaxAmt,
AskAssetRate: &rfqrpc.FixedPoint{
Coefficient: event.AssetRate.Rate.Coefficient.String(),
Scale: uint32(event.AssetRate.Rate.Scale),
},
Expiry: uint64(event.AssetRate.Expiry.Unix()),
MinTransportableUnits: minTransportableUnits,
}, nil
}

// MarshalInvalidQuoteRespEvent marshals an invalid quote response event to
// its rpc representation.
func MarshalInvalidQuoteRespEvent(
event *InvalidQuoteRespEvent) *rfqrpc.InvalidQuoteResponse {

peer := event.QuoteResponse.MsgPeer()
id := event.QuoteResponse.MsgID()

return &rfqrpc.InvalidQuoteResponse{
Status: rfqrpc.QuoteRespStatus(event.Status),
Peer: peer.String(),
Id: id[:],
}
}

// MarshalIncomingRejectQuoteEvent marshals an incoming reject quote event to
// its RPC representation.
func MarshalIncomingRejectQuoteEvent(
event *IncomingRejectQuoteEvent) *rfqrpc.RejectedQuoteResponse {

return &rfqrpc.RejectedQuoteResponse{
Peer: event.Peer.String(),
Id: event.ID.Val[:],
ErrorMessage: event.Err.Val.Msg,
ErrorCode: uint32(event.Err.Val.Code),
}
}

// NewAddAssetBuyOrderResponse creates a new AddAssetBuyOrderResponse from
// the given RFQ event.
func NewAddAssetBuyOrderResponse(
event fn.Event) (*rfqrpc.AddAssetBuyOrderResponse, error) {

resp := &rfqrpc.AddAssetBuyOrderResponse{}

switch e := event.(type) {
case *PeerAcceptedBuyQuoteEvent:
acceptedQuote, err := MarshalAcceptedBuyQuoteEvent(e)
if err != nil {
return nil, err
}

resp.Response = &rfqrpc.AddAssetBuyOrderResponse_AcceptedQuote{
AcceptedQuote: acceptedQuote,
}
return resp, nil

case *InvalidQuoteRespEvent:
resp.Response = &rfqrpc.AddAssetBuyOrderResponse_InvalidQuote{
InvalidQuote: MarshalInvalidQuoteRespEvent(e),
}
return resp, nil

case *IncomingRejectQuoteEvent:
resp.Response = &rfqrpc.AddAssetBuyOrderResponse_RejectedQuote{
RejectedQuote: MarshalIncomingRejectQuoteEvent(e),
}
return resp, nil

default:
return nil, fmt.Errorf("unknown AddAssetBuyOrder event "+
"type: %T", e)
}
}

// NewAddAssetSellOrderResponse creates a new AddAssetSellOrderResponse from
// the given RFQ event.
func NewAddAssetSellOrderResponse(
event fn.Event) (*rfqrpc.AddAssetSellOrderResponse, error) {

resp := &rfqrpc.AddAssetSellOrderResponse{}

switch e := event.(type) {
case *PeerAcceptedSellQuoteEvent:
resp.Response = &rfqrpc.AddAssetSellOrderResponse_AcceptedQuote{
AcceptedQuote: MarshalAcceptedSellQuoteEvent(e),
}
return resp, nil

case *InvalidQuoteRespEvent:
resp.Response = &rfqrpc.AddAssetSellOrderResponse_InvalidQuote{
InvalidQuote: MarshalInvalidQuoteRespEvent(e),
}
return resp, nil

case *IncomingRejectQuoteEvent:
resp.Response = &rfqrpc.AddAssetSellOrderResponse_RejectedQuote{
RejectedQuote: MarshalIncomingRejectQuoteEvent(e),
}
return resp, nil

default:
return nil, fmt.Errorf("unknown AddAssetSellOrder event "+
"type: %T", e)
}
}
59 changes: 42 additions & 17 deletions rfq/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/btcsuite/btcd/btcec/v2/schnorr"
"github.com/davecgh/go-spew/spew"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/taproot-assets/asset"
Expand Down Expand Up @@ -58,7 +59,8 @@ type SerialisedScid = rfqmsg.SerialisedScid
type Policy interface {
// CheckHtlcCompliance returns an error if the given HTLC intercept
// descriptor does not satisfy the subject policy.
CheckHtlcCompliance(htlc lndclient.InterceptedHtlc) error
CheckHtlcCompliance(ctx context.Context, htlc lndclient.InterceptedHtlc,
specifierChecker rfqmsg.SpecifierChecker) error

// Expiry returns the policy's expiry time as a unix timestamp.
Expiry() uint64
Expand Down Expand Up @@ -145,8 +147,8 @@ func NewAssetSalePolicy(quote rfqmsg.BuyAccept) *AssetSalePolicy {
// included as a hop hint within the invoice. The SCID is the only piece of
// information used to determine the policy applicable to the HTLC. As a result,
// HTLC custom records are not expected to be present.
func (c *AssetSalePolicy) CheckHtlcCompliance(
htlc lndclient.InterceptedHtlc) error {
func (c *AssetSalePolicy) CheckHtlcCompliance(_ context.Context,
htlc lndclient.InterceptedHtlc, _ rfqmsg.SpecifierChecker) error {

// Since we will be reading CurrentAmountMsat value we acquire a read
// lock.
Expand Down Expand Up @@ -248,11 +250,23 @@ func (c *AssetSalePolicy) GenerateInterceptorResponse(

outgoingAmt := rfqmath.DefaultOnChainHtlcMSat

// Unpack asset ID.
assetID, err := c.AssetSpecifier.UnwrapIdOrErr()
if err != nil {
return nil, fmt.Errorf("asset sale policy has no asset ID: %w",
err)
var assetID asset.ID

// We have performed checks for the asset IDs inside the HTLC against
// the specifier's group key in a previous step. Here we just need to
// provide a dummy value as the asset ID. The real asset IDs will be
// carefully picked in a later step in the process. What really matters
// now is the total amount.
switch {
case c.AssetSpecifier.HasGroupPubKey():
groupKey := c.AssetSpecifier.UnwrapGroupKeyToPtr()
groupKeyX := schnorr.SerializePubKey(groupKey)

assetID = asset.ID(groupKeyX)

case c.AssetSpecifier.HasId():
specifierID := *c.AssetSpecifier.UnwrapIdToPtr()
copy(assetID[:], specifierID[:])
}

// Compute the outgoing asset amount given the msat outgoing amount and
Expand Down Expand Up @@ -341,8 +355,9 @@ func NewAssetPurchasePolicy(quote rfqmsg.SellAccept) *AssetPurchasePolicy {

// CheckHtlcCompliance returns an error if the given HTLC intercept descriptor
// does not satisfy the subject policy.
func (c *AssetPurchasePolicy) CheckHtlcCompliance(
htlc lndclient.InterceptedHtlc) error {
func (c *AssetPurchasePolicy) CheckHtlcCompliance(ctx context.Context,
htlc lndclient.InterceptedHtlc,
specifierChecker rfqmsg.SpecifierChecker) error {

// Since we will be reading CurrentAmountMsat value we acquire a read
// lock.
Expand All @@ -368,7 +383,9 @@ func (c *AssetPurchasePolicy) CheckHtlcCompliance(
}

// Sum the asset balance in the HTLC record.
assetAmt, err := htlcRecord.SumAssetBalance(c.AssetSpecifier)
assetAmt, err := htlcRecord.SumAssetBalance(
ctx, c.AssetSpecifier, specifierChecker,
)
if err != nil {
return fmt.Errorf("error summing asset balance: %w", err)
}
Expand Down Expand Up @@ -523,15 +540,19 @@ func NewAssetForwardPolicy(incoming, outgoing Policy) (*AssetForwardPolicy,

// CheckHtlcCompliance returns an error if the given HTLC intercept descriptor
// does not satisfy the subject policy.
func (a *AssetForwardPolicy) CheckHtlcCompliance(
htlc lndclient.InterceptedHtlc) error {
func (a *AssetForwardPolicy) CheckHtlcCompliance(ctx context.Context,
htlc lndclient.InterceptedHtlc, sChk rfqmsg.SpecifierChecker) error {

if err := a.incomingPolicy.CheckHtlcCompliance(htlc); err != nil {
if err := a.incomingPolicy.CheckHtlcCompliance(
ctx, htlc, sChk,
); err != nil {
return fmt.Errorf("error checking forward policy, inbound "+
"HTLC does not comply with policy: %w", err)
}

if err := a.outgoingPolicy.CheckHtlcCompliance(htlc); err != nil {
if err := a.outgoingPolicy.CheckHtlcCompliance(
ctx, htlc, sChk,
); err != nil {
return fmt.Errorf("error checking forward policy, outbound "+
"HTLC does not comply with policy: %w", err)
}
Expand Down Expand Up @@ -642,6 +663,10 @@ type OrderHandlerCfg struct {
// HtlcSubscriber is a subscriber that is used to retrieve live HTLC
// event updates.
HtlcSubscriber HtlcSubscriber

// SpecifierChecker is an interface that contains methods for
// checking certain properties related to asset specifiers.
SpecifierChecker rfqmsg.SpecifierChecker
}

// OrderHandler orchestrates management of accepted quote bundles. It monitors
Expand Down Expand Up @@ -684,7 +709,7 @@ func NewOrderHandler(cfg OrderHandlerCfg) (*OrderHandler, error) {
//
// NOTE: This function must be thread safe. It is used by an external
// interceptor service.
func (h *OrderHandler) handleIncomingHtlc(_ context.Context,
func (h *OrderHandler) handleIncomingHtlc(ctx context.Context,
htlc lndclient.InterceptedHtlc) (*lndclient.InterceptedHtlcResponse,
error) {

Expand Down Expand Up @@ -716,7 +741,7 @@ func (h *OrderHandler) handleIncomingHtlc(_ context.Context,
// At this point, we know that a policy exists and has not expired
// whilst sitting in the local cache. We can now check that the HTLC
// complies with the policy.
err = policy.CheckHtlcCompliance(htlc)
err = policy.CheckHtlcCompliance(ctx, htlc, h.cfg.SpecifierChecker)
if err != nil {
log.Warnf("HTLC does not comply with policy: %v "+
"(HTLC=%v, policy=%v)", err, htlc, policy)
Expand Down
Loading