Migrate query handling from request-response to libp2p-stream#52

Open
mo4islona wants to merge 2 commits into master from refactor/p2p-controller
Conversation


@mo4islona mo4islona commented Mar 2, 2026

Summary

Replaces the libp2p request-response protocol with direct libp2p-stream for query handling, addressing issue #25.

Depends on: subsquid/sqd-network#196

Key changes:

  • Each incoming query gets its own QUIC stream and dedicated tokio task, replacing the bounded lossy queue (try_send) that silently dropped requests under load
  • Semaphore-based concurrency control sends ServiceOverloaded errors when at capacity instead of silently dropping
  • Reads query protobuf directly from the stream, writes response back — no swarm event loop relay
  • Removes unit tests replaced by e2e integration tests (on master)

Architecture: before vs after

OLD (request-response):
  client -> send_request -> swarm event loop -> bounded queue (try_send, lossy!) -> worker -> send_response

NEW (stream):
  client -> open_stream -> [semaphore gate] -> spawn task -> read/write directly on QUIC stream
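
The "lossy!" step in the old pipeline comes from `try_send` on a bounded channel, which fails immediately instead of waiting when the queue is full. A minimal std-only sketch of that failure mode (the real code uses an async channel inside the swarm event loop; `enqueue` and the capacity of 1 are illustrative):

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

// Returns true if the request was queued, false if it was dropped —
// mirroring how the old code's ignored try_send error lost requests.
fn enqueue(tx: &SyncSender<u32>, req: u32) -> bool {
    match tx.try_send(req) {
        Ok(()) => true,
        // Queue full: the old code discarded this error, silently
        // dropping the request; the new design rejects explicitly
        // (ServiceOverloaded) or backpressures via QUIC flow control.
        Err(TrySendError::Full(_)) => false,
        Err(TrySendError::Disconnected(_)) => false,
    }
}
```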

Benchmark Results

Full results in RESULT.md. P2P transport benchmark (benches/p2p_transport.rs) compares both approaches over QUIC on localhost.

Concurrent queries with realistic processing delay (0.5-10ms)

| Processing Delay | Stream QPS | Req-Resp QPS | Speedup |
|-----------------:|-----------:|-------------:|--------:|
| 0.5 ms           | 4,035      | 603          | 6.7x    |
| 1 ms             | 3,761      | 455          | 8.3x    |
| 5 ms             | 1,358      | 150          | 9.1x    |
| 10 ms            | 796        | 84           | 9.4x    |

Under high load (concurrency=50)

Request-response silently drops 96-99% of requests because the bounded queue overflows. Stream processes every request with QUIC backpressure.

Sustained load (3s continuous, 1ms processing, 10KB response)

| Concurrency | Stream QPS | Req-Resp QPS | Req-Resp Drops | Speedup |
|------------:|-----------:|-------------:|---------------:|--------:|
| 5           | 1,597      | 457          | 0              | 3.5x    |
| 10          | 3,618      | 460          | 0              | 7.9x    |
| 20          | 5,623      | 30,037*      | 98.3% dropped  | -       |

Test plan

  • E2e integration tests pass on both master and feature branch
  • P2P transport benchmark shows stream is 3-9x faster under realistic workloads
  • Review query error handling paths (ServiceOverloaded, BadRequest, etc.)
  • Verify with live testnet deployment

🤖 Generated with Claude Code

Copilot AI review requested due to automatic review settings March 2, 2026 10:41


@mo4islona mo4islona marked this pull request as draft March 2, 2026 11:42
@mo4islona mo4islona force-pushed the refactor/p2p-controller branch 2 times, most recently from 3cc13cf to 04b6b0a Compare March 2, 2026 12:10
- Split binary crate into lib+bin so tests/benchmarks can access internals
- Add e2e integration tests exercising real queries against parquet data
- Add query throughput benchmark (Worker-level load test)
- Add P2P transport benchmark comparing libp2p-stream vs request-response
- Fix clap attribute on sentry_is_enabled

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@mo4islona mo4islona force-pushed the refactor/p2p-controller branch from 04b6b0a to ef1a258 Compare March 2, 2026 20:32
@mo4islona mo4islona changed the title Add tests and plan for P2P controller refactoring (issue #25) Migrate query handling from request-response to libp2p-stream Mar 2, 2026
@mo4islona mo4islona marked this pull request as ready for review March 2, 2026 20:43
@mo4islona mo4islona force-pushed the refactor/p2p-controller branch 3 times, most recently from 42088f3 to f2450b3 Compare March 2, 2026 21:25
- Replace request-response protocol with direct libp2p-stream for query handling
- Spawn dedicated task per incoming stream with backpressure via QUIC flow control
- Remove bounded lossy queue (try_send) that dropped requests under load
- Remove unit tests replaced by e2e integration tests

Closes #25

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@mo4islona mo4islona force-pushed the refactor/p2p-controller branch from f2450b3 to 672ea1b Compare March 2, 2026 21:43
.build_and_write_response(
&mut stream,
query_id,
result.clone(),


Small optimization: result.clone() here copies the entire QueryOk.data (Vec<u8>, up to 100MB) just so generate_log() can compute sha3_256() on the original below at line 300.

You could compute the hash and extract the log metadata before this call, then pass ownership without cloning:

// Pre-compute what generate_log() needs before consuming result
let log_meta = match &result {
    Ok(r) => Some((r.data.len() as u64, r.sha3_256().await, r.last_block)),
    Err(_) => None,
};

// No clone needed — pass ownership
match self.build_and_write_response(&mut stream, query_id, result, retry_after, compression).await {
    // ...
}

// Use pre-computed log_meta for generate_log()

This avoids a potentially 100MB allocation per query just for logging purposes.


@mo4islona mo4islona Mar 5, 2026


I forgot to include my common rust skills
Let me ask him to fix everything

---
 PR Analysis: refactor/p2p-controller — Stream-based Query Handling

 Overview

 Single commit 672ea1b migrating query handling from libp2p request-response to libp2p-stream. Solid architectural change with benchmark data backing the decision (3-9x throughput improvement, proper backpressure instead of silent drops).

 ---
 Code Style (coding-guidelines)

 Good:
 - Naming follows Rust conventions: run_query_accept_loop, handle_query_stream, build_and_write_response
 - QueryType correctly derives Debug, Clone, Copy — needed for the new spawned tasks
 - Doc comments on new public/internal methods are concise and useful
 - Proper use of #[instrument(skip_all)] for tracing

 Issues:

 1. Duplicate std import blocks (p2p.rs:1 vs p2p.rs:16) — std::sync::atomic is imported separately from the main std import at line 1:

    use std::{env, sync::Arc, time::Duration};  // line 1
    // ...
    use std::sync::atomic::{AtomicUsize, Ordering};  // line 16

    These should be merged into one use std::{...} block.
 2. Removed all clippy lint attributes (main.rs) — The #![warn(clippy::correctness)], #![warn(clippy::perf)], etc. were all removed. This reduces static analysis coverage. If intentional, consider at minimum keeping clippy::correctness and clippy::perf.
 3. result.clone() on line 281 — The QueryResult is cloned to pass into build_and_write_response, then the original is mutated with timing info and used for logging. The old code had the same clone with a comment "Cloning is much cheaper than hash computation" — that context is now lost. This clone is potentially expensive if the result contains large data (Vec<u8>).

 ---
 Performance (m10-performance)

 Good:
 - AtomicUsize for active_queries counter — correct choice per guidelines ("Atomics for primitives, not Mutex for bool/usize")
 - Ordering::Relaxed is appropriate here — no data dependencies between the counter and query processing, just a best-effort concurrency limiter
 - Direct stream write eliminates the event-loop relay hop (confirmed by benchmarks)
 - tokio::task::block_in_place for signing — correct for CPU-bound work in async context

 Issues:

 4. read_query_from_stream allocates unbounded (p2p.rs:662):

    let mut buf = Vec::new();
    let max_size = protocol::MAX_QUERY_MSG_SIZE;
    let bytes_read = stream.take(max_size + 1).read_to_end(&mut buf).await

    Vec::new() starts at 0 capacity, then read_to_end grows it through multiple reallocations. Since you know the max is MAX_QUERY_MSG_SIZE, consider Vec::with_capacity(some_reasonable_initial_size) to reduce reallocation churn on the hot path.
 5. Double encoding check — build_and_write_response calls msg.encoded_len() (line 351) to check size, then msg.encode_to_vec() (line 356), which re-encodes. The encode_to_vec internally computes the length again. Consider using msg.encode_length_delimited_to_vec() or pre-allocating with the known length:

    let result_size = msg.encoded_len();
    let mut bytes = Vec::with_capacity(result_size);
    msg.encode(&mut bytes).unwrap();

 6. block_in_place for signing in build_error_response_message (p2p.rs:727) — This is called from run_query_accept_loop, which runs on the accept task. block_in_place will block the entire accept loop while signing an error response. For the overload rejection path this adds unnecessary latency to accepting new streams. Consider tokio::task::spawn_blocking or moving signing off the accept loop.
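
The fix in item 6 can be sketched with std threads; in the PR the offload would be tokio::task::spawn_blocking, and `sign` here is a stand-in for the real envelope-signing call, not the project's actual API:

```rust
use std::thread;

// Stand-in for the CPU-bound signing routine (real code would do crypto).
fn sign(payload: &[u8]) -> u64 {
    payload.iter().map(|&b| b as u64).sum()
}

// Hand signing to another thread and return immediately, so the accept
// loop keeps accepting streams. With block_in_place, the accept task's
// thread would instead stall for the duration of the signing.
fn sign_off_loop(payload: Vec<u8>) -> thread::JoinHandle<u64> {
    thread::spawn(move || sign(&payload))
}
```

The caller later joins (or, in tokio, awaits) the handle when the signed error response is actually needed.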

 ---
 Concurrency (m07-concurrency)

 Good:
 - Proper tokio::select! with cancellation token in the accept loop
 - One-task-per-stream model is clean and avoids the previous bounded-queue bottleneck
 - fetch_add / fetch_sub pattern for the counter with proper guard on the spawned task

 Issues:

 7. TOCTOU race in concurrency limiter (p2p.rs:206-214):

    if self.active_queries.fetch_add(1, Ordering::Relaxed) >= self.max_queries {
        self.active_queries.fetch_sub(1, Ordering::Relaxed);
        // reject
    }

    This is a well-known pattern and works correctly — the counter may briefly exceed max_queries by the number of concurrent accept loops (2: plain + SQL), but fetch_add is atomic so no queries are lost. This is acceptable, just noting it's not a hard limit. Worth a comment.
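
The fetch_add-then-check limiter from item 7 can be isolated into a small std-only sketch (QueryLimiter and its method names are illustrative, not the PR's types):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

struct QueryLimiter {
    active: AtomicUsize,
    max: usize,
}

impl QueryLimiter {
    // fetch_add first, then check: the counter may briefly overshoot `max`
    // (by the number of concurrent accept loops), but every increment is
    // paired with a decrement, so no slot is ever leaked.
    fn try_start(&self) -> bool {
        if self.active.fetch_add(1, Ordering::Relaxed) >= self.max {
            self.active.fetch_sub(1, Ordering::Relaxed);
            return false; // caller responds with ServiceOverloaded
        }
        true
    }

    fn finish(&self) {
        self.active.fetch_sub(1, Ordering::Relaxed);
    }
}
```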
 8. No JoinHandle tracking for spawned query tasks — When cancellation fires, run_query_accept_loop breaks, but in-flight query tasks spawned via tokio::spawn continue running detached. The old code used for_each_concurrent, which would wait. Consider whether graceful drain of in-flight queries matters for correctness (e.g., log integrity).

 ---
 Anti-Patterns (m15-anti-pattern)

 9. result.clone() hiding an ownership issue (p2p.rs:281) — As noted above, result is cloned because it's needed for both writing the response and generating the log. A more zero-copy approach: extract the data you need for logging before passing ownership to build_and_write_response, or have build_and_write_response return the timing info separately without needing the result clone.
 10. .to_string() allocation in error path (p2p.rs:258):

    QueryError::BadRequest("Invalid signature or timestamp".to_string())

    Minor, but this allocates on every invalid query. If QueryError::BadRequest could accept &'static str or Cow<str>, this would be zero-cost on the rejection path.
 11. Unused imports — QueueFull, ResponseChannel, ReceiverStream may now be partially unused (check after removing the old request-response code paths). ResponseChannel is still used for logs, but the ReceiverStream import should be verified.

 ---
 Summary

 ┌───────────────┬───────────┬───────────────────────────────────────────────────────────────────────┐
 │   Category    │   Grade   │                                 Notes                                 │
 ├───────────────┼───────────┼───────────────────────────────────────────────────────────────────────┤
 │ Architecture  │ Excellent │ Stream-per-query eliminates bottleneck, proper backpressure           │
 ├───────────────┼───────────┼───────────────────────────────────────────────────────────────────────┤
 │ Performance   │ Good      │ A few allocation optimizations possible on hot path                   │
 ├───────────────┼───────────┼───────────────────────────────────────────────────────────────────────┤
 │ Concurrency   │ Good      │ Correct atomic pattern; consider graceful shutdown of in-flight tasks │
 ├───────────────┼───────────┼───────────────────────────────────────────────────────────────────────┤
 │ Code Style    │ Good      │ Minor import organization; lost clippy lints                          │
 ├───────────────┼───────────┼───────────────────────────────────────────────────────────────────────┤
 │ Anti-Patterns │ Minor     │ One unnecessary clone on hot path                                     │
 └───────────────┴───────────┴───────────────────────────────────────────────────────────────────────┘

 Top 3 actionable items:
 1. Pre-allocate buffer in read_query_from_stream (Vec::with_capacity)
 2. Avoid block_in_place signing on the accept loop path (write_error_to_stream → build_error_response_message)
 3. Consider tracking spawned task handles for graceful shutdown of in-flight queries
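
Item 3's "track handles, drain on shutdown" idea, sketched with std threads (in the PR this would be tokio JoinHandles or a tokio::task::JoinSet awaited when the cancellation token fires):

```rust
use std::thread;

// Join every tracked in-flight query task before shutting down, so
// partially-processed queries (and their logs) are not cut off mid-write.
// Returns the number of tasks that completed.
fn drain(handles: Vec<thread::JoinHandle<()>>) -> usize {
    let n = handles.len();
    for h in handles {
        h.join().expect("query task panicked");
    }
    n
}
```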
 


@mo4islona mo4islona Mar 5, 2026


@Gradonsky so we have to make a trade-off

The clone was also blocking the response — it happened before build_and_write_response:

Old flow:
  clone 100MB (~10-50ms, blocks response)  →  send response  →  sha3 hash (~250ms, after response sent)

Suggested flow:
  sha3 hash (~250ms, blocks response)  →  send response
  

┌───────────┬────────────┬────────┬────────┐
│ Algorithm │ Throughput │  1 MB  │ 100 MB │
├───────────┼────────────┼────────┼────────┤
│ SHA2-256  │ ~1 GB/s    │ ~1ms   │ ~100ms │
├───────────┼────────────┼────────┼────────┤
│ SHA3-256  │ ~400 MB/s  │ ~2.5ms │ ~250ms │
├───────────┼────────────┼────────┼────────┤
│ BLAKE3    │ ~5 GB/s    │ ~0.2ms │ ~20ms  │
└───────────┴────────────┴────────┴────────┘
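
A third option under the same constraints: send the response from a borrow of the data, then compute the logging hash off the response path, paying neither the up-front clone nor the up-front hash. Std-only sketch where `encode`, `hash`, and `send` are stand-ins for the real serialization, sha3_256, and stream write:

```rust
use std::thread;

// Stand-ins — none of these are the PR's actual functions.
fn encode(data: &[u8]) -> Vec<u8> { data.to_vec() }
fn hash(data: &[u8]) -> u64 { data.iter().map(|&b| b as u64).sum() }
fn send(_response: Vec<u8>) { /* write to the QUIC stream */ }

fn respond_then_log(data: Vec<u8>) -> thread::JoinHandle<u64> {
    send(encode(&data));               // response goes out first: no clone
    thread::spawn(move || hash(&data)) // ~250ms hash runs off the hot path
}
```

The handle is joined later by whatever emits the query log, so response latency depends on neither the clone nor the hash.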

