While learning Rust I decided to take a look at the Tokio project. Tokio is a project for writing fast networking code in Rust, and it relies on a set of concepts that were new to me, futures among them.
I should say that I failed in my attempt to create a network application: it was too difficult to learn Rust and all these new concepts at the same time. So I decided to write a code review of one of their examples instead, the chat example.
The chat example
This example creates a “chat server” that accepts connections and lets every client type messages. Each message is delivered to all the clients except its sender.
Once the application is running, connect to it with:
nc 127.0.0.1 8080
And start typing. Launch it in several terminals to see the messages appear in each one.
Creating the project
Like any other Rust project, we create it using cargo. Just run:
cargo init --bin chatserver
We will write the whole program in the main.rs file. We also need to add the required dependencies to the Cargo.toml file.
[dependencies]
futures = "0.1"
tokio-core = "0.1"
Looking at the code
The included crates
This example only uses two crates, so it only requires:
extern crate tokio_core;
extern crate futures;
tokio-core manages the low-level networking and provides other tools; futures is the crate that enables us to work with asynchronous events.
Included components
use std::collections::HashMap;
use std::rc::Rc;
use std::cell::RefCell;
use std::iter;
use std::env;
use std::io::{Error, ErrorKind, BufReader};
use std::net::SocketAddr;
These are the components needed from the standard library. The HashMap will be used to keep track of the connected clients.
Rc and RefCell are wrapper types for shared ownership and mutability. Rc lets us share data among multiple “owners” by using a reference count; the data is freed once the count drops to zero. RefCell provides interior mutability: the borrow rules are checked at run time instead of at compile time. Here is a good article on both topics.
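To make the combination concrete, here is a minimal sketch that uses only the standard library. The register helper and the u32/String map contents are assumptions made for illustration; they are not part of the chat example.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

/// Hypothetical helper: stores a client name under an id in a shared map.
fn register(map: &Rc<RefCell<HashMap<u32, String>>>, id: u32, name: &str) {
    // borrow_mut() hands out a mutable borrow that is checked at run time.
    map.borrow_mut().insert(id, name.to_string());
}

fn main() {
    let clients = Rc::new(RefCell::new(HashMap::new()));
    // Cloning the Rc copies the pointer and increments the reference count;
    // both handles point at the same HashMap.
    let other_owner = Rc::clone(&clients);

    register(&clients, 1, "alice");
    register(&other_owner, 2, "bob");

    println!("owners: {}", Rc::strong_count(&clients));
    println!("entries: {}", clients.borrow().len());
}
```

Both handles see both entries, because there is only one map behind them.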
The iter module lets us use iterators, and env gives the program access to its command line arguments. Error and ErrorKind are used to create errors when the connection is lost. BufReader is used to read incoming data.
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tokio_core::io::{self, Io};
use futures::stream::{self, Stream};
use futures::Future;
These are the tokio and futures use statements. Basically, we use a TcpListener to accept incoming connections, a Stream to handle them as they arrive, and some helpers to read and write data.
Creating the TCP listener
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&addr, &handle).unwrap();
let connections = Rc::new(RefCell::new(HashMap::new()));
Let’s go line by line:
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
The env::args() function returns the arguments used to run the program as an Args iterator. Like every iterator, Args has the nth method, which returns the element at position n, counting from zero. Position 0 is normally the program name, so nth(1) is the first argument supplied by the user. nth returns an Option<Self::Item> that contains the argument, or None if there’s nothing in the iterator at that index.
unwrap_or is a method of the returned Option. Unlike unwrap, which panics if it receives None, unwrap_or falls back to the value we give it, here 127.0.0.1:8080. String literals have the type &str, a string slice, which is not the same as a String. to_string() converts the string slice into a String. Here is a good article on str vs String.
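The same nth(1).unwrap_or(...) pattern can be tried without touching the real command line. In this sketch, pick_addr is a hypothetical helper that takes any iterator of Strings in place of env::args():

```rust
/// Hypothetical helper: returns the first argument after the program name,
/// or the default address when no argument was given.
fn pick_addr<I: Iterator<Item = String>>(mut args: I) -> String {
    // nth(1) skips index 0 (the program name) and returns Option<String>.
    args.nth(1).unwrap_or("127.0.0.1:8080".to_string())
}

fn main() {
    let with_arg = vec!["chatserver".to_string(), "0.0.0.0:9000".to_string()];
    let without_arg = vec!["chatserver".to_string()];

    println!("{}", pick_addr(with_arg.into_iter()));
    println!("{}", pick_addr(without_arg.into_iter()));
}
```

The first call yields the supplied address, the second falls back to the default.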
let addr = addr.parse::<SocketAddr>().unwrap();
The first thing to notice in this line is that the addr binding is declared again. This is called shadowing: it is allowed, but the old value can no longer be reached by name. In this case we don’t care, because the new addr is the parsed form of the old one. The addr string has a parse method that converts the string into a value of the specified type. The “turbofish” syntax, ::<>, tells parse which type to produce. In this case the type is SocketAddr, so the string must contain a valid IP address and port. Forget about regexes to validate an IP address, this does the job :).
If a malformed string is passed, the program will panic because of the unwrap at the end of the line.
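If panicking on bad input is not what you want, the Result returned by parse can be inspected instead. A small sketch, where parse_addr is a hypothetical helper and not something the chat example defines:

```rust
use std::net::SocketAddr;

/// Hypothetical helper: parses text into a SocketAddr without panicking.
fn parse_addr(text: &str) -> Option<SocketAddr> {
    text.parse::<SocketAddr>().ok()
}

fn main() {
    let addr = "127.0.0.1:8080".to_string();
    // Shadowing: the new `addr` is a SocketAddr, the old String is hidden.
    let addr = parse_addr(&addr).expect("valid address");
    println!("ip = {}, port = {}", addr.ip(), addr.port());

    // A missing port or an out-of-range octet is rejected.
    println!("{:?}", parse_addr("127.0.0.1"));
    println!("{:?}", parse_addr("999.0.0.1:80"));
}
```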
let mut core = Core::new().unwrap();
let handle = core.handle();
Core::new() creates a new event loop, which is the core of this program. The Core receives futures and drives them forward when they are ready to make progress. Think of a Future as a value that will be available at some point in the future. The concept of a callback may sound similar; although the two serve the same purpose, internally they are completely different. The good part is that a single-threaded program can handle multiple futures. More on event loops in tokio can be found here.
A handle is created to get access to the loop.
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
A socket is created by binding the address together with the handle of the event loop, so the socket is now also bound to the event loop. socket is of type TcpListener. A message is printed to show the address we are listening on.
let connections = Rc::new(RefCell::new(HashMap::new()));
The connections hashmap is wrapped in Rc so that it is reference counted and in RefCell so that it is internally mutable. connections will store the open connections to the server.
The stream of incoming sockets
let srv = socket.incoming().for_each(move |(stream, addr)| {
// A lot of code here...
});
This is a big block of code, so I’m splitting the explanation into parts. socket is a TcpListener, which has an incoming method. This method returns a stream of the connections this listener accepts. incoming() returns a Stream rather than an iterator: a Stream is the asynchronous counterpart of an iterator, which is why it has a for_each function in charge of handling each received connection.
The for_each function takes a closure as its parameter. The closure receives a tuple that is destructured into two bindings, stream and addr.
For each socket
All the code inside the for_each closure is explained here. Remember, we have two bindings in this scope: stream and addr.
println!("New Connection: {}", addr);
let (reader, writer) = stream.split();
First, the information that a new connection was received is printed. Then the stream is split into two pieces, the reader and the writer.
let (tx, rx) = futures::sync::mpsc::unbounded();
connections.borrow_mut().insert(addr, tx);
In the first line, an unbounded channel is created; it has two halves, a sender (tx) and a receiver (rx). The sender is then inserted into the connections hashmap, with addr as the key and the sender as the value.
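The idea of keeping one sender per client in a map can be sketched with the standard library alone. Note the swap: this uses the blocking std::sync::mpsc channel as a stand-in for the futures unbounded channel, and the connect and broadcast helpers are hypothetical names, so treat it as an illustration of the data structure rather than the example's actual code.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical helper: creates a channel for a client, stores the sender
/// under the client's address and returns the receiver.
fn connect(connections: &mut HashMap<String, Sender<String>>, addr: &str) -> Receiver<String> {
    let (tx, rx) = channel();
    connections.insert(addr.to_string(), tx);
    rx
}

/// Hypothetical helper: sends `msg` to every client except `from`.
fn broadcast(connections: &HashMap<String, Sender<String>>, from: &str, msg: &str) {
    for (addr, tx) in connections {
        if addr.as_str() != from {
            // Ignore send errors from clients whose receiver is gone.
            let _ = tx.send(msg.to_string());
        }
    }
}

fn main() {
    let mut connections = HashMap::new();
    let rx_a = connect(&mut connections, "127.0.0.1:5000");
    let rx_b = connect(&mut connections, "127.0.0.1:5001");

    broadcast(&connections, "127.0.0.1:5000", "hello");

    println!("a: {:?}", rx_a.try_recv().ok());
    println!("b: {:?}", rx_b.try_recv().ok());
}
```

The sender's own receiver stays empty, while the other client gets the message.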
let connections_inner = connections.clone();
let reader = BufReader::new(reader);
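The connections handle is cloned into connections_inner for later use. Cloning an Rc does not copy the hashmap; it only creates another reference to the same data and increments the reference count. A new BufReader is also created, wrapping the reader.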
let iter = stream::iter(iter::repeat(()).map(Ok::<(), Error>));
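The iter function of the stream module turns an iterator into a stream; here the iterator is iter::repeat(()).map(Ok::<(), Error>). iter::repeat(()) creates an infinite iterator of () values, and map(Ok::<(), Error>) wraps each one in Ok, so every item is a Result<(), Error>.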
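The iterator half of that line can be inspected on its own with the standard library. In this sketch, first_items is a hypothetical helper that cuts the endless iterator short with take so that it can be collected:

```rust
use std::io::Error;
use std::iter;

/// Hypothetical helper: collects the first `n` items of the endless
/// Ok(()) iterator used in the chat example.
fn first_items(n: usize) -> Vec<Result<(), Error>> {
    iter::repeat(()).map(Ok::<(), Error>).take(n).collect()
}

fn main() {
    let items = first_items(3);
    println!(
        "{} items, all ok: {}",
        items.len(),
        items.iter().all(|r| r.is_ok())
    );
}
```

Without take the iterator never ends, which is the point in the chat example: the resulting stream keeps producing items for as long as the connection is alive.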