Skip to content

Commit 0bebe3a

Browse files
authored
Trust Quorum: Prepare phase retries and testing (#8000)
This PR implements the ability for a trust quorum node to handle prepare acknowledgements and send retries when time has advanced via `Node::tick` calls. The vast majority of the code is test code. `coordinator.rs` is the start of a property based test to test the behavior of a node that is coordinating reconfigurations. The coordinating node itself is the system under test (SUT), and there is an abstract model that keeps enough information to allow asserting properties about the behavior of the SUT. A `TestInput` is generated which contains an initial configuration for the coordinating node and a generated list of abstract `Action`s to be executed by the test. Each action has a corresponding method on the `TestState` for handling it. These methods update the model state, SUT state, and then verify any properties they can. The `Action` enum is going to grow in the next few PRs such that reconfigurations beyond the initial configuration will run and messages can be dropped. These will correspond with an expansion of the `Node` implementation to allow recovering key shares from past committed configuration and the ability to handle `Commit` and `Cancel` API calls which ultimately are triggered from Nexus, as described in RFD 238.
1 parent 1fc519e commit 0bebe3a

File tree

5 files changed

+797
-26
lines changed

5 files changed

+797
-26
lines changed

trust-quorum/src/coordinator_state.rs

Lines changed: 71 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::messages::{PeerMsg, PrepareMsg};
99
use crate::validators::{ReconfigurationError, ValidatedReconfigureMsg};
1010
use crate::{Configuration, Envelope, Epoch, PlatformId};
1111
use gfss::shamir::Share;
12+
use slog::{Logger, o, warn};
1213
use std::collections::{BTreeMap, BTreeSet};
1314
use std::time::Instant;
1415

@@ -24,9 +25,11 @@ use std::time::Instant;
2425
/// allows progress to always be made with a full linearization of epochs.
2526
///
2627
/// We allow some unused fields before we complete the coordination code
27-
#[allow(unused)]
2828
pub struct CoordinatorState {
29+
log: Logger,
30+
2931
/// When the reconfiguration started
32+
#[expect(unused)]
3033
start_time: Instant,
3134

3235
/// A copy of the message used to start this reconfiguration
@@ -49,6 +52,7 @@ impl CoordinatorState {
4952
/// Return the newly constructed `CoordinatorState` along with this node's
5053
/// `PrepareMsg` so that it can be persisted.
5154
pub fn new_uninitialized(
55+
log: Logger,
5256
now: Instant,
5357
msg: ValidatedReconfigureMsg,
5458
) -> Result<(CoordinatorState, PrepareMsg), ReconfigurationError> {
@@ -76,7 +80,7 @@ impl CoordinatorState {
7680
prepare_acks: BTreeSet::new(),
7781
};
7882

79-
let state = CoordinatorState::new(now, msg, config, op);
83+
let state = CoordinatorState::new(log, now, msg, config, op);
8084

8185
// Safety: Construction of a `ValidatedReconfigureMsg` ensures that
8286
// `my_platform_id` is part of the new configuration and has a share.
@@ -86,6 +90,7 @@ impl CoordinatorState {
8690

8791
/// A reconfiguration from one group to another
8892
pub fn new_reconfiguration(
93+
log: Logger,
8994
now: Instant,
9095
msg: ValidatedReconfigureMsg,
9196
last_committed_config: &Configuration,
@@ -101,18 +106,24 @@ impl CoordinatorState {
101106
new_shares,
102107
};
103108

104-
Ok(CoordinatorState::new(now, msg, config, op))
109+
Ok(CoordinatorState::new(log, now, msg, config, op))
105110
}
106111

107-
// Intentionallly private!
112+
// Intentionally private!
113+
//
114+
// The public constructors `new_uninitialized` and `new_reconfiguration` are
115+
// more specific, and perform validation of arguments.
108116
fn new(
117+
log: Logger,
109118
now: Instant,
110119
reconfigure_msg: ValidatedReconfigureMsg,
111120
configuration: Configuration,
112121
op: CoordinatorOperation,
113122
) -> CoordinatorState {
114-
let retry_deadline = now + reconfigure_msg.retry_timeout();
123+
// We want to send any pending messages immediately
124+
let retry_deadline = now;
115125
CoordinatorState {
126+
log: log.new(o!("component" => "tq-coordinator-state")),
116127
start_time: now,
117128
reconfigure_msg,
118129
configuration,
@@ -135,8 +146,12 @@ impl CoordinatorState {
135146
// will return a copy of it.
136147
//
137148
// This method is "in progress" - allow unused parameters for now
138-
#[allow(unused)]
149+
#[expect(unused)]
139150
pub fn send_msgs(&mut self, now: Instant, outbox: &mut Vec<Envelope>) {
151+
if now < self.retry_deadline {
152+
return;
153+
}
154+
self.retry_deadline = now + self.reconfigure_msg.retry_timeout();
140155
match &self.op {
141156
CoordinatorOperation::CollectShares {
142157
epoch,
@@ -156,20 +171,55 @@ impl CoordinatorState {
156171
}
157172
}
158173
}
174+
175+
/// Record a `PrepareAck` from another node as part of tracking
176+
/// quorum for the prepare phase of the trust quorum protocol.
177+
pub fn ack_prepare(&mut self, from: PlatformId) {
178+
match &mut self.op {
179+
CoordinatorOperation::Prepare {
180+
prepares, prepare_acks, ..
181+
} => {
182+
if !self.configuration.members.contains_key(&from) {
183+
warn!(
184+
self.log,
185+
"PrepareAck from node that is not a cluster member";
186+
"epoch" => %self.configuration.epoch,
187+
"from" => %from
188+
);
189+
return;
190+
}
191+
192+
// Remove the responder so we don't ask it again
193+
prepares.remove(&from);
194+
195+
// Save the ack for quorum purposes
196+
prepare_acks.insert(from);
197+
}
198+
op => {
199+
warn!(
200+
self.log,
201+
"Ack received when coordinator is not preparing";
202+
"op" => op.name(),
203+
"from" => %from
204+
);
205+
}
206+
}
207+
}
159208
}
160209

161210
/// What should the coordinator be doing?
162-
///
163-
/// We haven't started implementing upgrade from LRTQ yet
164-
#[allow(unused)]
165211
pub enum CoordinatorOperation {
212+
// We haven't started implementing this yet
213+
#[expect(unused)]
166214
CollectShares {
167215
epoch: Epoch,
168216
members: BTreeMap<PlatformId, Sha3_256Digest>,
169217
collected_shares: BTreeMap<PlatformId, Share>,
170218
new_shares: BTreeMap<PlatformId, Share>,
171219
},
220+
// We haven't started implementing this yet
172221
// Epoch is always 0
222+
#[allow(unused)]
173223
CollectLrtqShares {
174224
members: BTreeMap<PlatformId, ShareDigestLrtq>,
175225
shares: BTreeMap<PlatformId, LrtqShare>,
@@ -182,3 +232,15 @@ pub enum CoordinatorOperation {
182232
prepare_acks: BTreeSet<PlatformId>,
183233
},
184234
}
235+
236+
impl CoordinatorOperation {
237+
pub fn name(&self) -> &'static str {
238+
match self {
239+
CoordinatorOperation::CollectShares { .. } => "collect shares",
240+
CoordinatorOperation::CollectLrtqShares { .. } => {
241+
"collect lrtq shares"
242+
}
243+
CoordinatorOperation::Prepare { .. } => "prepare",
244+
}
245+
}
246+
}

trust-quorum/src/lib.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub use persistent_state::{PersistentState, PersistentStateSummary};
3838
Deserialize,
3939
Display,
4040
)]
41-
pub struct Epoch(u64);
41+
pub struct Epoch(pub u64);
4242

4343
/// The number of shares required to reconstruct the rack secret
4444
///
@@ -72,6 +72,12 @@ pub struct PlatformId {
7272
serial_number: String,
7373
}
7474

75+
impl std::fmt::Display for PlatformId {
76+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77+
write!(f, "{}:{}", self.part_number, self.serial_number)
78+
}
79+
}
80+
7581
impl PlatformId {
7682
pub fn new(part_number: String, serial_number: String) -> PlatformId {
7783
PlatformId { part_number, serial_number }
@@ -89,7 +95,7 @@ impl PlatformId {
8995
/// A container to make messages between trust quorum nodes routable
9096
#[derive(Debug, Serialize, Deserialize)]
9197
pub struct Envelope {
92-
to: PlatformId,
93-
from: PlatformId,
94-
msg: PeerMsg,
98+
pub to: PlatformId,
99+
pub from: PlatformId,
100+
pub msg: PeerMsg,
95101
}

trust-quorum/src/node.rs

Lines changed: 73 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
77
use crate::validators::{ReconfigurationError, ValidatedReconfigureMsg};
88
use crate::{
9-
CoordinatorState, Envelope, PersistentState, PlatformId, messages::*,
9+
CoordinatorState, Envelope, Epoch, PersistentState, PlatformId, messages::*,
1010
};
1111

12-
use slog::{Logger, o};
12+
use slog::{Logger, o, warn};
1313
use std::time::Instant;
1414

1515
/// An entity capable of participating in trust quorum
@@ -76,15 +76,72 @@ impl Node {
7676
Ok(persistent_state)
7777
}
7878

79+
/// Process a timer tick
80+
///
81+
/// Ticks are issued by the caller in order to move the protocol forward.
82+
/// The current time is passed in to make the calls deterministic.
83+
pub fn tick(&mut self, now: Instant, outbox: &mut Vec<Envelope>) {
84+
self.send_coordinator_msgs(now, outbox);
85+
}
86+
87+
/// Handle a message from another node
88+
pub fn handle(
89+
&mut self,
90+
_now: Instant,
91+
_outbox: &mut Vec<Envelope>,
92+
from: PlatformId,
93+
msg: PeerMsg,
94+
) -> Option<PersistentState> {
95+
match msg {
96+
PeerMsg::PrepareAck(epoch) => {
97+
self.handle_prepare_ack(from, epoch);
98+
None
99+
}
100+
_ => todo!(
101+
"cannot handle message variant yet - not implemented: {msg:?}"
102+
),
103+
}
104+
}
105+
106+
/// Return the current state of the coordinator
107+
pub fn get_coordinator_state(&self) -> Option<&CoordinatorState> {
108+
self.coordinator_state.as_ref()
109+
}
110+
111+
fn handle_prepare_ack(&mut self, from: PlatformId, epoch: Epoch) {
112+
// Are we coordinating for this epoch?
113+
if let Some(cs) = &mut self.coordinator_state {
114+
let current_epoch = cs.reconfigure_msg().epoch();
115+
if current_epoch == epoch {
116+
// Store the ack in the coordinator state
117+
cs.ack_prepare(from);
118+
} else {
119+
// Log and drop message
120+
warn!(self.log, "Received prepare ack for wrong epoch";
121+
"from" => %from,
122+
"current_epoch" => %current_epoch,
123+
"acked_epoch" => %epoch
124+
);
125+
}
126+
} else {
127+
warn!(
128+
self.log,
129+
"Received prepare ack when not coordinating";
130+
"from" => %from,
131+
"acked_epoch" => %epoch
132+
);
133+
}
134+
}
135+
79136
// Send any required messages as a reconfiguration coordinator
80137
fn send_coordinator_msgs(
81138
&mut self,
82139
now: Instant,
83140
outbox: &mut Vec<Envelope>,
84141
) {
85-
// This function is going to be called unconditionally in `tick`
86-
// callbacks. In this case we may not actually be a coordinator. We just
87-
// ignore the call in that case.
142+
// This function is called unconditionally in `tick` callbacks. In this
143+
// case we may not actually be a coordinator. We ignore the call in
144+
// that case.
88145
if let Some(c) = self.coordinator_state.as_mut() {
89146
c.send_msgs(now, outbox);
90147
}
@@ -104,7 +161,11 @@ impl Node {
104161
// We have no committed configuration or lrtq ledger
105162
if self.persistent_state.is_uninitialized() {
106163
let (coordinator_state, my_prepare_msg) =
107-
CoordinatorState::new_uninitialized(now, msg)?;
164+
CoordinatorState::new_uninitialized(
165+
self.log.clone(),
166+
now,
167+
msg,
168+
)?;
108169
self.coordinator_state = Some(coordinator_state);
109170
// Add the prepare to our `PersistentState`
110171
self.persistent_state
@@ -118,8 +179,12 @@ impl Node {
118179
let config =
119180
self.persistent_state.last_committed_configuration().unwrap();
120181

121-
self.coordinator_state =
122-
Some(CoordinatorState::new_reconfiguration(now, msg, &config)?);
182+
self.coordinator_state = Some(CoordinatorState::new_reconfiguration(
183+
self.log.clone(),
184+
now,
185+
msg,
186+
&config,
187+
)?);
123188

124189
Ok(None)
125190
}
@@ -191,7 +256,6 @@ mod tests {
191256

192257
// A PersistentState should always be returned
193258
// It should include the `PrepareMsg` for this node.
194-
assert_eq!(persistent_state.generation, 0);
195259
assert!(persistent_state.lrtq.is_none());
196260
assert!(persistent_state.commits.is_empty());
197261
assert!(persistent_state.decommissioned.is_none());

trust-quorum/src/persistent_state.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@ use std::collections::BTreeMap;
1818
/// All the persistent state for this protocol
1919
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
2020
pub struct PersistentState {
21-
// Ledger generation
22-
pub generation: u64,
23-
2421
// If this node was an LRTQ node, sled-agent will start it with the ledger
2522
// data it read from disk. This allows us to upgrade from LRTQ.
2623
pub lrtq: Option<LrtqShareData>,
@@ -38,7 +35,6 @@ pub struct PersistentState {
3835
impl PersistentState {
3936
pub fn empty() -> PersistentState {
4037
PersistentState {
41-
generation: 0,
4238
lrtq: None,
4339
prepares: BTreeMap::new(),
4440
commits: BTreeMap::new(),

0 commit comments

Comments
 (0)