From 6bda5b0fe7565e71702b72f073a48c19d42c3ca1 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 10 Nov 2025 03:45:51 +0000 Subject: [PATCH 1/2] feat: add Axum webhook server for automated gem index filtering Add a new webhook server binary that: - Accepts POST requests to /webhook endpoint - Downloads RubyGems versions index from rubygems.org - Fetches allowlist from S3 - Filters index using existing gem-index-filter library - Computes SHA-256 checksums - Uploads filtered results to S3 with timestamps - Maintains latest pointers for easy access - Handles graceful shutdown with background task tracking Technical details: - Built with Axum 0.7 for async HTTP server - Uses AWS SDK for S3 operations - Integrates with existing streaming filter library - Returns 202 Accepted for async processing - Strips versions to minimize output size - All dependencies are optional (server feature flag) Configuration via environment variables: - BUCKET_NAME: S3 bucket (default: rubygems-filtered) - ALLOWLIST_KEY: S3 key for allowlist (default: allowlist.txt) Build with: cargo build --bin webhook-server --features server --- Cargo.toml | 18 +++ WEBHOOK_SERVER.md | 194 +++++++++++++++++++++++++++++++++ src/bin/webhook-server.rs | 223 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 435 insertions(+) create mode 100644 WEBHOOK_SERVER.md create mode 100644 src/bin/webhook-server.rs diff --git a/Cargo.toml b/Cargo.toml index c7137bc..b7a7c85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,26 @@ rustc-hash = "2.0" sha2 = "0.10" hex = "0.4" +# Optional dependencies for webhook server +axum = { version = "0.7", optional = true } +tokio = { version = "1", features = ["full"], optional = true } +serde = { version = "1.0", features = ["derive"], optional = true } +serde_json = { version = "1.0", optional = true } +aws-config = { version = "1.0", optional = true } +aws-sdk-s3 = { version = "1.0", optional = true } +reqwest = { version = "0.11", optional = true } +chrono = { version = "0.4", optional = true } + [[bin]] name = "gem-index-filter" path = "src/main.rs" +[[bin]] +name = "webhook-server" +path = "src/bin/webhook-server.rs" +required-features = ["server"] + +[features] +server = ["axum", "tokio", "serde", "serde_json", "aws-config", "aws-sdk-s3", "reqwest", "chrono"] + [dev-dependencies] diff --git a/WEBHOOK_SERVER.md b/WEBHOOK_SERVER.md new file mode 100644 index 0000000..f89a0df --- /dev/null +++ b/WEBHOOK_SERVER.md @@ -0,0 +1,194 @@ +# Webhook Server + +A high-performance webhook server that automatically filters the RubyGems versions index and uploads the filtered results to S3. + +## Features + +- **Webhook-triggered processing**: Accepts POST requests to `/webhook` endpoint +- **Async processing**: Returns immediately (202 Accepted) while processing in background +- **Streaming filtering**: Uses gem-index-filter library for memory-efficient processing +- **S3 integration**: Fetches allowlist from S3 and uploads filtered results +- **SHA-256 checksums**: Automatically computes and stores checksums +- **Latest pointers**: Updates `filtered-latest.bin` and `filtered-latest.sha256` for easy access +- **Graceful shutdown**: Waits for active tasks to complete on shutdown + +## Building + +Build the webhook server with the `server` feature: + +```bash +cargo build --bin webhook-server --features server --release +``` + +## Configuration + +Configure via environment variables: + +| Variable | Description | Default | +|----------|-------------|---------| +| `BUCKET_NAME` | S3 bucket for allowlist and output | `rubygems-filtered` | +| `ALLOWLIST_KEY` | S3 key for allowlist file | `allowlist.txt` | + +AWS credentials are loaded from the environment (via AWS SDK defaults). + +## Running + +```bash +export BUCKET_NAME=my-rubygems-bucket +export ALLOWLIST_KEY=allowlist.txt +export AWS_REGION=us-east-1 +export AWS_ACCESS_KEY_ID=... +export AWS_SECRET_ACCESS_KEY=... + +./target/release/webhook-server +``` + +The server listens on `0.0.0.0:8080`. + +## API + +### POST /webhook + +Triggers index filtering and S3 upload. + +**Request:** +```bash +curl -X POST http://localhost:8080/webhook +``` + +**Response:** +```json +{ + "status": "accepted" +} +``` + +HTTP 202 Accepted - Processing happens in background. + +## Processing Flow + +1. **Webhook received**: Returns 202 immediately, spawns background task +2. **Fetch allowlist**: Downloads allowlist from S3 (`BUCKET_NAME/ALLOWLIST_KEY`) +3. **Download index**: Fetches https://index.rubygems.org/versions +4. **Filter**: Uses gem-index-filter to filter with allowlist, strips versions +5. **Compute checksum**: Calculates SHA-256 of filtered output +6. **Upload timestamped files**: + - `versions/filtered-YYYYMMDD-HHMMSS.bin` + - `versions/filtered-YYYYMMDD-HHMMSS.sha256` +7. **Update latest pointers**: + - `versions/filtered-latest.bin` → latest timestamped version + - `versions/filtered-latest.sha256` → latest checksum + +## Allowlist Format + +The allowlist file in S3 should contain one gem name per line: + +``` +rails +sinatra +puma +rack +# Comments are supported +nokogiri +``` + +Empty lines and lines starting with `#` are ignored. + +## Example S3 Structure + +After processing, your S3 bucket will contain: + +``` +s3://my-rubygems-bucket/ + allowlist.txt + versions/ + filtered-20241110-143052.bin + filtered-20241110-143052.sha256 + filtered-20241110-154521.bin + filtered-20241110-154521.sha256 + filtered-latest.bin -> copy of most recent + filtered-latest.sha256 -> copy of most recent checksum +``` + +## Performance + +- **Input**: ~21MB RubyGems versions index +- **Allowlist**: 10,000 gems +- **Processing time**: ~100ms filtering + network I/O +- **Memory**: ~3MB for filtering +- **Output**: Typically <1MB (with version stripping) + +## Graceful Shutdown + +Press Ctrl+C to initiate graceful shutdown. The server will: + +1. Stop accepting new webhook requests +2. Wait for all active background tasks to complete +3. Exit cleanly + +## Integration Examples + +### With RubyGems Webhook + +Configure RubyGems.org to send webhook notifications to your server when gems are updated. + +### Scheduled Updates (cron) + +```bash +# Update every hour +0 * * * * curl -X POST https://your-server.com/webhook +``` + +### GitHub Actions + +```yaml +- name: Trigger gem index update + run: curl -X POST https://your-server.com/webhook +``` + +## Monitoring + +The server logs to stdout/stderr: + +``` +Listening on 0.0.0.0:8080 +Fetching allowlist from S3: rubygems-filtered/allowlist.txt +Loaded 10000 gems in allowlist +Fetching RubyGems index from https://index.rubygems.org/versions +Downloaded 21428463 bytes, filtering... +Filtered to 823451 bytes, SHA-256: abc123... +Uploaded: versions/filtered-20241110-143052.bin and versions/filtered-20241110-143052.sha256 (also updated latest pointers) +``` + +## Error Handling + +- **S3 errors**: Logged to stderr, task fails but server continues +- **Network errors**: Logged to stderr, task fails but server continues +- **Filter errors**: Logged to stderr, task fails but server continues + +All errors are non-fatal to the server process - only the individual webhook task fails. + +## Security Considerations + +- **No authentication**: Add a reverse proxy (nginx, API Gateway) for authentication +- **Rate limiting**: Not implemented - add via reverse proxy if needed +- **IAM permissions**: Server needs S3 read/write access to configured bucket + +Required IAM permissions: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:GetObject", + "s3:PutObject", + "s3:CopyObject" + ], + "Resource": "arn:aws:s3:::my-rubygems-bucket/*" + } + ] +} +``` diff --git a/src/bin/webhook-server.rs b/src/bin/webhook-server.rs new file mode 100644 index 0000000..d0bf5b5 --- /dev/null +++ b/src/bin/webhook-server.rs @@ -0,0 +1,223 @@ +use axum::{http::StatusCode, response::IntoResponse, routing::post, Router}; +use aws_sdk_s3::Client as S3Client; +use gem_index_filter::{filter_versions_streaming, DigestAlgorithm, FilterMode, VersionOutput}; +use serde::Serialize; +use std::collections::HashSet; +use std::io::Cursor; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::task::JoinSet; + +#[derive(Clone)] +struct AppState { + s3_client: S3Client, + active_tasks: Arc>>, + bucket_name: String, + allowlist_key: String, +} + +#[derive(Serialize)] +struct AcceptedResponse { + status: String, +} + +#[tokio::main] +async fn main() { + let config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .load() + .await; + let s3_client = S3Client::new(&config); + + let state = AppState { + s3_client, + active_tasks: Arc::new(Mutex::new(JoinSet::new())), + bucket_name: std::env::var("BUCKET_NAME").unwrap_or("rubygems-filtered".to_string()), + allowlist_key: std::env::var("ALLOWLIST_KEY") + .unwrap_or("allowlist.txt".to_string()), + }; + + let app = Router::new() + .route("/webhook", post(handle_webhook)) + .with_state(state.clone()); + + let listener = tokio::net::TcpListener::bind("0.0.0.0:8080") + .await + .unwrap(); + + println!("Listening on 0.0.0.0:8080"); + + axum::serve(listener, app) + .with_graceful_shutdown(shutdown_signal(state.active_tasks)) + .await + .unwrap(); +} + +async fn handle_webhook( + axum::extract::State(state): axum::extract::State, +) -> impl IntoResponse { + let s3_client = state.s3_client.clone(); + let bucket_name = state.bucket_name.clone(); + let allowlist_key = state.allowlist_key.clone(); + + state.active_tasks.lock().await.spawn(async move { + if let Err(e) = process_index(s3_client, bucket_name, allowlist_key).await { + eprintln!("Error processing index: {}", e); + } + }); + + ( + StatusCode::ACCEPTED, + axum::Json(AcceptedResponse { + status: "accepted".to_string(), + }), + ) +} + +async fn process_index( + s3_client: S3Client, + bucket_name: String, + allowlist_key: String, +) -> Result<(), Box> { + println!("Fetching allowlist from S3: {}/{}", bucket_name, allowlist_key); + + // Fetch allowlist from S3 + let allowlist = fetch_allowlist(&s3_client, &bucket_name, &allowlist_key).await?; + println!("Loaded {} gems in allowlist", allowlist.len()); + + println!("Fetching RubyGems index from https://index.rubygems.org/versions"); + + // Fetch the RubyGems index + let response = reqwest::get("https://index.rubygems.org/versions") + .await? + .bytes() + .await?; + + println!("Downloaded {} bytes, filtering...", response.len()); + + // Filter the gem index using the existing library + let (filtered_data, checksum) = filter_gem_index(&response, &allowlist)?; + + println!( + "Filtered to {} bytes, SHA-256: {}", + filtered_data.len(), + checksum + ); + + // Upload filtered data with timestamp + let timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S"); + let data_key = format!("versions/filtered-{}.bin", timestamp); + + s3_client + .put_object() + .bucket(&bucket_name) + .key(&data_key) + .body(filtered_data.into()) + .content_type("application/octet-stream") + .send() + .await?; + + // Upload checksum as metadata file + let checksum_key = format!("versions/filtered-{}.sha256", timestamp); + s3_client + .put_object() + .bucket(&bucket_name) + .key(&checksum_key) + .body(checksum.into_bytes().into()) + .content_type("text/plain") + .send() + .await?; + + // Update "latest" pointers + let latest_data_key = "versions/filtered-latest.bin"; + let latest_checksum_key = "versions/filtered-latest.sha256"; + + // Copy the timestamped versions to the latest pointers + s3_client + .copy_object() + .bucket(&bucket_name) + .copy_source(format!("{}/{}", bucket_name, data_key)) + .key(latest_data_key) + .send() + .await?; + + s3_client + .copy_object() + .bucket(&bucket_name) + .copy_source(format!("{}/{}", bucket_name, checksum_key)) + .key(latest_checksum_key) + .send() + .await?; + + println!( + "Uploaded: {} and {} (also updated latest pointers)", + data_key, checksum_key + ); + Ok(()) +} + +/// Fetch and parse allowlist from S3 +async fn fetch_allowlist( + s3_client: &S3Client, + bucket_name: &str, + key: &str, +) -> Result, Box> { + let response = s3_client + .get_object() + .bucket(bucket_name) + .key(key) + .send() + .await?; + + let bytes = response.body.collect().await?.into_bytes(); + let content = String::from_utf8(bytes.to_vec())?; + + let mut allowlist = HashSet::new(); + for line in content.lines() { + let gem_name = line.trim(); + // Skip empty lines and comments + if !gem_name.is_empty() && !gem_name.starts_with('#') { + allowlist.insert(gem_name.to_string()); + } + } + + Ok(allowlist) +} + +/// Filter gem index using the existing gem-index-filter library +fn filter_gem_index( + data: &[u8], + allowlist: &HashSet, +) -> Result<(Vec, String), Box> { + // Convert HashSet to HashSet<&str> for FilterMode + let allowlist_refs: HashSet<&str> = allowlist.iter().map(|s| s.as_str()).collect(); + + // Create input reader from bytes + let input = Cursor::new(data); + + // Create output buffer + let mut output = Vec::new(); + + // Stream and filter with SHA-256 checksum computation + let checksum = filter_versions_streaming( + input, + &mut output, + FilterMode::Allow(&allowlist_refs), + VersionOutput::Strip, // Strip versions to reduce output size + Some(DigestAlgorithm::Sha256), + )?; + + Ok((output, checksum.unwrap_or_default())) +} + +async fn shutdown_signal(active_tasks: Arc>>) { + tokio::signal::ctrl_c() + .await + .expect("Failed to listen for ctrl-c"); + + println!("Shutdown signal received, waiting for active tasks..."); + + let mut tasks = active_tasks.lock().await; + while tasks.join_next().await.is_some() {} + + println!("All tasks completed"); +} From 08f5688e0dedfba66c5f4d02327767f02209e1f2 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 10 Nov 2025 03:55:31 +0000 Subject: [PATCH 2/2] refactor: use S3 checksum_sha256 field instead of separate files Replace separate .sha256 checksum files with S3's built-in checksum support. This simplifies the S3 structure and provides better integration with S3's native checksum verification features. Changes: - Store SHA-256 checksums using S3's checksum_sha256 field - Remove separate .sha256 file uploads - Remove latest checksum pointer (no longer needed) - Add base64 dependency for checksum encoding - Update documentation with examples of checksum retrieval Benefits: - Cleaner S3 bucket structure (fewer files) - Native S3 checksum verification on download - Checksums preserved during S3 copy operations - Reduced API calls (1 PUT instead of 2) The checksum is still computed during filtering and logged, but now stored as S3 object metadata rather than a separate file. --- Cargo.toml | 3 ++- WEBHOOK_SERVER.md | 47 ++++++++++++++++++++++++++++----------- src/bin/webhook-server.rs | 35 +++++++++-------------------- 3 files changed, 46 insertions(+), 39 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b7a7c85..97e1faf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ aws-config = { version = "1.0", optional = true } aws-sdk-s3 = { version = "1.0", optional = true } reqwest = { version = "0.11", optional = true } chrono = { version = "0.4", optional = true } +base64 = { version = "0.22", optional = true } [[bin]] name = "gem-index-filter" @@ -35,6 +36,6 @@ path = "src/bin/webhook-server.rs" required-features = ["server"] [features] -server = ["axum", "tokio", "serde", "serde_json", "aws-config", "aws-sdk-s3", "reqwest", "chrono"] +server = ["axum", "tokio", "serde", "serde_json", "aws-config", "aws-sdk-s3", "reqwest", "chrono", "base64"] [dev-dependencies] diff --git a/WEBHOOK_SERVER.md b/WEBHOOK_SERVER.md index f89a0df..1a2a2ec 100644 --- a/WEBHOOK_SERVER.md +++ b/WEBHOOK_SERVER.md @@ -72,12 +72,10 @@ HTTP 202 Accepted - Processing happens in background. 3. **Download index**: Fetches https://index.rubygems.org/versions 4. **Filter**: Uses gem-index-filter to filter with allowlist, strips versions 5. **Compute checksum**: Calculates SHA-256 of filtered output -6. **Upload timestamped files**: - - `versions/filtered-YYYYMMDD-HHMMSS.bin` - - `versions/filtered-YYYYMMDD-HHMMSS.sha256` -7. **Update latest pointers**: - - `versions/filtered-latest.bin` → latest timestamped version - - `versions/filtered-latest.sha256` → latest checksum +6. **Upload timestamped file**: + - `versions/filtered-YYYYMMDD-HHMMSS.bin` with embedded SHA-256 checksum +7. **Update latest pointer**: + - `versions/filtered-latest.bin` → copy of latest timestamped version (preserves checksum) ## Allowlist Format @@ -102,12 +100,35 @@ After processing, your S3 bucket will contain: s3://my-rubygems-bucket/ allowlist.txt versions/ - filtered-20241110-143052.bin - filtered-20241110-143052.sha256 - filtered-20241110-154521.bin - filtered-20241110-154521.sha256 - filtered-latest.bin -> copy of most recent - filtered-latest.sha256 -> copy of most recent checksum + filtered-20241110-143052.bin (with SHA-256 checksum metadata) + filtered-20241110-154521.bin (with SHA-256 checksum metadata) + filtered-latest.bin (copy of most recent, preserves checksum) +``` + +Each `.bin` file has its SHA-256 checksum stored as S3 object metadata (using S3's `checksum-sha256` field), eliminating the need for separate `.sha256` files. + +### Retrieving Checksums + +To retrieve the checksum for verification, use the AWS CLI: + +```bash +# Get checksum from object metadata +aws s3api head-object \ + --bucket my-rubygems-bucket \ + --key versions/filtered-latest.bin \ + --checksum-mode ENABLED \ + | jq -r '.ChecksumSHA256' +``` + +Or download with automatic verification: + +```bash +# S3 will automatically verify the checksum during download +aws s3api get-object \ + --bucket my-rubygems-bucket \ + --key versions/filtered-latest.bin \ + --checksum-mode ENABLED \ + filtered.bin ``` ## Performance @@ -157,7 +178,7 @@ Loaded 10000 gems in allowlist Fetching RubyGems index from https://index.rubygems.org/versions Downloaded 21428463 bytes, filtering... Filtered to 823451 bytes, SHA-256: abc123... -Uploaded: versions/filtered-20241110-143052.bin and versions/filtered-20241110-143052.sha256 (also updated latest pointers) +Uploaded: versions/filtered-20241110-143052.bin with SHA-256: abc123... (also updated latest pointer) ``` ## Error Handling diff --git a/src/bin/webhook-server.rs b/src/bin/webhook-server.rs index d0bf5b5..c7f927f 100644 --- a/src/bin/webhook-server.rs +++ b/src/bin/webhook-server.rs @@ -1,5 +1,6 @@ use axum::{http::StatusCode, response::IntoResponse, routing::post, Router}; use aws_sdk_s3::Client as S3Client; +use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; use gem_index_filter::{filter_versions_streaming, DigestAlgorithm, FilterMode, VersionOutput}; use serde::Serialize; use std::collections::HashSet; @@ -103,7 +104,11 @@ async fn process_index( checksum ); - // Upload filtered data with timestamp + // Convert hex checksum to base64 for S3 + let checksum_bytes = hex::decode(&checksum)?; + let checksum_base64 = BASE64.encode(&checksum_bytes); + + // Upload filtered data with timestamp and embedded SHA-256 checksum let timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S"); let data_key = format!("versions/filtered-{}.bin", timestamp); @@ -113,25 +118,13 @@ async fn process_index( .key(&data_key) .body(filtered_data.into()) .content_type("application/octet-stream") + .checksum_sha256(&checksum_base64) .send() .await?; - // Upload checksum as metadata file - let checksum_key = format!("versions/filtered-{}.sha256", timestamp); - s3_client - .put_object() - .bucket(&bucket_name) - .key(&checksum_key) - .body(checksum.into_bytes().into()) - .content_type("text/plain") - .send() - .await?; - - // Update "latest" pointers + // Update "latest" pointer (preserves checksum metadata) let latest_data_key = "versions/filtered-latest.bin"; - let latest_checksum_key = "versions/filtered-latest.sha256"; - // Copy the timestamped versions to the latest pointers s3_client .copy_object() .bucket(&bucket_name) @@ -140,17 +133,9 @@ async fn process_index( .send() .await?; - s3_client - .copy_object() - .bucket(&bucket_name) - .copy_source(format!("{}/{}", bucket_name, checksum_key)) - .key(latest_checksum_key) - .send() - .await?; - println!( - "Uploaded: {} and {} (also updated latest pointers)", - data_key, checksum_key + "Uploaded: {} with SHA-256: {} (also updated latest pointer)", + data_key, checksum ); Ok(()) }