Skip to content

Commit bf49df6

Browse files
committed
feat: add request lifecycle hooks for client-side load balancing and observability
Adds RequestLifecycleHooks trait to enable intercepting and modifying S3 API requests at key points in the request lifecycle. This supports client-side load balancing, request telemetry, and debug logging.
1 parent a190fbc commit bf49df6

File tree

3 files changed

+705
-30
lines changed

3 files changed

+705
-30
lines changed

examples/debug_logging_hook.rs

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
2+
// Copyright 2024 MinIO, Inc.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
//! Example demonstrating how to use RequestLifecycleHooks for debug logging.
17+
//!
18+
//! This example shows:
19+
//! - Creating a custom debug logging hook
20+
//! - Attaching the hook to the MinIO client
21+
//! - Automatic logging of all S3 API requests with headers and response status
22+
//! - Using both `before_signing_mut` and `after_execute` hooks
23+
//!
24+
//! Run with default values (test-bucket / test-object.txt, verbose mode enabled):
25+
//! ```
26+
//! cargo run --example debug_logging_hook
27+
//! ```
28+
//!
29+
//! Run with custom bucket and object:
30+
//! ```
31+
//! cargo run --example debug_logging_hook -- mybucket myobject
32+
//! ```
33+
//!
34+
//! Disable verbose output:
35+
//! ```
36+
//! cargo run --example debug_logging_hook -- --no-verbose
37+
//! ```
38+
39+
use clap::{ArgAction, Parser};
40+
use futures_util::StreamExt;
41+
use minio::s3::builders::ObjectContent;
42+
use minio::s3::client::hooks::{Extensions, RequestLifecycleHooks};
43+
use minio::s3::client::{Method, Response};
44+
use minio::s3::creds::StaticProvider;
45+
use minio::s3::error::Error;
46+
use minio::s3::http::Url;
47+
use minio::s3::multimap_ext::Multimap;
48+
use minio::s3::response::BucketExistsResponse;
49+
use minio::s3::segmented_bytes::SegmentedBytes;
50+
use minio::s3::types::{S3Api, ToStream};
51+
use minio::s3::{MinioClient, MinioClientBuilder};
52+
use std::sync::Arc;
53+
54+
/// Debug logging hook that prints detailed information about each S3 request.
55+
#[derive(Debug)]
56+
struct DebugLoggingHook {
57+
/// Enable verbose output including all headers
58+
verbose: bool,
59+
}
60+
61+
impl DebugLoggingHook {
62+
fn new(verbose: bool) -> Self {
63+
Self { verbose }
64+
}
65+
}
66+
67+
#[async_trait::async_trait]
68+
impl RequestLifecycleHooks for DebugLoggingHook {
69+
fn name(&self) -> &'static str {
70+
"debug-logger"
71+
}
72+
73+
async fn before_signing_mut(
74+
&self,
75+
method: &Method,
76+
url: &mut Url,
77+
_region: &str,
78+
_headers: &mut Multimap,
79+
_query_params: &Multimap,
80+
bucket_name: Option<&str>,
81+
object_name: Option<&str>,
82+
_body: Option<&SegmentedBytes>,
83+
_extensions: &mut Extensions,
84+
) -> Result<(), Error> {
85+
if self.verbose {
86+
let bucket_obj = match (bucket_name, object_name) {
87+
(Some(b), Some(o)) => format!("{b}/{o}"),
88+
(Some(b), None) => b.to_string(),
89+
_ => url.to_string(),
90+
};
91+
println!("→ Preparing {method} request for {bucket_obj}");
92+
}
93+
Ok(())
94+
}
95+
96+
async fn after_execute(
97+
&self,
98+
method: &Method,
99+
url: &Url,
100+
_region: &str,
101+
headers: &Multimap,
102+
_query_params: &Multimap,
103+
bucket_name: Option<&str>,
104+
object_name: Option<&str>,
105+
resp: &Result<Response, reqwest::Error>,
106+
_extensions: &mut Extensions,
107+
) {
108+
// Format the basic request info
109+
let bucket_obj = match (bucket_name, object_name) {
110+
(Some(b), Some(o)) => format!("{b}/{o}"),
111+
(Some(b), None) => b.to_string(),
112+
_ => url.to_string(),
113+
};
114+
115+
// Format response status
116+
let status = match resp {
117+
Ok(response) => format!("✓ {}", response.status()),
118+
Err(err) => format!("✗ Error: {err}"),
119+
};
120+
121+
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
122+
println!("S3 Request: {method} {bucket_obj}");
123+
println!("URL: {url}");
124+
println!("Status: {status}");
125+
126+
if self.verbose {
127+
// Print headers alphabetically
128+
let mut header_strings: Vec<String> = headers
129+
.iter_all()
130+
.map(|(k, v)| format!("{}: {}", k, v.join(",")))
131+
.collect();
132+
header_strings.sort();
133+
134+
println!("\nRequest Headers:");
135+
for header in header_strings {
136+
println!(" {header}");
137+
}
138+
}
139+
140+
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
141+
}
142+
}
143+
144+
/// Example demonstrating debug logging with hooks
145+
#[derive(Parser)]
146+
struct Cli {
147+
/// Bucket to use for the example
148+
#[arg(default_value = "test-bucket")]
149+
bucket: String,
150+
/// Object to upload
151+
#[arg(default_value = "test-object.txt")]
152+
object: String,
153+
/// Disable verbose output (verbose is enabled by default, use --no-verbose to disable)
154+
#[arg(long = "no-verbose", action = ArgAction::SetFalse, default_value_t = true)]
155+
verbose: bool,
156+
}
157+
158+
#[tokio::main]
159+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
160+
env_logger::init();
161+
let args = Cli::parse();
162+
163+
println!("\n🔧 MinIO Debug Logging Hook Example\n");
164+
println!("This example demonstrates how hooks can be used for debugging S3 requests.");
165+
println!(
166+
"We'll perform a few operations on bucket '{}' with debug logging enabled.\n",
167+
args.bucket
168+
);
169+
170+
// Create the debug logging hook
171+
let debug_hook = Arc::new(DebugLoggingHook::new(args.verbose));
172+
173+
// Create MinIO client with the debug logging hook attached
174+
let static_provider = StaticProvider::new(
175+
"Q3AM3UQ867SPQQA43P2F",
176+
"zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
177+
None,
178+
);
179+
180+
let client: MinioClient = MinioClientBuilder::new("https://play.min.io".parse()?)
181+
.provider(Some(static_provider))
182+
.hook(debug_hook) // Attach the debug logging hook
183+
.build()?;
184+
185+
println!("✓ Created MinIO client with debug logging hook\n");
186+
187+
// Operation 1: Check if bucket exists
188+
println!("📋 Checking if bucket exists...");
189+
let resp: BucketExistsResponse = client.bucket_exists(&args.bucket).build().send().await?;
190+
191+
// Operation 2: Create bucket if it doesn't exist
192+
if !resp.exists() {
193+
println!("\n📋 Creating bucket...");
194+
client.create_bucket(&args.bucket).build().send().await?;
195+
} else {
196+
println!("\n✓ Bucket already exists");
197+
}
198+
199+
// Operation 3: Upload a small object
200+
println!("\n📋 Uploading object...");
201+
let content = b"Hello from MinIO Rust SDK with debug logging!";
202+
let object_content: ObjectContent = content.to_vec().into();
203+
client
204+
.put_object_content(&args.bucket, &args.object, object_content)
205+
.build()
206+
.send()
207+
.await?;
208+
209+
// Operation 4: List objects in the bucket
210+
println!("\n📋 Listing objects in bucket...");
211+
let mut list_stream = client
212+
.list_objects(&args.bucket)
213+
.recursive(false)
214+
.build()
215+
.to_stream()
216+
.await;
217+
218+
let mut total_objects = 0;
219+
while let Some(result) = list_stream.next().await {
220+
match result {
221+
Ok(resp) => {
222+
total_objects += resp.contents.len();
223+
}
224+
Err(e) => {
225+
eprintln!("Error listing objects: {e}");
226+
}
227+
}
228+
}
229+
println!("\n✓ Found {total_objects} objects in bucket");
230+
231+
println!("\n🎉 All operations completed successfully with debug logging enabled!\n");
232+
println!("💡 Tip: Run with --no-verbose to disable detailed output\n");
233+
234+
Ok(())
235+
}

src/s3/client.rs

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use dashmap::DashMap;
2020
use http::HeaderMap;
2121
pub use hyper::http::Method;
2222
use reqwest::Body;
23-
pub use reqwest::{Error as ReqwestError, Response};
23+
pub use reqwest::Response;
2424
use std::fmt::Debug;
2525
use std::fs::File;
2626
use std::io::prelude::*;
@@ -487,6 +487,7 @@ impl MinioClient {
487487
let date = utc_now();
488488
headers.add(X_AMZ_DATE, to_amz_date(date));
489489

490+
// Allow hooks to modify the request before signing (e.g., for client-side load balancing)
490491
let url_before_hook = url.to_string();
491492
self.run_before_signing_hooks(
492493
method,
@@ -496,16 +497,19 @@ impl MinioClient {
496497
query_params,
497498
bucket_name,
498499
object_name,
499-
body.clone(),
500+
&body,
500501
&mut extensions,
501502
)
502503
.await?;
503504

504-
let url_after_hook = url.to_string();
505-
if url_before_hook != url_after_hook {
506-
headers.add("x-minio-redirect", url_after_hook);
505+
// If a hook modified the URL (e.g., redirecting to a different MinIO node for load balancing),
506+
// add the x-minio-redirect header to inform the server about the client-side redirection.
507+
// This enables server-side telemetry, debugging, and load balancing metrics.
508+
// The header contains the actual endpoint where the request is being sent.
509+
if url.to_string() != url_before_hook {
510+
headers.add("x-minio-redirect", url.to_string());
507511
}
508-
512+
509513
if let Some(p) = &self.shared.provider {
510514
let creds = p.fetch();
511515
if creds.session_token.is_some() {
@@ -532,31 +536,9 @@ impl MinioClient {
532536
}
533537
}
534538

535-
if false {
536-
let mut header_strings: Vec<String> = headers
537-
.iter_all()
538-
.map(|(k, v)| format!("{}: {}", k, v.join(",")))
539-
.collect();
540-
541-
// Sort headers alphabetically by name
542-
header_strings.sort();
543-
544-
let debug_str = format!(
545-
"S3 request: {method} url={:?}; headers={:?}; body={body:?}",
546-
url.path,
547-
header_strings.join("; ")
548-
);
549-
let truncated = if debug_str.len() > 1000 {
550-
format!("{}...", &debug_str[..997])
551-
} else {
552-
debug_str
553-
};
554-
println!("{truncated}");
555-
}
556-
557539
if (*method == Method::PUT) || (*method == Method::POST) {
558540
//TODO: why-oh-why first collect into a vector and then iterate to a stream?
559-
let bytes_vec: Vec<Bytes> = match body.clone() {
541+
let bytes_vec: Vec<Bytes> = match body.as_ref() {
560542
Some(v) => v.iter().collect(),
561543
None => Vec::new(),
562544
};
@@ -698,7 +680,7 @@ impl MinioClient {
698680
query_params: &Multimap,
699681
bucket_name: Option<&str>,
700682
object_name: Option<&str>,
701-
body: Option<Arc<SegmentedBytes>>,
683+
body: &Option<Arc<SegmentedBytes>>,
702684
extensions: &mut http::Extensions,
703685
) -> Result<(), Error> {
704686
for hook in self.shared.client_hooks.iter() {

0 commit comments

Comments
 (0)