
Commit 35f9ceb

RFC 12 Implementation: Coordination layer groundwork (#3744)
Refs: keep-network/tbtc-v2#737

Here we present the first part of the changes meant to implement [RFC 12: Decentralized wallet coordination](https://github.com/keep-network/tbtc-v2/blob/main/docs/rfc/rfc-12.adoc) in the tBTC wallet client. This pull request focuses on the groundwork necessary to run the off-chain coordination layer.

### Coordination layer orchestration

The node orchestrates the coordination layer upon startup. Specifically, it runs two separate goroutines:

- Coordination window watch
- Coordination result processor

Both goroutines are steered by the node's root context and communicate through a dedicated channel.

### Coordination window watch

The coordination window watch goroutine is responsible for detecting coordination windows, which occur every `900` blocks. Once a window is detected, it runs the window handler function and passes the window data as an argument.

The window watch process guarantees that the window handler is called only once for a given coordination window. It is also designed so that the handler is called for every coordination window: each handler runs in a separate goroutine, so the watch loop does not block for long and the chance of missing a coordination window signal is negligible.

The window handler function triggers the window processing logic. The specific steps of that logic are:

1. Determine the list of wallets controlled by the given node
2. Take a coordination executor for each wallet
3. Use the coordination executors to run the coordination procedure for each wallet (in parallel)
4. Collect coordination results and push them to the processing channel

### Coordination executor

The coordination executor is the component responsible for running the coordination procedure for a given wallet and coordination window. It is designed to encapsulate the logic of the procedure (coordination seed, communication, and so on). It also ensures that only one instance of the procedure is executed at a time. The executor is also responsible for assembling the coordination procedure's result and reporting all coordination faults detected during execution.

The design of the coordination executor is inspired by the existing signing and DKG executors. It attempts to fit the coordination procedure's logic into the existing codebase in an elegant way.

### Coordination result processor

The coordination result processor goroutine listens for incoming coordination results and triggers the result handler function in a separate goroutine for each of them, so that all results are processed independently.

The result handler function triggers the result processing logic. The specific steps of that logic are:

1. Record coordination faults reported in the result
2. Detect the type of the action proposal that is part of the result
3. Fire the appropriate proposal handler

The proposal handlers are part of the existing codebase. They are responsible for the orchestration and execution of the proposed wallet actions.

### Intersection with the existing chain-based coordination mechanism

The presented groundwork was built alongside the existing chain-based coordination mechanism. Some initial integration steps around data types were done. The existing mechanism will be gradually removed and replaced in follow-up pull requests.

### Next steps

The next (coarse-grained) steps on the way towards the RFC 12 implementation are:

- Implement the coordination procedure logic (i.e. implement the `coordinationExecutor.coordinate` method)
- Finalize coordination result processing (i.e. implement the `processCoordinationResult` function and refactor `node`'s handlers appropriately)
- Remove the existing chain-based mechanism (i.e. detach `WalletCoordinator`'s event handlers and remove unnecessary code from `chain.go`)
- Modify the SPV maintainer to not rely on `WalletCoordinator`'s events during unproven transactions lookup
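As a rough illustration of the window watch described above, the following standalone sketch detects coordination blocks on a stream of block numbers and skips repeated or stale blocks. The names `detectWindows` and `collectWindows` are hypothetical helpers made up for this example and are not part of the pull request. Unlike the real watch, the sketch calls the handler synchronously to keep the output deterministic.

```go
package main

import (
	"context"
	"fmt"
)

// coordinationFrequencyBlocks mirrors the 900-block frequency mentioned in
// the description.
const coordinationFrequencyBlocks = 900

// detectWindows is a hypothetical, simplified watch loop. It calls onWindow
// once for every coordination block seen on the blocks channel and ignores
// blocks that do not advance past the last detected window.
func detectWindows(
	ctx context.Context,
	blocks <-chan uint64,
	onWindow func(coordinationBlock uint64),
) {
	var last uint64
	seen := false

	for {
		select {
		case block, ok := <-blocks:
			if !ok {
				return // the block source was closed
			}
			if block%coordinationFrequencyBlocks != 0 {
				continue // not a coordination block
			}
			if seen && block <= last {
				continue // duplicate or stale block
			}
			last, seen = block, true
			// The watch in this pull request starts the handler in a
			// separate goroutine; the sketch calls it directly.
			onWindow(block)
		case <-ctx.Done():
			return
		}
	}
}

// collectWindows feeds the given blocks through detectWindows and returns
// the coordination blocks for which the handler fired.
func collectWindows(blocks []uint64) []uint64 {
	ch := make(chan uint64, len(blocks))
	for _, b := range blocks {
		ch <- b
	}
	close(ch)

	var windows []uint64
	detectWindows(context.Background(), ch, func(b uint64) {
		windows = append(windows, b)
	})
	return windows
}

func main() {
	// prints [900 1800]
	fmt.Println(collectWindows([]uint64{899, 900, 900, 901, 1800, 900}))
}
```

Tracking only the last detected window is enough here because block numbers are expected to grow; a repeated or older block can never start a new window.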
2 parents 73697d1 + 5c7513d commit 35f9ceb

15 files changed, +1040 -45 lines changed

pkg/chain/local_v1/blockcounter.go

+13-5
@@ -20,7 +20,7 @@ type watcher struct {
 	channel chan uint64
 }

-var blockTime = time.Duration(500 * time.Millisecond)
+var defaultBlockTime = 500 * time.Millisecond

 func (lbc *localBlockCounter) WaitForBlockHeight(blockNumber uint64) error {
 	waiter, err := lbc.BlockHeightWaiter(blockNumber)
@@ -88,8 +88,16 @@ func (lbc *localBlockCounter) WatchBlocks(ctx context.Context) <-chan uint64 {

 // count is an internal function that counts up time to simulate the generation
 // of blocks.
-func (lbc *localBlockCounter) count() {
-	ticker := time.NewTicker(blockTime)
+func (lbc *localBlockCounter) count(blockTime ...time.Duration) {
+	var resolvedBlockTime time.Duration
+	switch len(blockTime) {
+	case 1:
+		resolvedBlockTime = blockTime[0]
+	default:
+		resolvedBlockTime = defaultBlockTime
+	}
+
+	ticker := time.NewTicker(resolvedBlockTime)

 	for range ticker.C {
 		lbc.structMutex.Lock()
@@ -127,10 +135,10 @@ func (lbc *localBlockCounter) count() {
 // BlockCounter creates a BlockCounter that runs completely locally. It is
 // designed to simply increase block height at a set time interval in the
 // background.
-func BlockCounter() (chain.BlockCounter, error) {
+func BlockCounter(blockTime ...time.Duration) (chain.BlockCounter, error) {
 	counter := localBlockCounter{blockHeight: 0, waiters: make(map[uint64][]chan uint64)}

-	go counter.count()
+	go counter.count(blockTime...)

 	return &counter, nil
 }
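The change above makes the block time an optional argument by using a variadic parameter. A minimal standalone sketch of that pattern follows. The helper name `resolveBlockTime` is hypothetical and only isolates the `switch` from `count`; as in the diff, any argument count other than exactly one falls back to the default.

```go
package main

import (
	"fmt"
	"time"
)

// defaultBlockTime matches the default used by the local block counter.
const defaultBlockTime = 500 * time.Millisecond

// resolveBlockTime is a hypothetical helper that returns the single block
// time passed by the caller, or the default when zero or more than one
// value is given.
func resolveBlockTime(blockTime ...time.Duration) time.Duration {
	if len(blockTime) == 1 {
		return blockTime[0]
	}
	return defaultBlockTime
}

func main() {
	fmt.Println(resolveBlockTime())                      // prints 500ms
	fmt.Println(resolveBlockTime(10 * time.Millisecond)) // prints 10ms
}
```

A variadic parameter keeps existing call sites such as `BlockCounter()` compiling unchanged, at the cost of silently ignoring extra arguments. An options struct or a separate constructor would be stricter alternatives.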

pkg/chain/local_v1/local_test.go

+3-3
@@ -339,13 +339,13 @@ func TestLocalBlockHeightWaiter(t *testing.T) {
 		},
 		"returns immediately for block height already reached": {
 			blockHeight: 2,
-			initialDelay: 3 * blockTime,
+			initialDelay: 3 * defaultBlockTime,
 			expectedWaitTime: 0,
 		},
 		"waits for block height not yet reached": {
 			blockHeight: 5,
-			initialDelay: 2 * blockTime,
-			expectedWaitTime: 3 * blockTime,
+			initialDelay: 2 * defaultBlockTime,
+			expectedWaitTime: 3 * defaultBlockTime,
 		},
 	}

pkg/tbtc/chain.go

+6-18
@@ -363,26 +363,19 @@ type WalletCoordinatorChain interface {

 // HeartbeatRequestSubmittedEvent represents a wallet heartbeat request
 // submitted to the chain.
+//
+// TODO: Remove this type and all related code.
 type HeartbeatRequestSubmittedEvent struct {
 	WalletPublicKeyHash [20]byte
 	Message []byte
 	Coordinator chain.Address
 	BlockNumber uint64
 }

-// DepositSweepProposal represents a deposit sweep proposal submitted to the chain.
-type DepositSweepProposal struct {
-	WalletPublicKeyHash [20]byte
-	DepositsKeys []struct {
-		FundingTxHash bitcoin.Hash
-		FundingOutputIndex uint32
-	}
-	SweepTxFee *big.Int
-	DepositsRevealBlocks []*big.Int
-}
-
 // DepositSweepProposalSubmittedEvent represents a deposit sweep proposal
 // submission event.
+//
+// TODO: Remove this type and all related code.
 type DepositSweepProposalSubmittedEvent struct {
 	Proposal *DepositSweepProposal
 	Coordinator chain.Address
@@ -404,6 +397,8 @@ type DepositSweepProposalSubmittedEventFilter struct {

 // RedemptionProposalSubmittedEvent represents a redemption proposal
 // submission event.
+//
+// TODO: Remove this type and all related code.
 type RedemptionProposalSubmittedEvent struct {
 	Proposal *RedemptionProposal
 	Coordinator chain.Address
@@ -423,13 +418,6 @@ type RedemptionProposalSubmittedEventFilter struct {
 	WalletPublicKeyHash [20]byte
 }

-// RedemptionProposal represents a redemption proposal submitted to the chain.
-type RedemptionProposal struct {
-	WalletPublicKeyHash [20]byte
-	RedeemersOutputScripts []bitcoin.Script
-	RedemptionTxFee *big.Int
-}
-
 // RedemptionRequestedEvent represents a redemption requested event.
 type RedemptionRequestedEvent struct {
 	WalletPublicKeyHash [20]byte

pkg/tbtc/chain_test.go

+7-4
@@ -794,19 +794,22 @@ func buildRedemptionProposalValidationKey(
 }

 // Connect sets up the local chain.
-func Connect() *localChain {
+func Connect(blockTime ...time.Duration) *localChain {
 	operatorPrivateKey, _, err := operator.GenerateKeyPair(local_v1.DefaultCurve)
 	if err != nil {
 		panic(err)
 	}

-	return ConnectWithKey(operatorPrivateKey)
+	return ConnectWithKey(operatorPrivateKey, blockTime...)
 }

 // ConnectWithKey sets up the local chain using the provided operator private
 // key.
-func ConnectWithKey(operatorPrivateKey *operator.PrivateKey) *localChain {
-	blockCounter, _ := local_v1.BlockCounter()
+func ConnectWithKey(
+	operatorPrivateKey *operator.PrivateKey,
+	blockTime ...time.Duration,
+) *localChain {
+	blockCounter, _ := local_v1.BlockCounter(blockTime...)

 	localChain := &localChain{
 		dkgResultSubmissionHandlers: make(

pkg/tbtc/coordination.go

+257
@@ -0,0 +1,257 @@
+package tbtc
+
+import (
+	"context"
+	"fmt"
+	"github.com/keep-network/keep-core/pkg/chain"
+	"github.com/keep-network/keep-core/pkg/generator"
+	"github.com/keep-network/keep-core/pkg/net"
+	"github.com/keep-network/keep-core/pkg/protocol/group"
+	"golang.org/x/sync/semaphore"
+)
+
+const (
+	// coordinationFrequencyBlocks is the number of blocks between two
+	// consecutive coordination windows.
+	coordinationFrequencyBlocks = 900
+	// coordinationActivePhaseDurationBlocks is the number of blocks in the
+	// active phase of the coordination window. The active phase is the
+	// phase during which the communication between the coordination leader and
+	// their followers is allowed.
+	coordinationActivePhaseDurationBlocks = 80
+	// coordinationPassivePhaseDurationBlocks is the number of blocks in the
+	// passive phase of the coordination window. The passive phase is the
+	// phase during which communication is not allowed. Participants are
+	// expected to validate the result of the coordination and prepare for
+	// execution of the proposed wallet action.
+	coordinationPassivePhaseDurationBlocks = 20
+	// coordinationDurationBlocks is the number of blocks in a single
+	// coordination window.
+	coordinationDurationBlocks = coordinationActivePhaseDurationBlocks +
+		coordinationPassivePhaseDurationBlocks
+)
+
+// errCoordinationExecutorBusy is an error returned when the coordination
+// executor cannot execute the requested coordination due to an ongoing one.
+var errCoordinationExecutorBusy = fmt.Errorf("coordination executor is busy")
+
+// coordinationWindow represents a single coordination window. The coordination
+// block is the first block of the window.
+type coordinationWindow struct {
+	// coordinationBlock is the first block of the coordination window.
+	coordinationBlock uint64
+}
+
+// newCoordinationWindow creates a new coordination window for the given
+// coordination block.
+func newCoordinationWindow(coordinationBlock uint64) *coordinationWindow {
+	return &coordinationWindow{
+		coordinationBlock: coordinationBlock,
+	}
+}
+
+// activePhaseEndBlock returns the block number at which the active phase
+// of the coordination window ends.
+func (cw *coordinationWindow) activePhaseEndBlock() uint64 {
+	return cw.coordinationBlock + coordinationActivePhaseDurationBlocks
+}
+
+// endBlock returns the block number at which the coordination window ends.
+func (cw *coordinationWindow) endBlock() uint64 {
+	return cw.coordinationBlock + coordinationDurationBlocks
+}
+
+// isAfter returns true if this coordination window is after the other
+// window.
+func (cw *coordinationWindow) isAfter(other *coordinationWindow) bool {
+	if other == nil {
+		return true
+	}
+
+	return cw.coordinationBlock > other.coordinationBlock
+}
+
+// watchCoordinationWindows watches for new coordination windows and runs
+// the given callback when a new window is detected. The callback is run
+// in a separate goroutine. It is guaranteed that the callback is not run
+// twice for the same window. The context passed as the first parameter
+// is used to cancel the watch.
+func watchCoordinationWindows(
+	ctx context.Context,
+	watchBlocksFn func(ctx context.Context) <-chan uint64,
+	onWindowFn func(window *coordinationWindow),
+) {
+	blocksChan := watchBlocksFn(ctx)
+	var lastWindow *coordinationWindow
+
+	for {
+		select {
+		case block := <-blocksChan:
+			if block%coordinationFrequencyBlocks == 0 {
+				// Make sure the current window is not the same as the last one.
+				// There is no guarantee that the block channel will not emit
+				// the same block again.
+				if window := newCoordinationWindow(block); window.isAfter(lastWindow) {
+					lastWindow = window
+					// Run the callback in a separate goroutine to avoid blocking
+					// this loop and potentially missing the next block.
+					go onWindowFn(window)
+				}
+			}
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// CoordinationFaultType represents a type of the coordination fault.
+type CoordinationFaultType uint8
+
+const (
+	// FaultUnknown is a fault type used when the fault type is unknown.
+	FaultUnknown CoordinationFaultType = iota
+	// FaultLeaderIdleness is a fault type used when the leader was idle, i.e.
+	// missed their turn to propose a wallet action.
+	FaultLeaderIdleness
+	// FaultLeaderMistake is a fault type used when the leader's proposal
+	// turned out to be invalid.
+	FaultLeaderMistake
+	// FaultLeaderImpersonation is a fault type used when the leader was
+	// impersonated by another operator who raised their own proposal.
+	FaultLeaderImpersonation
+)
+
+func (cft CoordinationFaultType) String() string {
+	switch cft {
+	case FaultUnknown:
+		return "Unknown"
+	case FaultLeaderIdleness:
+		return "LeaderIdleness"
+	case FaultLeaderMistake:
+		return "LeaderMistake"
+	case FaultLeaderImpersonation:
+		return "LeaderImpersonation"
+	default:
+		panic("unknown coordination fault type")
+	}
+}
+
+// coordinationFault represents a single coordination fault.
+type coordinationFault struct {
+	// culprit is the address of the operator that is responsible for the fault.
+	culprit chain.Address
+	// faultType is the type of the fault.
+	faultType CoordinationFaultType
+}
+
+func (cf *coordinationFault) String() string {
+	return fmt.Sprintf(
+		"operator [%s], fault [%s]",
+		cf.culprit,
+		cf.faultType,
+	)
+}
+
+// coordinationProposal represents a single action proposal for the given wallet.
+type coordinationProposal interface {
+	// actionType returns the specific type of the walletAction being subject
+	// of this proposal.
+	actionType() WalletActionType
+	// validityBlocks returns the number of blocks for which the proposal is
+	// valid.
+	validityBlocks() uint64
+}
+
+// noopProposal is a proposal that does not propose any action.
+type noopProposal struct{}
+
+func (np *noopProposal) actionType() WalletActionType {
+	return ActionNoop
+}
+
+func (np *noopProposal) validityBlocks() uint64 {
+	// Panic to make sure that the proposal is not processed by the node.
+	panic("noop proposal does not have validity blocks")
+}
+
+// coordinationResult represents the result of the coordination procedure
+// executed for the given wallet in the given coordination window.
+type coordinationResult struct {
+	wallet wallet
+	window *coordinationWindow
+	leader chain.Address
+	proposal coordinationProposal
+	faults []*coordinationFault
+}
+
+func (cr *coordinationResult) String() string {
+	return fmt.Sprintf(
+		"wallet [%s], window [%v], leader [%s], proposal [%s], faults [%s]",
+		&cr.wallet,
+		cr.window.coordinationBlock,
+		cr.leader,
+		cr.proposal.actionType(),
+		cr.faults,
+	)
+}
+
+// coordinationExecutor is responsible for executing the coordination
+// procedure for the given wallet.
+type coordinationExecutor struct {
+	lock *semaphore.Weighted
+
+	signers []*signer // TODO: Do we need whole signers?
+	broadcastChannel net.BroadcastChannel
+	membershipValidator *group.MembershipValidator
+	protocolLatch *generator.ProtocolLatch
+}
+
+// newCoordinationExecutor creates a new coordination executor for the
+// given wallet.
+func newCoordinationExecutor(
+	signers []*signer,
+	broadcastChannel net.BroadcastChannel,
+	membershipValidator *group.MembershipValidator,
+	protocolLatch *generator.ProtocolLatch,
+) *coordinationExecutor {
+	return &coordinationExecutor{
+		lock: semaphore.NewWeighted(1),
+		signers: signers,
+		broadcastChannel: broadcastChannel,
+		membershipValidator: membershipValidator,
+		protocolLatch: protocolLatch,
+	}
+}
+
+// wallet returns the wallet this executor is responsible for.
+func (ce *coordinationExecutor) wallet() wallet {
+	// All signers belong to one wallet. Take that wallet from the
+	// first signer.
+	return ce.signers[0].wallet
+}
+
+// coordinate executes the coordination procedure for the given coordination
+// window.
+func (ce *coordinationExecutor) coordinate(
+	window *coordinationWindow,
+) (*coordinationResult, error) {
+	if lockAcquired := ce.lock.TryAcquire(1); !lockAcquired {
+		return nil, errCoordinationExecutorBusy
+	}
+	defer ce.lock.Release(1)
+
+	// TODO: Implement coordination logic. Remember about:
+	// - Setting up the right context
+	// - Using the protocol latch
+	// - Using the membership validator
+	// Example result:
+	result := &coordinationResult{
+		wallet: ce.wallet(),
+		window: window,
+		leader: ce.wallet().signingGroupOperators[0],
+		proposal: &noopProposal{},
+		faults: nil,
+	}
+
+	return result, nil
+}
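The `coordinate` method above rejects a second run while one is in progress, instead of queueing it. The sketch below shows the same "try to acquire or report busy" idea in isolation. It swaps the weighted semaphore for the standard library's `sync.Mutex.TryLock` (available since Go 1.18) so that it runs without external modules. The `executor` type, the `errExecutorBusy` variable, and the `nestedAttempt` helper are hypothetical names used only for this illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errExecutorBusy plays the role of errCoordinationExecutorBusy.
var errExecutorBusy = errors.New("coordination executor is busy")

// executor is a hypothetical stand-in for the coordination executor. It
// allows at most one run at a time.
type executor struct {
	lock sync.Mutex
}

// coordinate runs the given procedure unless another run is in progress,
// in which case it returns errExecutorBusy immediately without blocking.
func (e *executor) coordinate(run func()) error {
	if !e.lock.TryLock() {
		return errExecutorBusy
	}
	defer e.lock.Unlock()

	run()
	return nil
}

// nestedAttempt starts a run and tries to start a second one while the
// first is still in progress. It returns the errors of both attempts.
func nestedAttempt() (outer, inner error) {
	e := &executor{}
	outer = e.coordinate(func() {
		inner = e.coordinate(func() {})
	})
	return outer, inner
}

func main() {
	outer, inner := nestedAttempt()
	fmt.Println(outer) // prints <nil>
	fmt.Println(inner) // prints coordination executor is busy
}
```

Failing fast suits a periodic procedure like this one: a run that could not start in its own window has little value once the next window begins.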
