Skip to content

Commit 135888d

Browse files
committed
channeldb: write to new update index for ChannelEdgePolicy2
1 parent b6410bf commit 135888d

File tree

8 files changed

+298
-109
lines changed

8 files changed

+298
-109
lines changed

channeldb/edge_policy.go

Lines changed: 97 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -39,26 +39,89 @@ const (
3939
edgePolicy2EncodingType edgePolicyEncodingType = 0
4040
)
4141

42+
type edgePolicyEncodingInfo struct {
43+
updateBucketKey []byte
44+
updateKey []byte
45+
serialize func(w io.Writer, toNode []byte) error
46+
typeByte func() (edgePolicyEncodingType, bool)
47+
}
48+
49+
func encodingInfoFromEdgePolicy(policy models.ChannelEdgePolicy) (
50+
*edgePolicyEncodingInfo, error) {
51+
52+
switch p := policy.(type) {
53+
case *models.ChannelEdgePolicy1:
54+
updateUnix := uint64(p.LastUpdate.Unix())
55+
var indexKey [8 + 8]byte
56+
byteOrder.PutUint64(indexKey[:8], updateUnix)
57+
byteOrder.PutUint64(indexKey[8:], p.ChannelID)
58+
59+
return &edgePolicyEncodingInfo{
60+
updateBucketKey: edgeUpdateIndexBucket,
61+
updateKey: indexKey[:],
62+
serialize: func(w io.Writer, toNode []byte) error {
63+
copy(p.ToNode[:], toNode)
64+
65+
return serializeChanEdgePolicy1(w, p)
66+
},
67+
typeByte: func() (edgePolicyEncodingType, bool) {
68+
return 0, false
69+
},
70+
}, nil
71+
72+
case *models.ChannelEdgePolicy2:
73+
indexKey := make([]byte, 4+8)
74+
byteOrder.PutUint32(
75+
indexKey[:4], p.BlockHeight.Val,
76+
)
77+
byteOrder.PutUint64(
78+
indexKey[4:], p.ShortChannelID.Val.ToUint64(),
79+
)
80+
81+
return &edgePolicyEncodingInfo{
82+
updateBucketKey: edgeUpdate2IndexBucket,
83+
updateKey: indexKey,
84+
serialize: func(w io.Writer, toNode []byte) error {
85+
copy(p.ToNode[:], toNode)
86+
87+
return serializeChanEdgePolicy2(w, p)
88+
},
89+
typeByte: func() (edgePolicyEncodingType, bool) {
90+
return edgePolicy2EncodingType, true
91+
},
92+
}, nil
93+
94+
default:
95+
return nil, fmt.Errorf("unhandled implementation of the "+
96+
"models.ChannelEdgePolicy interface: %T", policy)
97+
}
98+
}
99+
42100
func putChanEdgePolicy(edges kvdb.RwBucket, edge *models.ChannelEdgePolicy1,
43101
from, to []byte) error {
44102

103+
encodingInfo, err := encodingInfoFromEdgePolicy(edge)
104+
if err != nil {
105+
return err
106+
}
107+
108+
chanID := edge.SCID().ToUint64()
109+
45110
var edgeKey [33 + 8]byte
46111
copy(edgeKey[:], from)
47-
byteOrder.PutUint64(edgeKey[33:], edge.ChannelID)
112+
byteOrder.PutUint64(edgeKey[33:], chanID)
48113

49114
var b bytes.Buffer
50-
if err := serializeChanEdgePolicy(&b, edge, to); err != nil {
115+
if err := serializeChanEdgePolicy(&b, encodingInfo, to); err != nil {
51116
return err
52117
}
53118

54119
// Before we write out the new edge, we'll create a new entry in the
55120
// update index in order to keep it fresh.
56-
updateUnix := uint64(edge.LastUpdate.Unix())
57-
var indexKey [8 + 8]byte
58-
byteOrder.PutUint64(indexKey[:8], updateUnix)
59-
byteOrder.PutUint64(indexKey[8:], edge.ChannelID)
60-
61-
updateIndex, err := edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
121+
indexKey := encodingInfo.updateKey
122+
updateIndex, err := edges.CreateBucketIfNotExists(
123+
encodingInfo.updateBucketKey,
124+
)
62125
if err != nil {
63126
return err
64127
}
@@ -88,32 +151,36 @@ func putChanEdgePolicy(edges kvdb.RwBucket, edge *models.ChannelEdgePolicy1,
88151
return err
89152
}
90153

91-
oldPol, ok := oldEdgePolicy.(*models.ChannelEdgePolicy1)
92-
if !ok {
93-
return fmt.Errorf("expected "+
94-
"*models.ChannelEdgePolicy1, got: %T",
95-
oldEdgePolicy)
154+
info, err := encodingInfoFromEdgePolicy(oldEdgePolicy)
155+
if err != nil {
156+
return err
96157
}
97158

98-
oldUpdateTime := uint64(oldPol.LastUpdate.Unix())
99-
100-
var oldIndexKey [8 + 8]byte
101-
byteOrder.PutUint64(oldIndexKey[:8], oldUpdateTime)
102-
byteOrder.PutUint64(oldIndexKey[8:], edge.ChannelID)
159+
// Sanity check that the old update is assigned to the same
160+
// update bucket as the new one.
161+
if !bytes.Equal(
162+
info.updateBucketKey, encodingInfo.updateBucketKey,
163+
) {
164+
165+
return fmt.Errorf("received a new update belonging "+
166+
"to bucket %s where previous update for the "+
167+
"same channel belonged to bucket %s",
168+
string(encodingInfo.updateBucketKey),
169+
string(info.updateKey))
170+
}
103171

104-
if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
172+
oldIndexKey := info.updateKey
173+
if err := updateIndex.Delete(oldIndexKey); err != nil {
105174
return err
106175
}
107176
}
108177

109-
if err := updateIndex.Put(indexKey[:], nil); err != nil {
178+
if err := updateIndex.Put(indexKey, nil); err != nil {
110179
return err
111180
}
112181

113182
err = updateEdgePolicyDisabledIndex(
114-
edges, edge.ChannelID,
115-
edge.ChannelFlags&lnwire.ChanUpdateDirection > 0,
116-
edge.IsDisabled(),
183+
edges, chanID, !edge.IsNode1(), edge.IsDisabled(),
117184
)
118185
if err != nil {
119186
return err
@@ -172,7 +239,7 @@ func putChanEdgePolicyUnknown(edges kvdb.RwBucket, channelID uint64,
172239
}
173240

174241
func fetchChanEdgePolicy(edges kvdb.RBucket, chanID []byte,
175-
nodePub []byte) (*models.ChannelEdgePolicy1, error) {
242+
nodePub []byte) (models.ChannelEdgePolicy, error) {
176243

177244
var edgeKey [33 + 8]byte
178245
copy(edgeKey[:], nodePub)
@@ -201,17 +268,11 @@ func fetchChanEdgePolicy(edges kvdb.RBucket, chanID []byte,
201268
return nil, err
202269
}
203270

204-
pol, ok := ep.(*models.ChannelEdgePolicy1)
205-
if !ok {
206-
return nil, fmt.Errorf("expected *models.ChannelEdgePolicy1, "+
207-
"got: %T", ep)
208-
}
209-
210-
return pol, nil
271+
return ep, nil
211272
}
212273

213274
func fetchChanEdgePolicies(edgeIndex kvdb.RBucket, edges kvdb.RBucket,
214-
chanID []byte) (*models.ChannelEdgePolicy1, *models.ChannelEdgePolicy1,
275+
chanID []byte) (models.ChannelEdgePolicy, models.ChannelEdgePolicy,
215276
error) {
216277

217278
edgeInfoBytes := edgeIndex.Get(chanID)
@@ -239,37 +300,10 @@ func fetchChanEdgePolicies(edgeIndex kvdb.RBucket, edges kvdb.RBucket,
239300
return edge1, edge2, nil
240301
}
241302

242-
func serializeChanEdgePolicy(w io.Writer,
243-
edgePolicy models.ChannelEdgePolicy, toNode []byte) error {
244-
245-
var (
246-
withTypeByte bool
247-
typeByte edgePolicyEncodingType
248-
serialize func(w io.Writer) error
249-
)
250-
251-
switch policy := edgePolicy.(type) {
252-
case *models.ChannelEdgePolicy1:
253-
serialize = func(w io.Writer) error {
254-
copy(policy.ToNode[:], toNode)
255-
256-
return serializeChanEdgePolicy1(w, policy)
257-
}
258-
case *models.ChannelEdgePolicy2:
259-
withTypeByte = true
260-
typeByte = edgePolicy2EncodingType
261-
262-
serialize = func(w io.Writer) error {
263-
copy(policy.ToNode[:], toNode)
264-
265-
return serializeChanEdgePolicy2(w, policy)
266-
}
267-
default:
268-
return fmt.Errorf("unhandled implementation of "+
269-
"ChannelEdgePolicy: %T", edgePolicy)
270-
}
303+
func serializeChanEdgePolicy(w io.Writer, info *edgePolicyEncodingInfo,
304+
toNode []byte) error {
271305

272-
if withTypeByte {
306+
if typeByte, ok := info.typeByte(); ok {
273307
// First, write the identifying encoding byte to signal that
274308
// this is not using the legacy encoding.
275309
_, err := w.Write([]byte{chanEdgePolicyNewEncodingPrefix})
@@ -284,7 +318,7 @@ func serializeChanEdgePolicy(w io.Writer,
284318
}
285319
}
286320

287-
return serialize(w)
321+
return info.serialize(w, toNode)
288322
}
289323

290324
func serializeChanEdgePolicy1(w io.Writer,

channeldb/edge_policy_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ func TestEdgePolicySerialisation(t *testing.T) {
2626
toNode = info.GetToNode()
2727
)
2828

29-
err := serializeChanEdgePolicy(&b, info, toNode[:])
29+
encodingInfo, err := encodingInfoFromEdgePolicy(info)
30+
require.NoError(t, err)
31+
32+
err = serializeChanEdgePolicy(&b, encodingInfo, toNode[:])
3033
require.NoError(t, err)
3134

3235
newInfo, err := deserializeChanEdgePolicy(&b)

0 commit comments

Comments
 (0)