Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,7 @@ time = { version = "0.3" }
tiny-keccak = { version = "2.0.2" }
tokio = { version = "1.45.0", default-features = false }
tokio-retry = { version = "0.3.0" }
tokio-stream = { version = "0.1.14" }
tokio-stream = { version = "0.1.17" }
tokio-test = { version = "0.4.4" }
tokio-tungstenite = { version = "0.26.2" }
tokio-util = { version = "0.7.8" }
Expand Down Expand Up @@ -1577,3 +1577,6 @@ wasmi = { opt-level = 3 }
x25519-dalek = { opt-level = 3 }
yamux = { opt-level = 3 }
zeroize = { opt-level = 3 }

[patch.crates-io]
litep2p = { git = "https://github.com/xDimon/litep2p.git", rev = "e44ab69084ff04f15a34b73866dbeeeac9ff06ec" }
7 changes: 7 additions & 0 deletions polkadot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ jemalloc-allocator = [
"polkadot-node-core-pvf/jemalloc-allocator",
"polkadot-overseer/jemalloc-allocator",
]
x-shadow = [
"polkadot-overseer/x-shadow",
"polkadot-node-core-pvf-prepare-worker/x-shadow",
"polkadot-node-core-pvf-execute-worker/x-shadow",
"polkadot-node-core-pvf/x-shadow",
"polkadot-cli/x-shadow"
]

# Generate the metadata hash needed for CheckMetadataHash
# in the builtin test runtimes (westend and rococo).
Expand Down
2 changes: 2 additions & 0 deletions polkadot/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,5 @@ runtime-metrics = [
"polkadot-node-metrics/runtime-metrics",
"polkadot-service/runtime-metrics",
]

x-shadow = ["sc-service/x-shadow","polkadot-service/x-shadow"]
1 change: 1 addition & 0 deletions polkadot/node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ sc-sysinfo = { workspace = true, default-features = true }
[features]
ci-only-tests = []
jemalloc-allocator = ["polkadot-node-core-pvf-common/jemalloc-allocator"]
x-shadow = ["polkadot-node-core-pvf-common/x-shadow"]
# This feature is used to export test code to other crates without putting it in the production build.
test-utils = [
"dep:is_executable",
Expand Down
3 changes: 3 additions & 0 deletions polkadot/node/core/pvf/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ libc = { workspace = true }
nix = { features = ["resource", "sched"], workspace = true }
thiserror = { workspace = true }

tokio = { version = "1", features = ["net", "io-util", "process", "time"] }

codec = { features = ["derive"], workspace = true }

polkadot-node-primitives = { workspace = true, default-features = true }
Expand Down Expand Up @@ -51,3 +53,4 @@ tempfile = { workspace = true }
# This feature is used to export test code to other crates without putting it in the production build.
test-utils = []
jemalloc-allocator = []
x-shadow = []
14 changes: 14 additions & 0 deletions polkadot/node/core/pvf/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,21 @@ pub struct WorkerHandshake {
pub fn framed_send_blocking(w: &mut (impl Write + Unpin), buf: &[u8]) -> io::Result<()> {
let len_buf = buf.len().to_le_bytes();
w.write_all(&len_buf)?;

#[cfg(not(feature = "x-shadow"))]
w.write_all(buf)?;

#[cfg(feature = "x-shadow")]
{
// Under Shadow simulation, writes are performed in chunks because
// sending large blocks at once can, in some cases, cause a deadlock
// between the sender and the receiver.
const CHUNK_SIZE: usize = 1 << 15;
for chunk in buf.chunks(CHUNK_SIZE) {
w.write_all(chunk)?;
}
}

Ok(())
}

Expand Down
7 changes: 5 additions & 2 deletions polkadot/node/core/pvf/common/src/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ pub struct PrepareStats {
#[derive(Clone, Debug, Default, Encode, Decode)]
pub struct MemoryStats {
/// Memory stats from `tikv_jemalloc_ctl`, polling-based and not very precise.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
#[cfg(any(
all(target_os = "linux", not(feature = "x-shadow")),
feature = "jemalloc-allocator"
))]
pub memory_tracker_stats: Option<MemoryAllocationStats>,
/// `ru_maxrss` from `getrusage`. `None` if an error occurred.
#[cfg(target_os = "linux")]
Expand All @@ -66,7 +69,7 @@ pub struct MemoryStats {
}

/// Statistics of collected memory metrics.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
#[cfg(any(all(target_os = "linux", not(feature = "x-shadow")), feature = "jemalloc-allocator"))]
#[derive(Clone, Debug, Default, Encode, Decode)]
pub struct MemoryAllocationStats {
/// Total resident memory, in bytes.
Expand Down
87 changes: 68 additions & 19 deletions polkadot/node/core/pvf/common/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,36 @@ use std::{
fmt::{self},
fs::File,
io::{self, Read, Write},
os::{
fd::{AsRawFd, FromRawFd, RawFd},
unix::net::UnixStream,
},
os::fd::{AsRawFd, FromRawFd, RawFd},
path::PathBuf,
sync::mpsc::{Receiver, RecvTimeoutError},
time::Duration,
};

// ===== Unified endpoint & transport aliases =====
// Address used to bind/connect host<->worker
#[cfg(not(feature = "x-shadow"))]
pub type Endpoint = std::path::PathBuf;
#[cfg(feature = "x-shadow")]
pub type Endpoint = std::net::SocketAddr;

// Async transport (host side, tokio)
#[cfg(feature = "x-shadow")]
pub use tokio::net::TcpStream as HostStream;
#[cfg(not(feature = "x-shadow"))]
pub use tokio::net::UnixStream as HostStream;

#[cfg(feature = "x-shadow")]
pub use tokio::net::TcpListener as HostListener;
#[cfg(not(feature = "x-shadow"))]
pub use tokio::net::UnixListener as HostListener;

// Blocking transport (worker side, std)
#[cfg(not(feature = "x-shadow"))]
pub type WorkerStream = std::os::unix::net::UnixStream;
#[cfg(feature = "x-shadow")]
pub type WorkerStream = std::net::TcpStream;

/// Use this macro to declare a `fn main() {}` that will create an executable that can be used for
/// spawning the desired worker.
#[macro_export]
Expand Down Expand Up @@ -157,15 +178,23 @@ macro_rules! decl_worker_main {
},
}

let mut socket_path = None;
let mut endpoint = None;
let mut worker_dir_path = None;
let mut node_version = None;

let mut i = 2;
while i < args.len() {
match args[i].as_ref() {
// UDS (default build)
#[cfg(not(feature = "x-shadow"))]
"--socket-path" => {
socket_path = Some(args[i + 1].as_str());
endpoint = Some(args[i + 1].as_str());
i += 1
},
// TCP (x-shadow build)
#[cfg(feature = "x-shadow")]
"--socket-addr" => {
endpoint = Some(args[i + 1].as_str());
i += 1
},
"--worker-dir-path" => {
Expand All @@ -180,14 +209,23 @@ macro_rules! decl_worker_main {
}
i += 1;
}
let socket_path = socket_path.expect("the --socket-path argument is required");
// Require the appropriate argument depending on build feature.
#[cfg(not(feature = "x-shadow"))]
let endpoint = endpoint.expect("the --socket-path argument is required");
#[cfg(feature = "x-shadow")]
let endpoint = endpoint.expect("the --socket-addr argument is required");
let worker_dir_path =
worker_dir_path.expect("the --worker-dir-path argument is required");

let socket_path = std::path::Path::new(socket_path).to_owned();
// Build endpoint according to transport
#[cfg(not(feature = "x-shadow"))]
let endpoint = std::path::Path::new(endpoint).to_owned();
#[cfg(feature = "x-shadow")]
let endpoint: std::net::SocketAddr =
endpoint.parse().expect("invalid --socket-addr, expected IP:PORT");
let worker_dir_path = std::path::Path::new(worker_dir_path).to_owned();

$entrypoint(socket_path, worker_dir_path, node_version, Some($worker_version));
$entrypoint(endpoint, worker_dir_path, node_version, Some($worker_version));
}
};
}
Expand Down Expand Up @@ -309,13 +347,13 @@ pub struct WorkerInfo {
/// to securely handle each incoming request.
pub fn run_worker<F>(
worker_kind: WorkerKind,
socket_path: PathBuf,
endpoint: Endpoint,
worker_dir_path: PathBuf,
node_version: Option<&str>,
worker_version: Option<&str>,
mut event_loop: F,
) where
F: FnMut(UnixStream, &WorkerInfo, SecurityStatus) -> io::Result<Never>,
F: FnMut(WorkerStream, &WorkerInfo, SecurityStatus) -> io::Result<Never>,
{
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))]
let mut worker_info = WorkerInfo {
Expand All @@ -327,7 +365,7 @@ pub fn run_worker<F>(
gum::debug!(
target: LOG_TARGET,
?worker_info,
?socket_path,
?endpoint,
"starting pvf worker ({})",
worker_info.kind
);
Expand Down Expand Up @@ -358,12 +396,17 @@ pub fn run_worker<F>(
},
}

// Connect to the socket.
let stream = || -> io::Result<UnixStream> {
let stream = UnixStream::connect(&socket_path)?;
let _ = std::fs::remove_file(&socket_path);
// Connect to the host, transport-specific.
// UDS: connect to path and remove the socket file afterwards (best-effort).
// TCP: connect to addr.
#[cfg(not(feature = "x-shadow"))]
let stream = || -> io::Result<WorkerStream> {
let stream = WorkerStream::connect(&endpoint)?;
let _ = std::fs::remove_file(&endpoint);
Ok(stream)
}();
#[cfg(feature = "x-shadow")]
let stream = || -> io::Result<WorkerStream> { WorkerStream::connect(endpoint) }();
let mut stream = match stream {
Ok(ok) => ok,
Err(err) => worker_shutdown_error(worker_info, &err.to_string()),
Expand Down Expand Up @@ -538,7 +581,7 @@ fn kill_parent_node_in_emergency() {
}

/// Receives a handshake with information for the worker.
fn recv_worker_handshake(stream: &mut UnixStream) -> io::Result<WorkerHandshake> {
fn recv_worker_handshake(stream: &mut WorkerStream) -> io::Result<WorkerHandshake> {
let worker_handshake = framed_recv_blocking(stream)?;
let worker_handshake = WorkerHandshake::decode(&mut &worker_handshake[..]).map_err(|e| {
io::Error::new(
Expand Down Expand Up @@ -585,7 +628,7 @@ where
}

pub fn send_result<T, E>(
stream: &mut UnixStream,
stream: &mut WorkerStream,
result: Result<T, E>,
worker_info: &WorkerInfo,
) -> io::Result<()>
Expand Down Expand Up @@ -736,7 +779,13 @@ pub mod thread {

/// Block the thread while it waits on the condvar or on a timeout. If the timeout is hit,
/// returns `None`.
#[cfg_attr(not(any(target_os = "linux", feature = "jemalloc-allocator")), allow(dead_code))]
#[cfg_attr(
not(any(
all(target_os = "linux", not(feature = "x-shadow")),
feature = "jemalloc-allocator"
)),
allow(dead_code)
)]
pub fn wait_for_threads_with_timeout(cond: &Cond, dur: Duration) -> Option<WaitOutcome> {
let (lock, cvar) = &**cond;
let result = cvar
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/core/pvf/execute-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ sp-maybe-compressed-blob = { workspace = true, default-features = true }

[features]
builder = []
x-shadow = ["polkadot-node-core-pvf-common/x-shadow"]
Loading