Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion experiments/zenoh/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "orb-zenoh-rpc"
name = "orb-zenoh"
version = "0.0.0"
description = "Experiments with the zenoh messaging system"
publish = false
Expand Down
158 changes: 158 additions & 0 deletions experiments/zenoh/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,26 @@ enum Args {
#[clap(long)]
use_contiguous: bool,
},
Sub {
#[clap(long)]
key: String,
#[clap(long)]
connect: Option<String>,
},
Query {
#[clap(long)]
key: String,
#[clap(long)]
connect: Option<String>,
},
Pub {
#[clap(long)]
key: String,
#[clap(long)]
value: String,
#[clap(long)]
connect: Option<String>,
},
}

#[tokio::main]
Expand All @@ -25,11 +45,149 @@ async fn main() -> color_eyre::Result<()> {
let result = match args {
Args::Alice { .. } => alice(args).await,
Args::Bob { .. } => bob(args).await,
Args::Sub { .. } => sub(args).await,
Args::Query { .. } => query(args).await,
Args::Pub { .. } => publish(args).await,
};
telemetry.flush().await;
result
}

async fn sub(args: Args) -> color_eyre::Result<()> {
let Args::Sub {
key: zenoh_key,
connect,
} = args
else {
unreachable!()
};

let connect_config = connect
.map(|addr| format!(r#"connect: {{ endpoints: ["tcp/{}"] }},"#, addr))
.unwrap_or_default();

let cfg = zenoh::Config::from_json5(&format!(
r#"{{
mode: "peer",
{connect_config}
}}"#
))
.unwrap_or_else(|e| panic!("failed to parse config: {}", e));

let session = zenoh::open(cfg)
.await
.unwrap_or_else(|e| panic!("failed to open session: {}", e));
let sub = session
.declare_subscriber(&zenoh_key)
.await
.unwrap_or_else(|e| panic!("failed to declare subscriber: {}", e));

tracing::info!("Subscribed to {zenoh_key}");
while let Ok(sample) = sub.recv_async().await {
if let Ok(payload_str) = sample.payload().try_to_string() {
tracing::info!("recv key={} payload={:?}", sample.key_expr(), payload_str);
} else {
tracing::info!(
"recv key={} payload={:?}",
sample.key_expr(),
sample.payload()
);
}
}

Ok(())
}

async fn query(args: Args) -> color_eyre::Result<()> {
let Args::Query {
key: zenoh_key,
connect,
} = args
else {
unreachable!()
};

let connect_config = connect
.map(|addr| format!(r#"connect: {{ endpoints: ["tcp/{}"] }},"#, addr))
.unwrap_or_default();

let cfg = zenoh::Config::from_json5(&format!(
r#"{{
mode: "peer",
{connect_config}
}}"#
))
.unwrap_or_else(|e| panic!("failed to parse config: {}", e));

let session = zenoh::open(cfg)
.await
.unwrap_or_else(|e| panic!("failed to open session: {}", e));

tracing::info!("Querying key: {zenoh_key}");
let replies = session
.get(&zenoh_key)
.await
.unwrap_or_else(|e| panic!("failed to query: {}", e));

while let Ok(reply) = replies.recv_async().await {
match reply.result() {
Ok(sample) => {
if let Ok(payload_str) = sample.payload().try_to_string() {
println!("key={} payload={}", sample.key_expr(), payload_str);
} else {
println!(
"key={} payload={:?}",
sample.key_expr(),
sample.payload()
);
}
}
Err(err) => {
tracing::warn!("Query error for key {}: {:?}", zenoh_key, err);
}
}
}

Ok(())
}

async fn publish(args: Args) -> color_eyre::Result<()> {
let Args::Pub {
key: zenoh_key,
value,
connect,
} = args
else {
unreachable!()
};

let connect_config = connect
.map(|addr| format!(r#"connect: {{ endpoints: ["tcp/{}"] }},"#, addr))
.unwrap_or_default();

let cfg = zenoh::Config::from_json5(&format!(
r#"{{
mode: "peer",
{connect_config}
}}"#
))
.unwrap_or_else(|e| panic!("failed to parse config: {}", e));

let session = zenoh::open(cfg)
.await
.unwrap_or_else(|e| panic!("failed to open session: {}", e));

tracing::info!("Publishing to key: {zenoh_key}");
session
.put(&zenoh_key, value.as_bytes())
.await
.unwrap_or_else(|e| panic!("failed to put: {}", e));

tracing::info!("Published value: {value}");

Ok(())
}

async fn alice(args: Args) -> Result<()> {
let Args::Alice { payload_size } = args else {
unreachable!()
Expand Down
Loading