Skip to content

Commit 7a7ae58

Browse files
Merge pull request #320 from rabbitmq/rust_dependecies
Update rust dependencies
2 parents 0d5d874 + 9b51b98 commit 7a7ae58

14 files changed

+901
-542
lines changed

rust/Cargo.lock

Lines changed: 757 additions & 422 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ authors = ["Michal Malek <[email protected]>"]
55
edition = "2018"
66

77
[dependencies]
8-
futures = "0.3.7"
9-
lapin = "1.5"
10-
tokio = { version = "0.3.3", features = ["macros", "rt-multi-thread", "stream"] }
11-
uuid = { version = "0.8.1", features = ["v4"] }
8+
futures = "0.3.21"
9+
lapin = "2.1.1"
10+
tokio = { version = "1.19.2", features = ["full"] }
11+
uuid = { version = "1.1.1", features = ["v4"] }
12+
tokio-stream = "0.1"

rust/src/bin/emit_log.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use lapin::{
66
async fn main() -> Result<(), Box<dyn std::error::Error>> {
77
let args: Vec<_> = std::env::args().skip(1).collect();
88
let message = match args.len() {
9-
0 => b"hello".to_vec(),
10-
_ => args.join(" ").into_bytes(),
9+
0 => "hello".to_string(),
10+
_ => args.join(" ").to_string(),
1111
};
1212

1313
let addr = "amqp://127.0.0.1:5672";
@@ -28,12 +28,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2828
"logs",
2929
"",
3030
BasicPublishOptions::default(),
31-
message.clone(),
31+
message.as_bytes(),
3232
BasicProperties::default(),
3333
)
3434
.await?;
3535

36-
println!(" [x] Sent {:?}", std::str::from_utf8(&message)?);
36+
println!(" [x] Sent {:?}", std::str::from_utf8(&message.as_bytes())?);
3737

3838
conn.close(0, "").await?;
3939

rust/src/bin/emit_log_direct.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
77
let args: Vec<_> = std::env::args().skip(1).collect();
88
let severity = args.first().map(String::as_str).unwrap_or("info");
99
let message = match args.len() {
10-
x if x < 2 => b"Hello, world!".to_vec(),
11-
_ => args[1..].join(" ").into_bytes(),
10+
x if x < 2 => "Hello, world!".to_string(),
11+
_ => args[1..].join(" ").to_string(),
1212
};
1313

1414
let addr = "amqp://127.0.0.1:5672";
@@ -29,15 +29,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2929
"direct_logs",
3030
severity,
3131
BasicPublishOptions::default(),
32-
message.clone(),
32+
message.as_bytes(),
3333
BasicProperties::default(),
3434
)
3535
.await?;
3636

3737
println!(
3838
" [x] Sent {}:{:?}",
3939
severity,
40-
std::str::from_utf8(&message)?
40+
std::str::from_utf8(message.as_bytes())?
4141
);
4242

4343
conn.close(0, "").await?;

rust/src/bin/emit_log_topic.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
77
let args: Vec<_> = std::env::args().skip(1).collect();
88
let routing_key = args.first().map(String::as_str).unwrap_or("anonymous.info");
99
let message = match args.len() {
10-
x if x < 2 => b"Hello, world!".to_vec(),
11-
_ => args[1..].join(" ").into_bytes(),
10+
x if x < 2 => "Hello, world!".to_string(),
11+
_ => args[1..].join(" ").to_string(),
1212
};
1313

1414
let addr = "amqp://127.0.0.1:5672";
@@ -29,15 +29,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2929
"topic_logs",
3030
routing_key,
3131
BasicPublishOptions::default(),
32-
message.clone(),
32+
message.as_bytes(),
3333
BasicProperties::default(),
3434
)
3535
.await?;
3636

3737
println!(
3838
" [x] Sent {}:{:?}",
3939
routing_key,
40-
std::str::from_utf8(&message)?
40+
std::str::from_utf8(message.as_bytes())?
4141
);
4242

4343
conn.close(0, "").await?;

rust/src/bin/new_task.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
1+
use lapin::{BasicProperties, Connection, ConnectionProperties, options::*, types::FieldTable};
22

33
#[tokio::main]
44
async fn main() -> Result<(), Box<dyn std::error::Error>> {
55
let args: Vec<_> = std::env::args().skip(1).collect();
66
let message = match args.len() {
7-
0 => b"hello".to_vec(),
8-
_ => args.join(" ").into_bytes(),
7+
0 => "hello".to_string(),
8+
_ => args.join(" ").to_string(),
99
};
1010

1111
let addr = "amqp://127.0.0.1:5672";
@@ -25,12 +25,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2525
"",
2626
"task_queue",
2727
BasicPublishOptions::default(),
28-
message.clone(),
28+
message.as_bytes(),
2929
BasicProperties::default(),
3030
)
3131
.await?;
3232

33-
println!(" [x] Sent {:?}", std::str::from_utf8(&message)?);
33+
println!(" [x] Sent {:?}", std::str::from_utf8(message.as_bytes())?);
3434

3535
conn.close(0, "").await?;
3636

rust/src/bin/receive.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
1+
use futures::StreamExt;
2+
use lapin::{Connection, ConnectionProperties, options::*, types::FieldTable};
23

34
#[tokio::main]
45
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -14,23 +15,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1415
)
1516
.await?;
1617

17-
let consumer = channel
18+
let mut consumer = channel
1819
.basic_consume(
1920
"hello",
2021
"consumer",
21-
BasicConsumeOptions {
22-
no_ack: true,
23-
..Default::default()
24-
},
22+
BasicConsumeOptions::default(),
2523
FieldTable::default(),
2624
)
27-
.await?;
25+
.await
26+
.expect("basic_consume");
2827

2928
println!(" [*] Waiting for messages. To exit press CTRL+C");
3029

31-
for delivery in consumer {
32-
let (_, delivery) = delivery?;
33-
println!(" [x] Received {:?}", std::str::from_utf8(&delivery.data)?);
30+
31+
while let Some(delivery) = consumer.next().await {
32+
if let Ok(delivery) = delivery {
33+
println!(" [x] Received {:?}", std::str::from_utf8(&delivery.data)?);
34+
delivery.ack(BasicAckOptions::default())
35+
.await
36+
.expect("basic_ack");
37+
}
3438
}
3539

3640
Ok(())

rust/src/bin/receive_logs.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use futures::StreamExt;
12
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, ExchangeKind};
23

34
#[tokio::main]
@@ -36,7 +37,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3637
)
3738
.await?;
3839

39-
let consumer = channel
40+
let mut consumer = channel
4041
.basic_consume(
4142
queue.name().as_str(),
4243
"consumer",
@@ -50,10 +51,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5051

5152
println!(" [*] Waiting for logs. To exit press CTRL+C");
5253

53-
for delivery in consumer {
54-
let (_, delivery) = delivery?;
55-
println!(" [x] {:?}", std::str::from_utf8(&delivery.data)?);
54+
55+
while let Some(delivery) = consumer.next().await {
56+
if let Ok(delivery) = delivery {
57+
println!(" [x] Received {:?}", std::str::from_utf8(&delivery.data)?);
58+
}
5659
}
5760

61+
5862
Ok(())
5963
}

rust/src/bin/receive_logs_direct.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, ExchangeKind};
1+
use futures::StreamExt;
2+
use lapin::{Connection, ConnectionProperties, ExchangeKind, options::*, types::FieldTable};
23

34
#[tokio::main]
45
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -35,18 +36,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3536
)
3637
.await?;
3738

38-
futures::future::join_all(severities.into_iter().map(|severity| {
39+
40+
futures::future::join_all(severities.iter().map(|severity| {
3941
channel.queue_bind(
4042
queue.name().as_str(),
4143
"direct_logs",
4244
&severity,
4345
QueueBindOptions::default(),
4446
FieldTable::default(),
4547
)
46-
}))
47-
.await;
48+
})).await;
4849

49-
let consumer = channel
50+
let mut consumer = channel
5051
.basic_consume(
5152
queue.name().as_str(),
5253
"consumer",
@@ -60,14 +61,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6061

6162
println!(" [*] Waiting for logs. To exit press CTRL+C");
6263

63-
for delivery in consumer {
64-
let (_, delivery) = delivery?;
65-
println!(
66-
" [x] {}:{:?}",
67-
delivery.routing_key,
68-
std::str::from_utf8(&delivery.data)?
69-
);
64+
while let Some(delivery) = consumer.next().await {
65+
if let Ok(delivery) = delivery {
66+
println!(
67+
" [x] {}:{:?}",
68+
delivery.routing_key,
69+
std::str::from_utf8(&delivery.data)?
70+
);
71+
}
7072
}
71-
7273
Ok(())
7374
}

rust/src/bin/receive_logs_topic.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use futures::StreamExt;
12
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, ExchangeKind};
23

34
#[tokio::main]
@@ -35,7 +36,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3536
)
3637
.await?;
3738

38-
futures::future::join_all(binding_keys.into_iter().map(|binding_key| {
39+
futures::future::join_all(binding_keys.iter().map(|binding_key| {
3940
channel.queue_bind(
4041
queue.name().as_str(),
4142
"topic_logs",
@@ -46,7 +47,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4647
}))
4748
.await;
4849

49-
let consumer = channel
50+
let mut consumer = channel
5051
.basic_consume(
5152
queue.name().as_str(),
5253
"consumer",
@@ -60,14 +61,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6061

6162
println!(" [*] Waiting for logs. To exit press CTRL+C");
6263

63-
for delivery in consumer {
64-
let (_, delivery) = delivery?;
65-
println!(
66-
" [x] {}:{:?}",
67-
delivery.routing_key,
68-
std::str::from_utf8(&delivery.data)?
69-
);
64+
65+
while let Some(delivery) = consumer.next().await {
66+
if let Ok(delivery) = delivery {
67+
println!(
68+
" [x] {}:{:?}",
69+
delivery.routing_key,
70+
std::str::from_utf8(&delivery.data)?
71+
);
72+
}
7073
}
7174

75+
7276
Ok(())
7377
}

0 commit comments

Comments
 (0)