Skip to content
This repository was archived by the owner on Sep 5, 2019. It is now read-only.

start tutorial #1

Merged
merged 11 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
use std::{collections::HashMap, net::ToSocketAddrs};

use futures::{
io::{BufReader, WriteHalf},
stream::FuturesUnordered,
io::{BufReader, WriteHalf, AsyncRead},
stream::{FuturesUnordered, Stream},
channel::mpsc::{self, unbounded},
SinkExt,
select,
Expand Down
112 changes: 112 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#![feature(async_await)]

use std::{net::ToSocketAddrs, sync::Arc, collections::HashMap};

use futures::channel::mpsc;
use futures::SinkExt;

use async_std::{
io::BufReader,
prelude::*,
task,
net::{TcpListener, TcpStream},
};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
type Sender<T> = mpsc::UnboundedSender<T>;
type Receiver<T> = mpsc::UnboundedReceiver<T>;


fn main() -> Result<()> {
task::block_on(server("127.0.0.1:8080"))
}

async fn server(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;

let (broker_sender, broker_receiver) = mpsc::unbounded();
let broker = task::spawn(broker(broker_receiver));
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?;
println!("Accepting from: {}", stream.peer_addr()?);
let _handle = task::spawn(client(broker_sender.clone(), stream));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be best to unwrap the error here so that a panic crashes the program instead of silently continuing, perhaps like this:

let broker_sender = broker_sender.clone();
task::spawn(async move {
    client(broker_sender, stream).unwrap();
});

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this is the case where we should not crash. If a single client fails due to some IO error (which might be on the client's side!) other clients should not be affected.

I am planing to write about graceful shutdown next (so that we can reap errors at least in the end) and then about client's disconnections (which will bring us to futures_unordered and just in time error reaping).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough! But, at the very least, surely we should at least print an error on the screen? Maybe like this:

let broker_sender = broker_sender.clone();
task::spawn(async move {
    if let Err(err) = client(broker_sender, stream) {
        eprintln!("{:?}", err);
    }
});

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah makes sense, especially if we gloss over the fact that eprintln! is blocking :-)

I've actually got a better idea of how to handle disconnections overnight. Instead of futures unorderned and waiting for the tasks to join, we'll just use channels. I think that's what Go does: IIRC, there's no way to wait until go routine is joined, you can only wait until it sends an "I am about to die" message over a channel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the strategy that I usually use.

Btw, considering that println! and eprintln! are blocking, should we provide our own versions of them?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, considering that println! and eprintln! are blocking, should we provide our own versions of them?

Maybe, but I'm not sure what's the best way of approaching that.

Copy link
Contributor

@skade skade Aug 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cough is it possible to extend clippy by using it as a library? (async-clippy)

I see a problem here: is it async_std::println!("fooo").await or async_std::eprintln!("foo"). In general, I think we should encourage using log! here anyways.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking further about this, println and friends in an async block is probably a good clippy lint to suggest.

}
broker.await?;
Ok(())
}

async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
let stream = Arc::new(stream);
let reader = BufReader::new(&*stream);
let mut lines = reader.lines();

let name = match lines.next().await {
None => Err("peer disconnected immediately")?,
Some(line) => line?,
};
broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await.unwrap();

while let Some(line) = lines.next().await {
let line = line?;
let (dest, msg) = match line.find(':') {
None => continue,
Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
};
let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
let msg: String = msg.trim().to_string();

broker.send(Event::Message {
from: name.clone(),
to: dest,
msg,
}).await.unwrap();
}
Ok(())
}

async fn client_writer(
mut messages: Receiver<String>,
stream: Arc<TcpStream>,
) -> Result<()> {
let mut stream = &*stream;
while let Some(msg) = messages.next().await {
stream.write_all(msg.as_bytes()).await?;
}
Ok(())
}

#[derive(Debug)]
enum Event {
NewPeer {
name: String,
stream: Arc<TcpStream>,
},
Message {
from: String,
to: Vec<String>,
msg: String,
},
}

async fn broker(mut events: Receiver<Event>) -> Result<()> {
let mut peers: HashMap<String, Sender<String>> = HashMap::new();

while let Some(event) = events.next().await {
match event {
Event::Message { from, to, msg } => {
for addr in to {
if let Some(peer) = peers.get_mut(&addr) {
peer.send(format!("from {}: {}\n", from, msg)).await?
}
}
}
Event::NewPeer { name, stream} => {
let (client_sender, client_receiver) = mpsc::unbounded();
peers.insert(name.clone(), client_sender);
let _handle = task::spawn(client_writer(client_receiver, stream));
}
}
}
Ok(())
}
Loading