Replace query request-response with libp2p-stream #196
Pull request overview
This PR migrates worker query handling from libp2p request-response to direct libp2p-stream substreams, shifting per-stream IO and backpressure handling to the caller (worker-rs) and simplifying the transport-layer worker behaviour.
Changes:
- Update P2PTransportBuilder::build_worker() to register query + SQL query stream protocols and return the corresponding IncomingStreams to the caller.
- Refactor worker transport behaviour to remove query/SQL query request-response handling, result queues, and related send methods.
- Extend protobuf schema with a new TimeReport message and attach it to QueryExecuted.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| crates/transport/src/lib.rs | Re-exports libp2p::Stream and (under worker) libp2p_stream::IncomingStreams for downstream stream handling. |
| crates/transport/src/builder.rs | build_worker() now constructs a libp2p_stream::Behaviour, accepts query protocols, and returns the IncomingStreams alongside events + handle. |
| crates/transport/src/actors/worker.rs | Removes query/SQL query request-response server behaviours and result queues; embeds a stream behaviour instead. |
| crates/messages/proto/messages.proto | Adds TimeReport and a new exec_time_report field on QueryExecuted. |
Comments suppressed due to low confidence (1)
crates/transport/src/actors/worker.rs:59
WorkerConfig still includes send_logs_timeout, but it is no longer used anywhere in the transport implementation (no reads of this field in the crate). This makes the config misleading for callers. Either remove the field (and update defaults/serde expectations) or wire it into the logs response path (e.g., apply it when sending logs responses).
pub struct WorkerConfig {
    pub logs_queue_size: usize,
    pub status_queue_size: usize,
    pub events_queue_size: usize,
    pub shutdown_timeout: Duration,
    pub query_execution_timeout: Duration,
    pub send_logs_timeout: Duration,
}
) -> Result<
    (
        impl Stream<Item = WorkerEvent>,
        WorkerTransportHandle,
        libp2p_stream::IncomingStreams,
        libp2p_stream::IncomingStreams,
    ),
build_worker now returns a 4-tuple with two IncomingStreams values of the same type. This is easy to accidentally swap at call sites (query vs SQL query), and the compiler can’t help. Consider returning a small struct (or a tuple of distinct wrapper types) with named fields to make the API self-documenting and harder to misuse.
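A minimal sketch of the named-field alternative suggested above. The IncomingStreams type here is a local stand-in so the example compiles without libp2p, and WorkerStreams, build_worker_streams, and both protocol strings are hypothetical names introduced only for illustration; none of them come from the PR.

```rust
// Stand-in for `libp2p_stream::IncomingStreams`, so this sketch compiles
// without the libp2p dependency. It only records which protocol it serves.
#[derive(Debug, PartialEq)]
pub struct IncomingStreams {
    pub protocol: &'static str,
}

// Hypothetical protocol identifiers; the real constants live in the transport crate.
pub const QUERY_PROTOCOL: &str = "/query/example";
pub const SQL_QUERY_PROTOCOL: &str = "/sql-query/example";

/// Hypothetical named-field replacement for the two positional
/// `IncomingStreams` values in the tuple returned by `build_worker()`.
#[derive(Debug)]
pub struct WorkerStreams {
    pub query: IncomingStreams,
    pub sql_query: IncomingStreams,
}

/// Mimics only the stream-related part of `build_worker()`.
pub fn build_worker_streams() -> WorkerStreams {
    WorkerStreams {
        query: IncomingStreams { protocol: QUERY_PROTOCOL },
        sql_query: IncomingStreams { protocol: SQL_QUERY_PROTOCOL },
    }
}
```

With this shape a call site would destructure by field name (for example `let WorkerStreams { query, sql_query } = build_worker_streams();`), so swapping the two streams requires writing the wrong name rather than merely the wrong position.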
// Create stream behaviour for query protocols.
// IncomingStreams are returned to the caller for direct handling.
let query_stream_behaviour = libp2p_stream::Behaviour::new();
let mut control = query_stream_behaviour.new_control();
let query_streams = control
    .accept(StreamProtocol::new(QUERY_PROTOCOL))
    .expect("Query protocol should not already be registered");
let sql_query_streams = control
    .accept(StreamProtocol::new(SQL_QUERY_PROTOCOL))
    .expect("SQL query protocol should not already be registered");
The query_stream_behaviour variable actually registers both QUERY_PROTOCOL and SQL_QUERY_PROTOCOL. Renaming it (and the associated query_streams behaviour field in the worker) to something protocol-agnostic like stream_behaviour/query_protocols would reduce confusion when adding more protocols later.
base: Wrapped<BaseBehaviour>,
query: QueryBehaviour,
sql_query: SqlQueryBehaviour,
query_streams: libp2p_stream::Behaviour,
InnerBehaviour’s query_streams field is a libp2p_stream::Behaviour that is used to accept both query and SQL query substreams. The field name implies it only handles queries, which can be confusing now that SQL is registered here too. Consider renaming the field (and corresponding constructor parameter) to reflect that it covers multiple protocols.
query_streams: libp2p_stream::Behaviour,
protocol_streams: libp2p_stream::Behaviour,
b77d9ec to 282b6b4
Remove query and sql_query ServerBehaviour instances from WorkerBehaviour. Replace with a single libp2p_stream::Behaviour that accepts QUERY_PROTOCOL and SQL_QUERY_PROTOCOL. IncomingStreams are returned from build_worker() for direct handling by the worker crate.

This eliminates:
- Query/SqlQuery event variants (and their lossy queues)
- query_results/sql_query_results response channels
- send_query_result/send_sql_query_result on WorkerTransportHandle
- 2 arms from the transport select! loop

Queries are now handled directly on per-stream tasks with full backpressure from libp2p-stream's channel(0) semantics.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
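To make the zero-capacity backpressure idea concrete, here is a rough analogy using only the Rust standard library. It assumes std's sync_channel(0) as a stand-in for the channel the commit message describes; the async channel used inside libp2p-stream differs in detail (for instance, the futures mpsc channel reserves one slot per sender), so treat this as an illustration of the principle rather than the library's behaviour. The function names are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Returns true if a non-blocking send on a zero-capacity channel is
/// rejected while no receiver is waiting to take the item.
pub fn rendezvous_rejects_unreceived_send() -> bool {
    // `_rx` is kept alive so the channel is full, not disconnected.
    let (tx, _rx) = sync_channel::<u32>(0);
    matches!(tx.try_send(1), Err(TrySendError::Full(_)))
}

/// Sends `n` items through a zero-capacity channel. Each `send` blocks until
/// the consumer takes the item, so the producer cannot run ahead and nothing
/// is dropped.
pub fn deliver_with_backpressure(n: u32) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(0);
    let producer = thread::spawn(move || {
        for i in 0..n {
            tx.send(i).expect("receiver is alive");
        }
        // `tx` is dropped here, which closes the channel.
    });
    // Collect until the producer hangs up.
    let received: Vec<u32> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}
```

The point of the sketch is that a blocking (or awaiting) sender is slowed down to the consumer's pace, whereas a sender that only tries once has to decide what to do with the rejected item.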
282b6b4 to 0814d64
Summary
- build_worker() now returns IncomingStreams for both query and SQL query protocols, giving the caller (worker-rs) direct control over stream handling
- Removes query and SQL query request-response handling from WorkerBehaviour and WorkerTransportHandle
- Re-exports libp2p_stream::IncomingStreams and libp2p::Stream from the crate

What changed
- builder.rs: build_worker() creates a libp2p_stream::Behaviour, registers query and SQL query protocols via control.accept(), and returns the IncomingStreams alongside the event stream and transport handle.
- actors/worker.rs: WorkerBehaviour replaces the two ServerBehaviour<ProtoCodec<Query, QueryResult>> with a single libp2p_stream::Behaviour. All query routing, result sending, and bounded queue logic is removed; the worker-rs crate now reads/writes query protobufs directly on each QUIC stream.
- lib.rs: Exports IncomingStreams and Stream for use by worker-rs.

Why
The old architecture routed queries through: send_request → swarm event loop → bounded channel (try_send) → worker → send_response → swarm event loop. The bounded channel silently dropped requests under load. The new architecture spawns a task per stream with QUIC backpressure. See worker-rs PR #52 for benchmark results showing 3-9x improvement.

Test plan
🤖 Generated with Claude Code
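
The contrast drawn in the description, a bounded queue fed with try_send versus one task per stream, can be sketched with the standard library alone. This is an illustration under stated assumptions, not the transport code: OS threads stand in for async tasks, plain integers stand in for streams, and both function names are hypothetical.

```rust
use std::sync::mpsc::{channel, sync_channel};
use std::thread;

/// Old shape: the event loop forwards requests into a bounded queue with
/// `try_send`, so anything arriving while the queue is full is dropped.
/// The consumer is deliberately idle here. Returns (accepted, dropped).
pub fn forward_with_try_send(requests: u32, queue_size: usize) -> (u32, u32) {
    let (tx, _rx) = sync_channel::<u32>(queue_size);
    let mut accepted = 0;
    let mut dropped = 0;
    for id in 0..requests {
        match tx.try_send(id) {
            Ok(()) => accepted += 1,
            Err(_) => dropped += 1, // silently lost in the old design
        }
    }
    (accepted, dropped)
}

/// New shape: one handler per incoming stream; each handler reads its
/// request and writes its response itself. Returns the handled ids, sorted.
pub fn handle_per_stream(requests: u32) -> Vec<u32> {
    let (done_tx, done_rx) = channel::<u32>();
    let handles: Vec<_> = (0..requests)
        .map(|id| {
            let done_tx = done_tx.clone();
            thread::spawn(move || {
                // Placeholder for "read query, execute, write result".
                done_tx.send(id).expect("collector is alive");
            })
        })
        .collect();
    drop(done_tx); // only the handlers hold senders now
    for handle in handles {
        handle.join().expect("handler thread panicked");
    }
    let mut handled: Vec<u32> = done_rx.iter().collect();
    handled.sort_unstable();
    handled
}
```

With an idle consumer and a queue of size 4, ten requests leave six dropped in the first function, while the second handles all ten because no shared queue sits between arrival and handling.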