Skip to content

Commit a22fcc0

Browse files
authored
feat: StatefulSet reconciliation improvements and status monitoring (#43) (#51)
* feat: Phase 1 - StatefulSet diff detection and validation (#43) Implement intelligent StatefulSet update detection and immutable field validation to improve reconciliation efficiency and safety. Changes: - Add statefulset_needs_update() method for semantic diff detection - Add validate_statefulset_update() method for immutable field checks - Refactor reconciliation loop to check/validate before updating - Add new error types: InternalError, ImmutableFieldModified, SerdeJson - Extend error policy with 60s requeue for immutable field errors - Add 9 comprehensive unit tests (35 tests total, all passing) - Update CHANGELOG.md with detailed changes Benefits: - Reduces unnecessary API calls and reconciliation overhead - Prevents invalid updates that would cause API rejections - Provides clear error messages for users - Foundation for rollout monitoring in Phase 2 Related: #43 * feat: Phase 2 - Status types and StatefulSet helpers (#43) Extend status structures and add StatefulSet status helper methods for rollout monitoring support. Changes: - Add Condition struct for Kubernetes standard conditions (Ready, Progressing, Degraded) - Extend Status struct with observed_generation and conditions fields - Extend Pool status with replica tracking and revision fields - Add new PoolState variants: Updating, RolloutComplete, RolloutFailed, Degraded - Add context methods: - get_statefulset_status() - Fetch StatefulSet status - is_rollout_complete() - Check if rollout is complete - get_statefulset_revisions() - Get current and update revisions Benefits: - Foundation for comprehensive rollout monitoring - Kubernetes-standard status conditions - Per-pool rollout status tracking - All existing tests continue to pass Related: #43 * fix: Enable RUST_LOG environment variable support (#43) The tracing subscriber was not respecting the RUST_LOG environment variable, making it impossible to see debug logs from the operator. Changes: - Enable 'env-filter' feature for tracing-subscriber in Cargo.toml - Add .with_env_filter() to tracing subscriber initialization - Allows operators to see debug logs with: RUST_LOG=operator=debug This fixes the "no logs" issue reported when running the operator. Testing: - Verified operator runs successfully with RUST_LOG=info - Verified debug logs appear with RUST_LOG=operator=debug - Confirmed diff detection and validation logs are visible Related: #43 * fix: Block pool name changes to prevent orphaned StatefulSets (#43) Changing a pool name creates a new StatefulSet but leaves the old one orphaned. This is invalid because pool names are part of the StatefulSet selector (immutable field). Changes: - Add validation before StatefulSet reconciliation loop - List all StatefulSets owned by the Tenant - Check for StatefulSets whose pool names don't match current spec - Return ImmutableFieldModified error with clear guidance Error message guides users to delete and recreate Tenant if rename needed. Testing: - All 35 tests passing - Operator detects orphaned StatefulSets and returns error - 60-second requeue for user to fix Related: #43 * feat: Phase 3 - Status updates and rollout monitoring (#43) Implement comprehensive status updates and rollout monitoring to track StatefulSet reconciliation progress in real-time. Changes: - Add build_pool_status() helper method to Tenant to extract status from StatefulSets - Collect pool statuses during StatefulSet reconciliation loop - Aggregate pool statuses into overall Tenant conditions (Ready, Progressing, Degraded) - Update Tenant status with replica counts, pool states, and conditions - Requeue faster (10s) when pools are updating for responsive monitoring - Refactor Context.update_status() to accept full Status struct - Add chrono dependency for timestamp generation Status Updates: - Pool status includes: replicas, ready_replicas, current_replicas, updated_replicas, current_revision, update_revision, last_update_time, and state - Pool states: NotCreated, Initialized, Updating, RolloutComplete, RolloutFailed, Degraded - Tenant conditions: Ready (True/False), Progressing (True during rollout), Degraded (True when degraded) - Tenant overall state: Ready, NotReady, Updating, Degraded Rollout Monitoring: - Tracks individual pool rollout status based on StatefulSet replicas and revisions - Sets Progressing condition during rollouts - Sets Ready condition when all replicas are ready and updated - Sets Degraded condition when pools are unhealthy - Requeues every 10 seconds during active rollouts for responsive updates This completes Phase 3 of issue #43. Status information is now properly populated and visible in kubectl/k9s. * fix: Correct status update to send complete Tenant object (#43) The replace_status API requires a complete Kubernetes object with apiVersion, kind, metadata, spec, and status - not just the status field. Changes: - Clone the Tenant resource and set its status field - Serialize the complete Tenant object - Fixes 'Object Kind is missing' error during status updates This fixes the status update errors seen in the operator logs. * fix: Remove redundant Health column from kubectl output The Health column was referencing a non-existent .status.healthStatus field. Removed it since STATE column already shows the current state. * fix: Use patch_status with JSON merge patch for status updates (#43) Changed from replace_status with raw bytes to patch_status with JSON merge patch, which is the proper kube-rs API for updating status subresources. This fixes the 'Object Kind is missing' error. * fix: Resolve clippy lint warnings - Use chained if-let conditions to collapse nested if statements - Use as_deref() instead of as_ref().map(|s| s.as_str()) - Allow unwrap/expect in test modules (acceptable in tests) All clippy checks now pass with -D warnings. * fix: Resolve clippy lint warnings Replace unwrap() and expect() calls with explicit pattern matching in RBAC test code to satisfy clippy lints without using allow annotations. All test behavior is preserved with explicit panic messages on None values.
1 parent d4a6d18 commit a22fcc0

13 files changed

Lines changed: 1293 additions & 74 deletions

File tree

CHANGELOG.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,48 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
#### **StatefulSet Reconciliation Improvements** (2025-12-03, Issue #43)
13+
14+
Implemented intelligent StatefulSet update detection and validation to improve reconciliation efficiency and safety:
15+
16+
- **Diff Detection**: Added `statefulset_needs_update()` method to detect actual changes
17+
- Compares existing vs desired StatefulSet specs semantically
18+
- Avoids unnecessary API calls when no changes are needed
19+
- Checks: replicas, image, env vars, resources, scheduling, pod management policy, etc.
20+
21+
- **Immutable Field Validation**: Added `validate_statefulset_update()` method
22+
- Prevents modifications to immutable StatefulSet fields (selector, volumeClaimTemplates, serviceName)
23+
- Provides clear error messages for invalid updates (e.g., changing volumesPerServer)
24+
- Protects against API rejections during reconciliation
25+
26+
- **Enhanced Reconciliation Logic**: Refactored StatefulSet reconciliation loop
27+
- Checks if StatefulSet exists before attempting update
28+
- Validates update safety before applying changes
29+
- Only applies updates when actual changes are detected
30+
- Records Kubernetes events for update lifecycle (Created, UpdateStarted, UpdateValidationFailed)
31+
32+
- **Error Handling**: Extended error policy
33+
- Added 60-second requeue for immutable field modification errors (user-fixable)
34+
- Consistent error handling across credential and validation failures
35+
36+
- **New Error Types**: Added to `types::error::Error`
37+
- `InternalError` - For unexpected internal conditions
38+
- `ImmutableFieldModified` - For attempted modifications to immutable fields
39+
- `SerdeJson` - For JSON serialization errors during comparisons
40+
41+
- **Comprehensive Test Coverage**: Added 9 new unit tests (35 tests total)
42+
- Tests for diff detection (no changes, image, replicas, env vars, resources)
43+
- Tests for validation (selector, serviceName, volumesPerServer changes rejected)
44+
- Test for safe updates (image changes allowed)
45+
46+
**Benefits**:
47+
- Reduces unnecessary API calls and reconciliation overhead
48+
- Prevents reconciliation failures from invalid updates
49+
- Provides better error messages for users
50+
- Foundation for rollout monitoring (Phase 2)
51+
1052
### Changed
1153

1254
#### **Code Refactoring**: Credential Validation Simplification (2025-11-15)

Cargo.lock

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@ homepage = "https://rustfs.com"
88

99

1010
[dependencies]
11+
chrono = "0.4"
1112
const-str = "0.7.0"
1213
serde = { version = "1.0.228", features = ["derive"] }
1314
tokio = { version = "1.48.0", features = ["rt", "rt-multi-thread", "macros", "fs", "io-std", "io-util"] }
1415
futures = "0.3.31"
1516
tracing = "0.1.41"
16-
tracing-subscriber = { version = "0.3.20" }
17+
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
1718
serde_json = "1.0.145"
1819
serde_yaml_ng = "0.10.0"
1920
strum = { version = "0.27.2", features = ["derive"] }

deploy/rustfs-operator/crds/tenant.yaml

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@ spec:
1717
- jsonPath: .status.currentState
1818
name: State
1919
type: string
20-
- jsonPath: .status.healthStatus
21-
name: Health
22-
type: string
2320
- jsonPath: .metadata.creationTimestamp
2421
name: Age
2522
type: date
@@ -1157,15 +1154,87 @@ spec:
11571154
availableReplicas:
11581155
format: int32
11591156
type: integer
1157+
conditions:
1158+
description: Kubernetes standard conditions
1159+
items:
1160+
description: Kubernetes standard condition for Tenant resources
1161+
properties:
1162+
lastTransitionTime:
1163+
description: Last time the condition transitioned from one status to another
1164+
nullable: true
1165+
type: string
1166+
message:
1167+
description: Human-readable message indicating details about the transition
1168+
type: string
1169+
observedGeneration:
1170+
description: The generation of the Tenant resource that this condition reflects
1171+
format: int64
1172+
nullable: true
1173+
type: integer
1174+
reason:
1175+
description: One-word CamelCase reason for the condition's last transition
1176+
type: string
1177+
status:
1178+
description: Status of the condition (True, False, Unknown)
1179+
type: string
1180+
type:
1181+
description: Type of condition (Ready, Progressing, Degraded)
1182+
type: string
1183+
required:
1184+
- message
1185+
- reason
1186+
- status
1187+
- type
1188+
type: object
1189+
type: array
11601190
currentState:
11611191
type: string
1192+
observedGeneration:
1193+
description: The generation observed by the operator
1194+
format: int64
1195+
nullable: true
1196+
type: integer
11621197
pools:
11631198
items:
11641199
properties:
1200+
currentReplicas:
1201+
description: Number of pods with current revision
1202+
format: int32
1203+
nullable: true
1204+
type: integer
1205+
currentRevision:
1206+
description: Current revision hash of the StatefulSet
1207+
nullable: true
1208+
type: string
1209+
lastUpdateTime:
1210+
description: Last time the pool status was updated
1211+
nullable: true
1212+
type: string
1213+
readyReplicas:
1214+
description: Number of pods with Ready condition
1215+
format: int32
1216+
nullable: true
1217+
type: integer
1218+
replicas:
1219+
description: Total number of non-terminated pods targeted by this pool's StatefulSet
1220+
format: int32
1221+
nullable: true
1222+
type: integer
11651223
ssName:
1224+
description: Name of the StatefulSet for this pool
11661225
type: string
11671226
state:
1227+
description: Current state of the pool
1228+
type: string
1229+
updateRevision:
1230+
description: Update revision hash of the StatefulSet (different from current during rollout)
1231+
nullable: true
11681232
type: string
1233+
updatedReplicas:
1234+
description: Number of pods with updated revision
1235+
format: int32
1236+
nullable: true
1237+
type: integer
11691238
required:
11701239
- ssName
11711240
- state

src/context.rs

Lines changed: 105 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -106,38 +106,41 @@ impl Context {
106106
.await
107107
}
108108

109-
pub async fn update_status<S>(
109+
pub async fn update_status(
110110
&self,
111111
resource: &Tenant,
112-
current_status: S,
113-
replica: i32,
114-
) -> Result<Tenant, Error>
115-
where
116-
S: ToString,
117-
{
112+
status: crate::types::v1alpha1::status::Status,
113+
) -> Result<Tenant, Error> {
114+
use kube::api::{Patch, PatchParams};
115+
118116
let api: Api<Tenant> = Api::namespaced(self.client.clone(), &resource.namespace()?);
119-
let name = &resource.name();
117+
let name = resource.name();
120118

121-
let update_func = async |tenant: &Tenant| {
122-
let mut status = tenant.status.clone().unwrap_or_default();
123-
status.available_replicas = replica;
124-
status.current_state = current_status.to_string();
125-
let status_body = serde_json::to_vec(&status)?;
119+
// Create a JSON merge patch for the status
120+
let status_patch = serde_json::json!({
121+
"status": status
122+
});
126123

127-
api.replace_status(name, &PostParams::default(), status_body)
128-
.context(KubeSnafu)
129-
.await
130-
};
131-
132-
match update_func(resource).await {
124+
// Try to patch the status
125+
match api
126+
.patch_status(
127+
&name,
128+
&PatchParams::default(),
129+
&Patch::Merge(status_patch.clone()),
130+
)
131+
.context(KubeSnafu)
132+
.await
133+
{
133134
Ok(t) => return Ok(t),
134135
_ => {}
135136
}
136137

137138
info!("status update failed due to conflict, retrieve the latest resource and retry.");
138139

139-
let new_one = api.get(name).context(KubeSnafu).await?;
140-
update_func(&new_one).await
140+
// Retry with the same patch
141+
api.patch_status(&name, &PatchParams::default(), &Patch::Merge(status_patch))
142+
.context(KubeSnafu)
143+
.await
141144
}
142145

143146
pub async fn delete<T>(&self, name: &str, namespace: &str) -> Result<(), Error>
@@ -287,4 +290,85 @@ impl Context {
287290

288291
Ok(())
289292
}
293+
294+
/// Gets the status of a StatefulSet including rollout progress
295+
///
296+
/// # Returns
297+
/// The StatefulSet status with replica counts and revision information
298+
pub async fn get_statefulset_status(
299+
&self,
300+
name: &str,
301+
namespace: &str,
302+
) -> Result<k8s_openapi::api::apps::v1::StatefulSetStatus, Error> {
303+
let ss: k8s_openapi::api::apps::v1::StatefulSet = self.get(name, namespace).await?;
304+
305+
ss.status.ok_or_else(|| Error::Types {
306+
source: types::error::Error::InternalError {
307+
msg: format!("StatefulSet {} has no status", name),
308+
},
309+
})
310+
}
311+
312+
/// Checks if a StatefulSet rollout is complete
313+
///
314+
/// A rollout is considered complete when:
315+
/// - observedGeneration matches metadata.generation (controller has seen latest spec)
316+
/// - replicas == readyReplicas (all pods are ready)
317+
/// - currentRevision == updateRevision (all pods are on the new revision)
318+
/// - updatedReplicas == replicas (all pods have been updated)
319+
///
320+
/// # Returns
321+
/// - `Ok(true)` if rollout is complete
322+
/// - `Ok(false)` if rollout is still in progress
323+
/// - `Err` if there's an error fetching the StatefulSet
324+
pub async fn is_rollout_complete(&self, name: &str, namespace: &str) -> Result<bool, Error> {
325+
let ss: k8s_openapi::api::apps::v1::StatefulSet = self.get(name, namespace).await?;
326+
327+
let metadata = &ss.metadata;
328+
let spec = ss.spec.as_ref().ok_or_else(|| Error::Types {
329+
source: types::error::Error::InternalError {
330+
msg: format!("StatefulSet {} missing spec", name),
331+
},
332+
})?;
333+
334+
let status = ss.status.as_ref().ok_or_else(|| Error::Types {
335+
source: types::error::Error::InternalError {
336+
msg: format!("StatefulSet {} missing status", name),
337+
},
338+
})?;
339+
340+
let desired_replicas = spec.replicas.unwrap_or(1);
341+
342+
// Check if controller has observed the latest generation
343+
let generation_current = metadata.generation.is_some()
344+
&& status.observed_generation.is_some()
345+
&& metadata.generation == status.observed_generation;
346+
347+
// Check if all replicas are ready
348+
let replicas_ready = status.replicas == desired_replicas
349+
&& status.ready_replicas.unwrap_or(0) == desired_replicas
350+
&& status.updated_replicas.unwrap_or(0) == desired_replicas;
351+
352+
// Check if all pods are on the same revision
353+
let revisions_match = status.current_revision.is_some()
354+
&& status.update_revision.is_some()
355+
&& status.current_revision == status.update_revision;
356+
357+
Ok(generation_current && replicas_ready && revisions_match)
358+
}
359+
360+
/// Gets the current and update revision of a StatefulSet
361+
///
362+
/// # Returns
363+
/// A tuple of (current_revision, update_revision)
364+
/// Returns None for either value if not available
365+
pub async fn get_statefulset_revisions(
366+
&self,
367+
name: &str,
368+
namespace: &str,
369+
) -> Result<(Option<String>, Option<String>), Error> {
370+
let status = self.get_statefulset_status(name, namespace).await?;
371+
372+
Ok((status.current_revision, status.update_revision))
373+
}
290374
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub mod tests;
3838

3939
pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
4040
tracing_subscriber::fmt()
41+
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
4142
.with_level(true)
4243
.with_file(true)
4344
.with_line_number(true)

0 commit comments

Comments
 (0)