Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ url = "2.5.2"
walkdir = "2.5.0"
zstd = "0.13"

sqd-assignments = { git = "https://github.com/subsquid/sqd-network.git", rev = "020e5e9", features = ["reader"] }
sqd-contract-client = { git = "https://github.com/subsquid/sqd-network.git", rev = "020e5e9", version = "1.2.1" }
sqd-messages = { git = "https://github.com/subsquid/sqd-network.git", rev = "020e5e9", version = "2.0.2", features = ["bitstring"] }
sqd-network-transport = { git = "https://github.com/subsquid/sqd-network.git", rev = "020e5e9", version = "3.0.0", features = ["worker", "metrics"] }
sqd-assignments = { git = "https://github.com/subsquid/sqd-network.git", rev = "7ac249a", features = ["reader"] }
sqd-contract-client = { git = "https://github.com/subsquid/sqd-network.git", rev = "7ac249a", version = "1.2.1" }
sqd-messages = { git = "https://github.com/subsquid/sqd-network.git", rev = "7ac249a", version = "2.0.2", features = ["bitstring"] }
sqd-network-transport = { git = "https://github.com/subsquid/sqd-network.git", rev = "7ac249a", version = "3.0.0", features = ["worker", "metrics"] }

sqd-query = { git = "https://github.com/subsquid/data.git", rev = "3c76f02", features = ["parquet"] }
sqd-polars = { git = "https://github.com/subsquid/data.git", rev = "3c76f02" }
Expand Down
40 changes: 30 additions & 10 deletions src/controller/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::{FutureExt, Stream, StreamExt};
use parking_lot::RwLock;
use sqd_messages::{
query_error, query_executed, BitString, LogsRequest, ProstMsg, Query, QueryExecuted, QueryLogs,
WorkerStatus,
TimeReport, WorkerStatus,
};
use sqd_network_transport::{
protocol, Keypair, P2PTransportBuilder, PeerId, QueueFull, ResponseChannel, WorkerConfig,
Expand Down Expand Up @@ -403,15 +403,15 @@ impl<EventStream: Stream<Item = WorkerEvent> + Send + 'static> P2PController<Eve
let query_id = query.query_id.clone();
let compression = query.compression();

let (result, retry_after) = self.process_query(peer_id, &query, query_type).await;
let (mut result, retry_after) = self.process_query(peer_id, &query, query_type).await;
if let Err(e) = &result {
warn!("Query {query_id} by {peer_id} execution failed: {e:?}");
}

metrics::query_executed(&result);

// Cloning is much cheaper than hash computation and we need to keep the result for logging
if let Err(e) = self
match self
.send_query_result(
query_id,
result.clone(),
Expand All @@ -421,7 +421,13 @@ impl<EventStream: Stream<Item = WorkerEvent> + Send + 'static> P2PController<Eve
)
.await
{
tracing::error!("Couldn't send query result: {e:?}");
Ok((compression_duration, signing_duration)) => {
let _ = result.as_mut().map(|v| {
v.time_report.compression_time = compression_duration;
v.time_report.signing_time = signing_duration;
});
}
Err(e) => tracing::error!("Couldn't send query result: {e:?}"),
}

if let Some(log) = self.generate_log(&result, query, peer_id).await {
Expand Down Expand Up @@ -496,7 +502,8 @@ impl<EventStream: Stream<Item = WorkerEvent> + Send + 'static> P2PController<Eve
resp_chan: ResponseChannel<sqd_messages::QueryResult>,
retry_after: Option<Duration>,
compression: sqd_messages::Compression,
) -> Result<()> {
) -> Result<(Duration, Duration)> {
let compression_timer = std::time::Instant::now();
let query_result = match result {
Ok(result) => {
let data = match compression {
Expand All @@ -511,15 +518,18 @@ impl<EventStream: Stream<Item = WorkerEvent> + Send + 'static> P2PController<Eve
}
Err(e) => query_error::Err::from(e).into(),
};
let compression_duration = compression_timer.elapsed();
let mut msg = sqd_messages::QueryResult {
query_id,
result: Some(query_result),
retry_after_ms: retry_after.map(|duration| duration.as_millis() as u32),
signature: Default::default(),
};
let signing_timer = std::time::Instant::now();
let _span = tracing::debug_span!("sign_query_result");
tokio::task::block_in_place(|| msg.sign(&self.keypair).map_err(|e| anyhow!(e)))?;
drop(_span);
let signing_duration = signing_timer.elapsed();

let result_size = msg.encoded_len() as u64;
if result_size > protocol::MAX_QUERY_RESULT_SIZE {
Expand All @@ -532,7 +542,7 @@ impl<EventStream: Stream<Item = WorkerEvent> + Send + 'static> P2PController<Eve
.send_query_result(msg, resp_chan)
.map_err(|_| anyhow!("queue full"))?;

Ok(())
Ok((compression_duration, signing_duration))
}

#[instrument(skip_all)]
Expand All @@ -553,15 +563,25 @@ impl<EventStream: Stream<Item = WorkerEvent> + Send + 'static> P2PController<Eve
Err(QueryError::NoAllocation) => return None,
Err(e) => query_error::Err::from(e).into(),
};
let exec_time = match query_result {
Ok(result) => result.exec_time.as_micros() as u32,
Err(_) => 0, // TODO: always measure execution time

let exec_time_report = match query_result {
Ok(result) => Some(TimeReport {
parsing_time_micros: result.time_report.parsing_time.as_micros() as u32,
execution_time_micros: result.time_report.execution_time.as_micros() as u32,
compression_time_micros: result.time_report.compression_time.as_micros() as u32,
signing_time_micros: result.time_report.signing_time.as_micros() as u32,
serialization_time_micros: result.time_report.serialization_time.as_micros() as u32,
}),
Err(_) => None, // TODO: always measure execution time
};

Some(QueryExecuted {
client_id: client_id.to_string(),
query: Some(query),
exec_time_micros: exec_time,
exec_time_micros: exec_time_report
.as_ref()
.map_or(0, |report| report.execution_time_micros),
exec_time_report,
timestamp_ms: timestamp_now_ms(), // TODO: use time of receiving query
result: Some(result),
worker_version: WORKER_VERSION.to_string(),
Expand Down
35 changes: 28 additions & 7 deletions src/controller/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ impl Worker {
.map(|id| id.to_string())
.unwrap_or("{unknown}".to_string())
);
// self.execute_query(query_str, dataset, block_range, chunk_id)
// .await
match query_type {
QueryType::PlainQuery => {
self.execute_query(query_str, dataset, block_range, chunk_id)
Expand Down Expand Up @@ -136,13 +134,16 @@ impl Worker {
let (tx, rx) = tokio::sync::oneshot::channel();
sqd_polars::POOL.spawn(move || {
let result = (move || {
let start_time = std::time::Instant::now();

let data_chunk = ParquetChunk::new(chunk_guard.as_str());
let parse_timer = std::time::Instant::now();
let plan = query.compile();
let parse_duration = parse_timer.elapsed();
let exec_timer = std::time::Instant::now();
let data = Vec::with_capacity(1024 * 1024);
let mut writer = sqd_query::JsonLinesWriter::new(data);
let blocks = plan.execute(&data_chunk)?;
let exec_duration = exec_timer.elapsed();
let serialization_timer = std::time::Instant::now();
let last_block = if let Some(mut blocks) = blocks {
writer.write_blocks(&mut blocks)?;
blocks.last_block()
Expand All @@ -154,12 +155,20 @@ impl Worker {
}
};
let bytes = writer.finish()?;
let serialization_duration = serialization_timer.elapsed();

if bytes.len() > RESPONSE_LIMIT {
return Err(QueryError::from(anyhow::anyhow!("Response too large")));
}

Ok(QueryOk::new(bytes, 1, last_block, start_time.elapsed()))
Ok(QueryOk::new(
bytes,
1,
last_block,
parse_duration,
exec_duration,
serialization_duration,
))
})();
tx.send(result).unwrap_or_else(|_| {
tracing::warn!("Query runner didn't wait for the result");
Expand Down Expand Up @@ -207,15 +216,17 @@ impl Worker {
let local_chunk_id = chunk_id.to_owned().clone();
sqd_polars::POOL.spawn(move || {
let result = (move || {
let start_time = std::time::Instant::now();
let data_source = WorkerChunkStore {
path: chunk_guard.as_str().to_owned(),
};
let parse_timer = std::time::Instant::now();
let (context, target) = plan::transform_plan::<polars_target::PolarsTarget>(&plan)
.map_err(|err| anyhow::anyhow!("Transform error: {:?}", err))?;
let lf = target
.compile(&context, &dataset, &[local_chunk_id], &data_source)
.map_err(|err| anyhow::anyhow!("Compile error: {:?}", err))?;
let parse_duration = parse_timer.elapsed();
let exec_timer = std::time::Instant::now();
let mut df = match lf {
Some(lf) => {
tracing::debug!("LF Plan: {:?}", lf.describe_plan());
Expand All @@ -226,6 +237,8 @@ impl Worker {
return Err(QueryError::from(anyhow::anyhow!("Planning error: No data")))
}
};
let exec_duration = exec_timer.elapsed();
let serialization_timer = std::time::Instant::now();
let mut buf = std::io::BufWriter::new(Vec::new());
JsonWriter::new(&mut buf)
.with_json_format(JsonFormat::JsonLines)
Expand All @@ -234,12 +247,20 @@ impl Worker {
let bytes = buf
.into_inner()
.map_err(|err| anyhow::anyhow!("Serialization error: {:?}", err))?;
let serialization_duration = serialization_timer.elapsed();

if bytes.len() > RESPONSE_LIMIT {
return Err(QueryError::from(anyhow::anyhow!("Response too large")));
}

Ok(QueryOk::new(bytes, 1, 0, start_time.elapsed()))
Ok(QueryOk::new(
bytes,
1,
0,
parse_duration,
exec_duration,
serialization_duration,
))
})();
tx.send(result).unwrap_or_else(|_| {
tracing::warn!("Query runner didn't wait for the result");
Expand Down
23 changes: 20 additions & 3 deletions src/query/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,42 @@ use crate::util::hash::sha3_256;

pub type QueryResult = std::result::Result<QueryOk, QueryError>;

/// Per-phase timing breakdown of a single query's lifecycle on the worker.
///
/// The first three phases are measured inside the query runner
/// (parsing/compilation, execution, result serialization); the last two
/// (compression, signing) happen later on the send path and start at zero
/// until they are filled in there.
#[derive(Debug, Clone, Default)]
pub struct WorkerTimeReport {
    /// Time spent compiling/parsing the query plan.
    pub parsing_time: Duration,
    /// Time spent executing the compiled plan against the data chunk.
    pub execution_time: Duration,
    /// Time spent serializing the result (e.g. to JSON lines).
    pub serialization_time: Duration,
    /// Time spent compressing the response payload (filled in on send).
    pub compression_time: Duration,
    /// Time spent signing the response message (filled in on send).
    pub signing_time: Duration,
}

#[derive(Debug, Clone)]
pub struct QueryOk {
pub data: Vec<u8>,
pub num_read_chunks: usize,
pub last_block: u64,
pub exec_time: Duration,
pub time_report: WorkerTimeReport,
}

impl QueryOk {
pub fn new(
data: Vec<u8>,
num_read_chunks: usize,
last_block: u64,
exec_time: Duration,
parsing_time: Duration,
execution_time: Duration,
serialization_time: Duration,
) -> Self {
Self {
data,
num_read_chunks,
exec_time,
time_report: WorkerTimeReport {
parsing_time,
execution_time,
serialization_time,
compression_time: Duration::from_secs(0),
signing_time: Duration::from_secs(0),
},
last_block,
}
}
Expand Down
Loading