Skip to content

Commit 2cc3d0d

Browse files
committed
refactor: improve QBFT instance cleanup with duty-aware deadlines
Replace slot-based cleanup with duty-specific beacon chain inclusion deadlines, so that QBFT instances can progress through all configured rounds without being removed prematurely.

Key changes:
- Separate instance identity from manager metadata using a ManagedInstance wrapper
- Calculate duty-specific deadlines per EIP-7045 (attestations are valid until the end of epoch E+1)
- Add a slots_per_epoch configuration parameter
- Implement a dual-trigger cleaner (completion notification + deadline timeout)
- Eliminate placeholder values in instance IDs

Fixes instances being cleaned up after 2 slots; cleanup now respects the beacon chain inclusion windows (32-63 slots for attestations).
1 parent 82eaa98 commit 2cc3d0d

File tree

4 files changed

+150
-41
lines changed

4 files changed

+150
-41
lines changed

anchor/client/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,7 @@ impl Client {
450450
slot_clock.clone(),
451451
message_sender,
452452
config.global_config.ssv_network.ssv_domain_type,
453+
E::slots_per_epoch(),
453454
)
454455
.map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?;
455456

anchor/qbft_manager/src/instance.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ impl<D: QbftData<Hash = Hash256>> Initialized<D> {
259259
pub async fn qbft_instance<D: QbftData<Hash = Hash256>>(
260260
mut rx: UnboundedReceiver<QbftMessage<D>>,
261261
message_sender: Arc<dyn MessageSender>,
262+
completion_tx: mpsc::UnboundedSender<crate::InstanceId>,
263+
instance_id: crate::InstanceId,
262264
) {
263265
// Signal a new instance that is uninitialized
264266
let mut instance = QbftInstance::Uninitialized(Uninitialized::default());
@@ -304,13 +306,20 @@ pub async fn qbft_instance<D: QbftData<Hash = Hash256>>(
304306
if let QbftInstance::Initialized(initialized) = instance {
305307
initialized.complete(Completed::TimedOut);
306308
}
309+
// No notification - either already sent when decided, or cleaner removed us
307310
break;
308311
}
309312
};
310313

311314
// If the instance is ongoing, check whether it is done.
312315
if let QbftInstance::Initialized(initialized) = instance {
313316
instance = initialized.complete_if_done(&message_sender);
317+
318+
// If we just transitioned to Decided, notify cleaner for immediate cleanup
319+
if matches!(instance, QbftInstance::Decided(_)) {
320+
let _ = completion_tx.send(instance_id);
321+
break;
322+
}
314323
}
315324

316325
// Drop guard as late as possible to keep the processor permit.

anchor/qbft_manager/src/lib.rs

Lines changed: 132 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,25 @@ const QBFT_INSTANCE_NAME: &str = "qbft_instance";
3939
const QBFT_MESSAGE_NAME: &str = "qbft_message";
4040
const QBFT_CLEANER_NAME: &str = "qbft_cleaner";
4141

42-
/// Number of slots to keep before the current slot
43-
const QBFT_RETAIN_SLOTS: u64 = 1;
42+
/// Calculate the beacon chain inclusion deadline for a duty
43+
fn calculate_deadline(role: Role, slot: types::Slot, slots_per_epoch: u64) -> types::Slot {
44+
match role {
45+
Role::Committee | Role::Aggregator => {
46+
// Attestations can be included until end of next epoch (epoch E+1)
47+
// Per EIP-7045: attestation from epoch E valid until end of epoch E+1
48+
let epoch = slot.epoch(slots_per_epoch);
49+
types::Slot::new((epoch.as_u64() + 2) * slots_per_epoch - 1)
50+
}
51+
Role::Proposer | Role::SyncCommittee => {
52+
// Must be in the same slot
53+
slot
54+
}
55+
Role::VoluntaryExit | Role::ValidatorRegistration => {
56+
// One epoch to complete
57+
types::Slot::new(slot.as_u64() + slots_per_epoch)
58+
}
59+
}
60+
}
4461

4562
// Unique Identifier for a committee and its corresponding QBFT instance
4663
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
@@ -98,8 +115,20 @@ pub struct QbftInitialization<D: QbftData> {
98115
on_completed: oneshot::Sender<Completed<D>>,
99116
}
100117

101-
// Map from an identifier to a sender for the instance
102-
type Map<I, D> = DashMap<I, UnboundedSender<QbftMessage<D>>>;
118+
// Manager's bookkeeping for an instance
119+
pub struct ManagedInstance<D: QbftData> {
120+
sender: UnboundedSender<QbftMessage<D>>,
121+
deadline: types::Slot,
122+
}
123+
124+
// Map from an identifier to managed instance data
125+
type Map<I, D> = DashMap<I, ManagedInstance<D>>;
126+
127+
// Enum to identify which instance completed
128+
pub enum InstanceId {
129+
BeaconVote(CommitteeInstanceId),
130+
ValidatorConsensus(ValidatorInstanceId),
131+
}
103132

104133
// Top level QBFTManager structure
105134
pub struct QbftManager {
@@ -115,6 +144,10 @@ pub struct QbftManager {
115144
message_sender: Arc<dyn MessageSender>,
116145
// Network domain to embed into messages
117146
domain: DomainType,
147+
// Channel to notify cleaner when instance completes
148+
completion_tx: mpsc::UnboundedSender<InstanceId>,
149+
// Slots per epoch for deadline calculations
150+
slots_per_epoch: u64,
118151
}
119152

120153
impl QbftManager {
@@ -125,21 +158,26 @@ impl QbftManager {
125158
slot_clock: impl SlotClock + 'static,
126159
message_sender: Arc<dyn MessageSender>,
127160
domain: DomainType,
161+
slots_per_epoch: u64,
128162
) -> Result<Arc<Self>, QbftError> {
163+
let (completion_tx, completion_rx) = mpsc::unbounded_channel();
164+
129165
let manager = Arc::new(QbftManager {
130166
processor,
131167
operator_id,
132168
validator_consensus_data_instances: DashMap::new(),
133169
beacon_vote_instances: DashMap::new(),
134170
message_sender,
135171
domain,
172+
completion_tx,
173+
slots_per_epoch,
136174
});
137175

138176
// Start a long running task that will clean up old instances
139-
manager
140-
.processor
141-
.permitless
142-
.send_async(Arc::clone(&manager).cleaner(slot_clock), QBFT_CLEANER_NAME)?;
177+
manager.processor.permitless.send_async(
178+
Arc::clone(&manager).cleaner(slot_clock, completion_rx),
179+
QBFT_CLEANER_NAME,
180+
)?;
143181

144182
Ok(manager)
145183
}
@@ -161,6 +199,11 @@ impl QbftManager {
161199
let (result_sender, result_receiver) = oneshot::channel();
162200
let message_id = D::message_id(&self.domain, &id);
163201

202+
// Calculate deadline for this instance
203+
let role = message_id.role().ok_or(QbftError::InconsistentMessageId)?;
204+
let slot = types::Slot::new(*initial.instance_height(&id) as u64);
205+
let deadline = calculate_deadline(role, slot, self.slots_per_epoch);
206+
164207
// Generate the qbft configuration
165208
let config = ConfigBuilder::new(
166209
operator_id,
@@ -169,17 +212,12 @@ impl QbftManager {
169212
);
170213
let config = config
171214
.with_quorum_size(committee.cluster_members.len() - committee.get_f() as usize)
172-
.with_max_rounds(
173-
message_id
174-
.role()
175-
.and_then(|r| r.max_round())
176-
.ok_or(QbftError::InconsistentMessageId)? as usize,
177-
)
215+
.with_max_rounds(role.max_round().ok_or(QbftError::InconsistentMessageId)? as usize)
178216
.build()?;
179217

180218
// Get or spawn a new qbft instance. This will return the sender that we can use to send
181219
// new messages to the specific instance
182-
let sender = D::get_or_spawn_instance(self, id);
220+
let sender = D::get_or_spawn_instance(self, id.clone(), deadline);
183221
self.processor.urgent_consensus.send_immediate(
184222
move |drop_on_finish: DropOnFinish| {
185223
// A message to initialize this instance
@@ -261,7 +299,19 @@ impl QbftManager {
261299
id: D::Id,
262300
data: WrappedQbftMessage,
263301
) -> Result<(), QbftError> {
264-
let sender = D::get_or_spawn_instance(self, id);
302+
// Get the map for this data type
303+
let map = D::get_map(self);
304+
305+
// Look up existing instance - network messages should only go to existing instances
306+
let Some(managed) = map.get(&id) else {
307+
// Instance doesn't exist - either this message arrived before decide_instance was called (e.g. during startup),
308+
// or the cleaner already removed the instance (decided or past its deadline). Both are normal, just ignore it
309+
return Ok(());
310+
};
311+
312+
let sender = managed.sender.clone();
313+
drop(managed); // Release the lock before sending
314+
265315
self.processor.urgent_consensus.send_immediate(
266316
move |drop_on_finish: DropOnFinish| {
267317
let _ = sender.send(QbftMessage {
@@ -274,51 +324,84 @@ impl QbftManager {
274324
Ok(())
275325
}
276326

277-
// Long running cleaner that will remove instances that are no longer relevant
278-
async fn cleaner(self: Arc<Self>, slot_clock: impl SlotClock) {
279-
while !self.processor.permitless.is_closed() {
280-
sleep(
281-
slot_clock
282-
.duration_to_next_slot()
283-
.unwrap_or(slot_clock.slot_duration()),
284-
)
285-
.await;
286-
let Some(slot) = slot_clock.now() else {
287-
continue;
288-
};
289-
let cutoff = slot.saturating_sub(QBFT_RETAIN_SLOTS);
290-
self.beacon_vote_instances
291-
.retain(|k, _| *k.instance_height >= cutoff.as_usize());
292-
self.validator_consensus_data_instances
293-
.retain(|k, _| *k.instance_height >= cutoff.as_usize());
327+
/// Long running cleaner that removes instances based on completion or deadline
328+
async fn cleaner(
329+
self: Arc<Self>,
330+
slot_clock: impl SlotClock,
331+
mut completion_rx: mpsc::UnboundedReceiver<InstanceId>,
332+
) {
333+
loop {
334+
tokio::select! {
335+
// Branch 1: Instance completed - clean immediately
336+
Some(id) = completion_rx.recv() => {
337+
match id {
338+
InstanceId::BeaconVote(id) => {
339+
self.beacon_vote_instances.remove(&id);
340+
}
341+
InstanceId::ValidatorConsensus(id) => {
342+
self.validator_consensus_data_instances.remove(&id);
343+
}
344+
}
345+
}
346+
// Branch 2: Slot timeout - clean expired instances
347+
_ = sleep(
348+
slot_clock
349+
.duration_to_next_slot()
350+
.unwrap_or(slot_clock.slot_duration())
351+
) => {
352+
let Some(current_slot) = slot_clock.now() else {
353+
continue;
354+
};
355+
self.beacon_vote_instances
356+
.retain(|_, managed| managed.deadline >= current_slot);
357+
self.validator_consensus_data_instances
358+
.retain(|_, managed| managed.deadline >= current_slot);
359+
}
360+
}
361+
362+
if self.processor.permitless.is_closed() {
363+
break;
364+
}
294365
}
295366
}
296367
}
297368

298-
// Trait that describes any data that is able to be decided upon during a qbft instance
299369
pub trait QbftDecidable: QbftData<Hash = Hash256> + Send + Sync + 'static {
300-
type Id: Hash + Eq + Send + Debug;
370+
type Id: Hash + Eq + Send + Debug + Clone;
301371

302372
fn get_map(manager: &QbftManager) -> &Map<Self::Id, Self>;
303373

374+
fn wrap_id(id: Self::Id) -> InstanceId;
375+
304376
fn get_or_spawn_instance(
305377
manager: &QbftManager,
306378
id: Self::Id,
379+
deadline: types::Slot,
307380
) -> UnboundedSender<QbftMessage<Self>> {
308381
let map = Self::get_map(manager);
309-
match map.entry(id) {
310-
dashmap::Entry::Occupied(entry) => entry.get().clone(),
382+
match map.entry(id.clone()) {
383+
dashmap::Entry::Occupied(entry) => entry.get().sender.clone(),
311384
dashmap::Entry::Vacant(entry) => {
312385
// There is not an instance running yet, store the sender and spawn a new instance
313-
// with the reeiver
386+
// with the receiver
314387
let (tx, rx) = mpsc::unbounded_channel();
315388
let span = debug_span!("qbft_instance", instance_id = ?entry.key());
316-
let tx = entry.insert(tx);
389+
let managed = ManagedInstance {
390+
sender: tx,
391+
deadline,
392+
};
393+
let sender = entry.insert(managed).sender.clone();
394+
let instance_id = Self::wrap_id(id);
395+
let completion_tx = manager.completion_tx.clone();
396+
let message_sender = manager.message_sender.clone();
317397
let _ = manager.processor.permitless.send_async(
318-
Box::pin(qbft_instance(rx, manager.message_sender.clone()).instrument(span)),
398+
Box::pin(
399+
qbft_instance(rx, message_sender, completion_tx, instance_id)
400+
.instrument(span),
401+
),
319402
QBFT_INSTANCE_NAME,
320403
);
321-
tx.clone()
404+
sender
322405
}
323406
}
324407
}
@@ -334,6 +417,10 @@ impl QbftDecidable for ValidatorConsensusData {
334417
&manager.validator_consensus_data_instances
335418
}
336419

420+
fn wrap_id(id: Self::Id) -> InstanceId {
421+
InstanceId::ValidatorConsensus(id)
422+
}
423+
337424
fn instance_height(&self, id: &Self::Id) -> InstanceHeight {
338425
id.instance_height
339426
}
@@ -354,6 +441,10 @@ impl QbftDecidable for BeaconVote {
354441
&manager.beacon_vote_instances
355442
}
356443

444+
fn wrap_id(id: Self::Id) -> InstanceId {
445+
InstanceId::BeaconVote(id)
446+
}
447+
357448
fn instance_height(&self, id: &Self::Id) -> InstanceHeight {
358449
id.instance_height
359450
}

anchor/qbft_manager/src/tests.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ where
304304
slot_clock.clone(),
305305
Arc::new(MockMessageSender::new(network_tx.clone(), operator_id)),
306306
DomainType([0; 4]),
307+
32, // slots_per_epoch
307308
)
308309
.expect("Creation should not fail");
309310

@@ -965,10 +966,17 @@ async fn test_timeout(round_timeout_to_test: usize) {
965966
let (sender_tx, _sender_rx) = unbounded_channel();
966967
let (message_tx, message_rx) = unbounded_channel();
967968
let (result_tx, result_rx) = oneshot::channel();
969+
let (completion_tx, _completion_rx) = unbounded_channel();
968970
let message_sender = MockMessageSender::new(sender_tx, OperatorId(1));
971+
let instance_id = super::InstanceId::BeaconVote(CommitteeInstanceId {
972+
committee: CommitteeId::default(),
973+
instance_height: 0.into(),
974+
});
969975
let _handle = tokio::spawn(qbft_instance::<BeaconVote>(
970976
message_rx,
971977
Arc::new(message_sender),
978+
completion_tx,
979+
instance_id,
972980
));
973981

974982
// create a slot clock at slot 0 with a slot duration of 12 seconds

0 commit comments

Comments
 (0)