Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 10 additions & 147 deletions crates/transport/src/actors/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,18 @@ use libp2p_swarm_derive::NetworkBehaviour;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;

use sqd_messages::{LogsRequest, Query, QueryLogs, QueryResult, WorkerStatus};
use sqd_messages::{LogsRequest, QueryLogs, WorkerStatus};

use crate::{
QueueFull, behaviour::{
base::{BaseBehaviour, BaseBehaviourEvent}, noise::NoiseBehaviour, request_server::{Request, ServerBehaviour}, wrapped::{BehaviourWrapper, TToSwarm, Wrapped}
}, codec::ProtoCodec, protocol::{
MAX_HEARTBEAT_SIZE, MAX_LOGS_REQUEST_SIZE, MAX_LOGS_RESPONSE_SIZE, MAX_QUERY_MSG_SIZE, MAX_QUERY_RESULT_SIZE, QUERY_PROTOCOL, SQL_QUERY_PROTOCOL, WORKER_LOGS_PROTOCOL, WORKER_STATUS_PROTOCOL
MAX_HEARTBEAT_SIZE, MAX_LOGS_REQUEST_SIZE, MAX_LOGS_RESPONSE_SIZE, WORKER_LOGS_PROTOCOL, WORKER_STATUS_PROTOCOL
}, record_event, util::{DEFAULT_SHUTDOWN_TIMEOUT, Receiver, Sender, TaskManager, new_queue}
};

#[derive(Debug)]
pub enum WorkerEvent {
/// Query received from a portal
Query {
peer_id: PeerId,
query: Query,
/// If this channel is dropped, the connection will be closed
resp_chan: ResponseChannel<QueryResult>,
},
/// SQLQuery received from a portal
SqlQuery {
peer_id: PeerId,
query: Query,
/// If this channel is dropped, the connection will be closed
resp_chan: ResponseChannel<QueryResult>,
},
/// Logs requested by a collector
LogsRequest {
request: LogsRequest,
Expand All @@ -50,26 +36,20 @@ pub enum WorkerEvent {
},
}

type QueryBehaviour = Wrapped<ServerBehaviour<ProtoCodec<Query, QueryResult>>>;
type SqlQueryBehaviour = Wrapped<ServerBehaviour<ProtoCodec<Query, QueryResult>>>;
type LogsBehaviour = Wrapped<ServerBehaviour<ProtoCodec<LogsRequest, QueryLogs>>>;
type StatusBehaviour = Wrapped<ServerBehaviour<ProtoCodec<(), WorkerStatus>>>;

#[derive(NetworkBehaviour)]
pub struct InnerBehaviour {
base: Wrapped<BaseBehaviour>,
query: QueryBehaviour,
sql_query: SqlQueryBehaviour,
query_streams: libp2p_stream::Behaviour,
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InnerBehaviour’s query_streams field is a libp2p_stream::Behaviour that is used to accept both query and SQL query substreams. The field name implies it only handles queries, which can be confusing now that SQL is registered here too. Consider renaming the field (and corresponding constructor parameter) to reflect that it covers multiple protocols.

Suggested change
query_streams: libp2p_stream::Behaviour,
protocol_streams: libp2p_stream::Behaviour,

Copilot uses AI. Check for mistakes.
logs: LogsBehaviour,
status: StatusBehaviour,
noise: NoiseBehaviour,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
pub heartbeats_queue_size: usize,
pub query_results_queue_size: usize,
pub sql_query_results_queue_size: usize,
pub logs_queue_size: usize,
pub status_queue_size: usize,
pub events_queue_size: usize,
Expand All @@ -81,9 +61,6 @@ pub struct WorkerConfig {
impl Default for WorkerConfig {
fn default() -> Self {
Self {
heartbeats_queue_size: 100,
query_results_queue_size: 100,
sql_query_results_queue_size: 100,
logs_queue_size: 1,
status_queue_size: 10,
events_queue_size: 100,
Expand All @@ -99,23 +76,16 @@ pub struct WorkerBehaviour {
}

impl WorkerBehaviour {
pub fn new(mut base: BaseBehaviour, config: &WorkerConfig) -> Wrapped<Self> {
pub fn new(
mut base: BaseBehaviour,
config: &WorkerConfig,
query_streams: libp2p_stream::Behaviour,
) -> Wrapped<Self> {
base.set_server_mode();
Self {
inner: InnerBehaviour {
base: base.into(),
query: ServerBehaviour::new(
ProtoCodec::new(MAX_QUERY_MSG_SIZE, MAX_QUERY_RESULT_SIZE),
QUERY_PROTOCOL,
config.query_execution_timeout,
)
.into(),
sql_query: ServerBehaviour::new(
ProtoCodec::new(MAX_QUERY_MSG_SIZE, MAX_QUERY_RESULT_SIZE),
SQL_QUERY_PROTOCOL,
config.query_execution_timeout,
)
.into(),
query_streams,
logs: ServerBehaviour::new(
ProtoCodec::new(MAX_LOGS_REQUEST_SIZE, MAX_LOGS_RESPONSE_SIZE),
WORKER_LOGS_PROTOCOL,
Expand All @@ -138,68 +108,6 @@ impl WorkerBehaviour {
None
}

fn on_query(
&mut self,
peer_id: PeerId,
query: Query,
resp_chan: ResponseChannel<QueryResult>,
) -> Option<WorkerEvent> {
// Drop empty messages
if query == Query::default() {
None
} else {
Some(WorkerEvent::Query {
peer_id,
query,
resp_chan,
})
}
}

fn on_sql_query(
&mut self,
peer_id: PeerId,
query: Query,
resp_chan: ResponseChannel<QueryResult>,
) -> Option<WorkerEvent> {
// Drop empty messages
if query == Query::default() {
None
} else {
Some(WorkerEvent::SqlQuery {
peer_id,
query,
resp_chan,
})
}
}

pub fn send_query_result(
&mut self,
result: QueryResult,
resp_chan: ResponseChannel<QueryResult>,
) {
log::debug!("Sending query result {result:?}");

self.inner
.query
.try_send_response(resp_chan, result)
.unwrap_or_else(|e| log::error!("Cannot send result for query {}", e.query_id));
}

pub fn send_sql_query_result(
&mut self,
result: QueryResult,
resp_chan: ResponseChannel<QueryResult>,
) {
log::debug!("Sending sql query result {result:?}");

self.inner
.sql_query
.try_send_response(resp_chan, result)
.unwrap_or_else(|e| log::error!("Cannot send sql result for query {}", e.query_id));
}

fn on_logs_request(
&mut self,
_peer_id: PeerId,
Expand Down Expand Up @@ -252,16 +160,7 @@ impl BehaviourWrapper for WorkerBehaviour {
) -> impl IntoIterator<Item = TToSwarm<Self>> {
let ev = match ev {
InnerBehaviourEvent::Base(ev) => self.on_base_event(ev),
InnerBehaviourEvent::Query(Request {
peer_id,
request,
response_channel,
}) => self.on_query(peer_id, request, response_channel),
InnerBehaviourEvent::SqlQuery(Request {
peer_id,
request,
response_channel,
}) => self.on_sql_query(peer_id, request, response_channel),
InnerBehaviourEvent::QueryStreams(()) => None,
InnerBehaviourEvent::Logs(Request {
peer_id,
request,
Expand All @@ -279,8 +178,6 @@ impl BehaviourWrapper for WorkerBehaviour {

struct WorkerTransport {
swarm: Swarm<Wrapped<WorkerBehaviour>>,
query_results_rx: Receiver<(QueryResult, ResponseChannel<QueryResult>)>,
sql_query_results_rx: Receiver<(QueryResult, ResponseChannel<QueryResult>)>,
logs_rx: Receiver<(QueryLogs, ResponseChannel<QueryLogs>)>,
status_rx: Receiver<(WorkerStatus, ResponseChannel<WorkerStatus>)>,
events_tx: Sender<WorkerEvent>,
Expand All @@ -293,8 +190,6 @@ impl WorkerTransport {
tokio::select! {
_ = cancel_token.cancelled() => break,
ev = self.swarm.select_next_some() => self.on_swarm_event(ev),
Some((res, resp_chan)) = self.query_results_rx.recv() => self.swarm.behaviour_mut().send_query_result(res, resp_chan),
Some((res, resp_chan)) = self.sql_query_results_rx.recv() => self.swarm.behaviour_mut().send_sql_query_result(res, resp_chan),
Some((logs, resp_chan)) = self.logs_rx.recv() => self.swarm.behaviour_mut().send_logs(logs, resp_chan),
Some((status, resp_chan)) = self.status_rx.recv() => self.swarm.behaviour_mut().send_status(status, resp_chan),
}
Expand All @@ -313,17 +208,13 @@ impl WorkerTransport {

#[derive(Clone)]
pub struct WorkerTransportHandle {
query_results_tx: Sender<(QueryResult, ResponseChannel<QueryResult>)>,
sql_query_results_tx: Sender<(QueryResult, ResponseChannel<QueryResult>)>,
logs_tx: Sender<(QueryLogs, ResponseChannel<QueryLogs>)>,
status_tx: Sender<(WorkerStatus, ResponseChannel<WorkerStatus>)>,
_task_manager: Arc<TaskManager>, // This ensures that transport is stopped when the last handle is dropped
}

impl WorkerTransportHandle {
fn new(
query_results_tx: Sender<(QueryResult, ResponseChannel<QueryResult>)>,
sql_query_results_tx: Sender<(QueryResult, ResponseChannel<QueryResult>)>,
logs_tx: Sender<(QueryLogs, ResponseChannel<QueryLogs>)>,
status_tx: Sender<(WorkerStatus, ResponseChannel<WorkerStatus>)>,
transport: WorkerTransport,
Expand All @@ -332,32 +223,12 @@ impl WorkerTransportHandle {
let mut task_manager = TaskManager::new(shutdown_timeout);
task_manager.spawn(|c| transport.run(c));
Self {
query_results_tx,
sql_query_results_tx,
logs_tx,
status_tx,
_task_manager: Arc::new(task_manager),
}
}

pub fn send_query_result(
&self,
result: QueryResult,
resp_chan: ResponseChannel<QueryResult>,
) -> Result<(), QueueFull> {
log::debug!("Queueing query result {result:?}");
self.query_results_tx.try_send((result, resp_chan))
}

pub fn send_sql_query_result(
&self,
result: QueryResult,
resp_chan: ResponseChannel<QueryResult>,
) -> Result<(), QueueFull> {
log::debug!("Queueing sql query result {result:?}");
self.sql_query_results_tx.try_send((result, resp_chan))
}

pub fn send_logs(
&self,
logs: QueryLogs,
Expand All @@ -381,24 +252,16 @@ pub fn start_transport(
swarm: Swarm<Wrapped<WorkerBehaviour>>,
config: &WorkerConfig,
) -> (impl Stream<Item = WorkerEvent>, WorkerTransportHandle) {
let (query_results_tx, query_results_rx) =
new_queue(config.query_results_queue_size, "query_results");
let (sql_query_results_tx, sql_query_results_rx) =
new_queue(config.sql_query_results_queue_size, "sql_query_results");
let (logs_tx, logs_rx) = new_queue(config.logs_queue_size, "logs");
let (status_tx, status_rx) = new_queue(config.status_queue_size, "status");
let (events_tx, events_rx) = new_queue(config.events_queue_size, "events");
let transport = WorkerTransport {
swarm,
query_results_rx,
sql_query_results_rx,
logs_rx,
status_rx,
events_tx,
};
let handle = WorkerTransportHandle::new(
query_results_tx,
sql_query_results_tx,
logs_tx,
status_tx,
transport,
Expand Down
30 changes: 27 additions & 3 deletions crates/transport/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,17 @@ impl P2PTransportBuilder {
pub async fn build_worker(
self,
config: WorkerConfig,
) -> Result<(impl Stream<Item = WorkerEvent>, WorkerTransportHandle), Error> {
) -> Result<
(
impl Stream<Item = WorkerEvent>,
WorkerTransportHandle,
libp2p_stream::IncomingStreams,
libp2p_stream::IncomingStreams,
),
Comment on lines +218 to +224
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_worker now returns a 4-tuple with two IncomingStreams values of the same type. This is easy to accidentally swap at call sites (query vs SQL query), and the compiler can’t help. Consider returning a small struct (or a tuple of distinct wrapper types) with named fields to make the API self-documenting and harder to misuse.

Copilot uses AI. Check for mistakes.
Error,
> {
use crate::protocol::{QUERY_PROTOCOL, SQL_QUERY_PROTOCOL};

let local_peer_id = self.local_peer_id();
// Wait for the worker to be registered on chain
loop {
Expand All @@ -233,8 +243,22 @@ impl P2PTransportBuilder {
}
break;
}
let swarm = self.build_swarm(|base| WorkerBehaviour::new(base, &config))?;
Ok(worker::start_transport(swarm, &config))

// Create stream behaviour for query protocols.
// IncomingStreams are returned to the caller for direct handling.
let query_stream_behaviour = libp2p_stream::Behaviour::new();
let mut control = query_stream_behaviour.new_control();
let query_streams = control
.accept(StreamProtocol::new(QUERY_PROTOCOL))
.expect("Query protocol should not already be registered");
let sql_query_streams = control
.accept(StreamProtocol::new(SQL_QUERY_PROTOCOL))
.expect("SQL query protocol should not already be registered");
Comment on lines +247 to +256
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query_stream_behaviour variable actually registers both QUERY_PROTOCOL and SQL_QUERY_PROTOCOL. Renaming it (and the associated query_streams behaviour field in the worker) to something protocol-agnostic like stream_behaviour/query_protocols would reduce confusion when adding more protocols later.

Copilot uses AI. Check for mistakes.

let swarm =
self.build_swarm(|base| WorkerBehaviour::new(base, &config, query_stream_behaviour))?;
let (events, handle) = worker::start_transport(swarm, &config);
Ok((events, handle, query_streams, sql_query_streams))
}

#[cfg(feature = "sql-client")]
Expand Down
4 changes: 3 additions & 1 deletion crates/transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::sync::mpsc;
pub use libp2p::{
identity::{Keypair, ParseError as IdParseError, PublicKey},
request_response::ResponseChannel,
Multiaddr, PeerId,
Multiaddr, PeerId, Stream,
};

#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -70,6 +70,8 @@ pub use crate::actors::portal_logs_collector::{
pub use crate::actors::worker::{
WorkerBehaviour, WorkerConfig, WorkerEvent, WorkerTransportHandle,
};
#[cfg(feature = "worker")]
pub use libp2p_stream::IncomingStreams;
#[cfg(feature = "sql-client")]
pub use crate::actors::sql_client::{
SQLClientBehaviour, SQLClientConfig, SQLClientTransport, SQLQueryFailure,
Expand Down