This blog post is a direct follow up on my previous blog post on different levels of async in Rust. You may want to check that one out before diving in here.
Alright, so now we know that we can make our programs asynchronous by using non-blocking I/O calls. But last time we only saw examples that remained completely sequential, defeating the whole purpose of async. Let’s change that with something more sophisticated.
A few months ago I needed to ensure that all the URLs for a domain name resolved to either a real web page (200 status code) or redirected to somewhere else with a real web page. To make that happen, I needed a program that would:
To make this simple, we’re going to take a lot of shortcuts like:
println!
for generating CSV output instead of using a libraryFor the curious: the original version of this was a really short Haskell program that had these properties. For fun a few weeks back, I rewrote it in two ways in Rust, which ultimately led to this pair of blog posts.
Like last time, I recommend following along with my code. I’ll kick this off with cargo new httpstatus
. And then to avoid further futzing with our Cargo.toml
, let’s add our dependencies preemptively:
[dependencies]
tokio = { version = "0.2.22", features = ["full"] }
reqwest = { version = "0.10.8", features = ["blocking"] }
async-channel = "1.4.1"
is_type = "0.2.1"
That features = ["blocking"]
should hopefully grab your attention. The reqwest
library provides an optional, fully blocking API. That seems like a great place to get started. Here’s a nice, simple program that does what we need:
// To use .lines() before, just like last time
use std::io::BufRead;
// We'll return _some_ kind of an error
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Open the file for input
let file = std::fs::File::open("urls.txt")?;
// Make a buffered version so we can read lines
let buffile = std::io::BufReader::new(file);
// CSV header
println!("URL,Status");
// Create a client so we can make requests
let client = reqwest::blocking::Client::new();
for line in buffile.lines() {
// Error handling on reading the lines in the file
let line = line?;
// Make a request and send it, getting a response
let resp = client.get(&line).send()?;
// Print the status code
println!("{},{}", line, resp.status().as_u16());
}
Ok(())
}
Thanks to Rust’s ?
syntax, error handling is pretty easy here. In fact, there are basically no gotchas here. reqwest
makes this code really easy to write!
Once you put a urls.txt
file together, such as the following:
https://www.wikipedia.org
https://www.wikipedia.org/path-the-does-not-exist
http://wikipedia.org
You’ll hopefully get output such as:
URL,Status
https://www.wikipedia.org,200
https://www.wikipedia.org/path-the-does-not-exist,404
http://wikipedia.org,200
The logic above is pretty easy to follow, and hopefully the inline comments explain anything confusing. With that idea in mind, let’s up our game a bit.
Let’s first move away from the blocking API in reqwest
, but still keep all the sequential nature of the program. This involves four relatively minor changes to the code, all spelled out below:
use std::io::BufRead;
// First change: add the Tokio runtime
#[tokio::main]
// Second: turn this into an async function
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let file = std::fs::File::open("urls.txt")?;
let buffile = std::io::BufReader::new(file);
println!("URL,Status");
// Third change: Now we make an async Client
let client = reqwest::Client::new();
for line in buffile.lines() {
let line = line?;
// Fourth change: We need to .await after send()
let resp = client.get(&line).send().await?;
println!("{},{}", line, resp.status().as_u16());
}
Ok(())
}
The program is still fully sequential: we fully send a request, then get the response, before we move onto the next URL. But we’re at least ready to start playing with different async approaches.
IF you remember from last time, we had a bit of a philosophical discussion on the nature of blocking, and that ultimately some blocking is OK in a program. In order to both simplify what we do here, as well as provide some real-world recommendations, let’s list all of the blocking I/O we’re doing:
urls.txt
stdout
with println!
Note that, even though we’re sequentially running our HTTP requests right now, those are in fact using non-blocking I/O. Therefore, I haven’t included anything related to HTTP in the list above. We’ll start dealing with the sequential nature next.
Returning to the four blocking I/O calls above, I’m going to make a bold statement: don’t bother making them non-blocking. It’s not actually terribly difficult to do the file I/O using tokio
(we saw how last time). But we get virtually no benefit from doing so. The latency for local disk access, especially when we’re talking a file as small as urls.txt
is likely to be, and especially in contrast to a bunch of HTTP requests, is miniscule.
Feel free to disagree with me, or to take on making those calls non-blocking as an exercise. But I’m going to focus instead on higher value targets.
The real problem here is that we have sequential HTTP requests going on. Instead, we would much prefer to make our requests concurrently. If we assume there are 100 URLs, and each request takes 1 second (hopefully an overestimation), a sequential algorithm can at best finish in 100 seconds. However, a concurrent algorithm could in theory finish all 100 requests in just 1 second. In reality that’s pretty unlikely to happen, but it is completely reasonable to expect a significant speedup factor, depending on network conditions, number of hosts you’re connecting to, and other similar factors.
So how exactly do we do concurrency with tokio
? The most basic answer is the tokio::spawn
function. This spawns a new task in the tokio
runtime. This is similar in principle to spawning a new system thread. But instead, running and scheduling is managed by the runtime instead of the operating system. Let’s take a first stab at spawning each HTTP request into its own task:
tokio::spawn(async move {
let resp = client.get(&line).send().await?;
println!("{},{}", line, resp.status().as_u16());
});
That looks nice, but we have a problem:
error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `std::ops::Try`)
--> srcmain.rs:16:24
|
15 | tokio::spawn(async move {
| _________________________________-
16 | | let resp = client.get(&line).send().await?;
| | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cannot use the `?` operator in an async block that returns `()`
17 | |
18 | | println!("{},{}", line, resp.status().as_u16());
19 | | });
| |_________- this function should return `Result` or `Option` to accept `?`
Our task doesn’t return a Result
, and therefore has no way to complain about errors. This is actually indicating a far more serious issue, which we’ll get to later. But for now, let’s just pretend errors won’t happen, and cheat a bit with .unwrap()
:
let resp = client.get(&line).send().await.unwrap();
This also fails, now with an ownership issue:
error[E0382]: use of moved value: `client`
--> srcmain.rs:15:33
|
10 | let client = reqwest::Client::new();
| ------ move occurs because `client` has type `reqwest::async_impl::client::Client`, which does not implement the `Copy` trait
This one is easier to address. The Client
is being shared by multiple tasks. But each task needs to make its own clone of the Client
. If you read the docs, you’ll see that this is recommended behavior:
The
Client
holds a connection pool internally, so it is advised that you create one and reuse it.You do not have to wrap the
Client
it in anRc
orArc
to reuse it, because it already uses anArc
internally.
Once we add this line before our tokio::spawn
, our code will compile:
let client = client.clone();
Unfortunately, things fail pretty spectacularly at runtime:
URL,Status
thread 'thread 'tokio-runtime-workerthread 'tokio-runtime-worker' panicked at '' panicked at 'tokio-runtime-workercalled `Result::unwrap()` on an `Err` value: reqwest::Error { kind: Request, url: "https://www.wikipedia.org/path-the-does-not-exist", source: hyper::Error(Connect, ConnectError("dns error", Custom { kind: Interrupted, error: JoinError::Cancelled })) }called `Result::unwrap()` on an `Err` value: reqwest::Error { kind: Request, url: "https://www.wikipedia.org/", source: hyper::Error(Connect, ConnectError("dns error", Custom { kind: Interrupted, error: JoinError::Cancelled })) }' panicked at '', ', called `Result::unwrap()` on an `Err` value: reqwest::Error { kind: Request, url: "http://wikipedia.org/", source: hyper::Error(Connect, ConnectError("dns error", Custom { kind: Interrupted, error: JoinError::Cancelled })) }srcmain.rssrcmain.rs', ::srcmain.rs1717:::241724
That’s a big error message, but the important bit for us is a bunch of JoinError::Cancelled
stuff all over the place.
Let’s talk through what’s happening in our program:
Client
main
function, which triggers the runtime to shut downThe problem is that we reach (5) long before we finish (4). When this happens, all in-flight I/O will be cancelled, which leads to the error messages we saw above. Instead, we need to ensure we wait for each task to complete before we exit. The easiest way to do this is to call .await
on the result of the tokio::spawn
call. (Those results, by the way, are called JoinHandle
s.) However, doing so immediately will completely defeat the purpose of our concurrent work, since we will once again be sequential!
Instead, we want to spawn all of the tasks, and then wait for them all to complete. One easy way to achieve this is to put all of the JoinHandle
s into a Vec
. Let’s look at the code. And since we’ve made a bunch of changes since our last complete code dump, I’ll show you the full current status of our source file:
use std::io::BufRead;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let file = std::fs::File::open("urls.txt")?;
let buffile = std::io::BufReader::new(file);
println!("URL,Status");
let client = reqwest::Client::new();
let mut handles = Vec::new();
for line in buffile.lines() {
let line = line?;
let client = client.clone();
let handle = tokio::spawn(async move {
let resp = client.get(&line).send().await.unwrap();
println!("{},{}", line, resp.status().as_u16());
});
handles.push(handle);
}
for handle in handles {
handle.await?;
}
Ok(())
}
And finally we have a concurrent program! This is actually pretty good, but it has two flaws we’d like to fix:
.unwrap()
. I mentioned this above, and said our usage of .unwrap()
was indicating a “far more serious issue.” That issue was the fact that the result values from spawning subthreads are never noticed by the main thread, which is really the core issue causing the cancellation we discussed above. It’s always nice when type-driven error messages indicate a runtime bug in our code!NOTE It would be possible in the program above to skip the spawn
s and collect a Vec
of Future
s, then await
on those. However, that would once again end up sequential in nature. Spawning allows all of those Future
s to run concurrently, and be polled by the tokio
runtime itself. It would also be possible to use join_all
to poll all of the Future
s, but it has some performance issues. So best to stick with tokio::spawn
.
Let’s address the simpler one first: proper error handling.
The basic concept of error handling is that we want the errors from the spawned tasks to be detected in the main tasks, and then cause the application to exit. One way to handle that is to return the Err
values from the spawned tasks directly, and then pick them up with the JoinHandle
that spawn
returns. This sounds nice, but naively implemented will result in checking the error responses one at a time. Instead, we’d rather fail early, by detecting that (for example) the 57th request failed and immediately terminating the application.
You could do some kind of a “tell me which is the first JoinHandle
that’s ready,” but it’s not the way I initially implemented it, and some quick Googling indicated you’d have to be careful about which library functions you use. Instead, we’ll try a different approach using an mpsc
(multi-producer, single-consumer).
Here’s the basic idea. Let’s pretend there are 100 URLs in the file. We’ll spawn 100 tasks. Each of those tasks will write a single value onto the mpsc
channel: a Result<(), Error>
. Then, in the main
task, we’ll read 100 values off of the channel. If any of them are Err
, we exit the program immediately. Otherwise, if we read off 100 Ok
values, we exit successfully.
Before we read the file, we don’t know how many lines will be in it. So we’re going to use an unbounded channel. This isn’t generally recommended practice, but it ties in closely with my second complaint above: we’re spawning a separate task for each line in the file instead of doing something more intelligent like a job queue. In other words, if we can safely spawn N tasks, we can safely have an unbounded channel of size N.
Alright, let’s see the code in question!
use std::io::BufRead;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let file = std::fs::File::open("urls.txt")?;
let buffile = std::io::BufReader::new(file);
println!("URL,Status");
let client = reqwest::Client::new();
// Create the channel. tx will be the sending side (each spawned task),
// and rx will be the receiving side (the main task after spawning).
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
// Keep track of how many lines are in the file, and therefore
// how many tasks we spawned
let mut count = 0;
for line in buffile.lines() {
let line = line?;
let client = client.clone();
// Each spawned task gets its own copy of tx
let tx = tx.clone();
tokio::spawn(async move {
// Use a map to say: if the request went through
// successfully, then print it. Otherwise:
// keep the error
let msg = client.get(&line).send().await.map(|resp| {
println!("{},{}", line, resp.status().as_u16());
});
// And send the message to the channel. We ignore errors here.
// An error during sending would mean that the receiving side
// is already closed, which would indicate either programmer
// error, or that our application is shutting down because
// another task generated an error.
tx.send(msg).unwrap();
});
// Increase the count of spawned tasks
count += 1;
}
// Drop the sending side, so that we get a None when
// calling rx.recv() one final time. This allows us to
// test some extra assertions below
std::mem::drop(tx);
let mut i = 0;
loop {
match rx.recv().await {
// All senders are gone, which must mean that
// we're at the end of our loop
None => {
assert_eq!(i, count);
break Ok(());
}
// Something finished successfully, make sure
// that we haven't reached the final item yet
Some(Ok(())) => {
assert!(i < count);
}
// Oops, an error! Time to exit!
Some(Err(e)) => {
assert!(i < count);
return Err(From::from(e));
}
}
i += 1;
}
}
With this in place, we now have a proper concurrent program that does error handling correctly. Nifty! Before we hit the job queue, let’s clean this up a bit.
The previous code works well. It allows us to spawn multiple worker tasks, and then wait for all of them to complete, handling errors when they occur. Let’s generalize this! We’re doing this now since it will make the final step in this blog post much easier.
We’ll put all of the code for this in a separate module of our project. The code will be mostly the same as what we had before, except we’ll have a nice struct
to hold onto our data, and we’ll be more explicit about the error type. Put this code into src/workers.rs
:
use is_type::Is; // fun trick, we'll look at it below
use std::future::Future;
use tokio::sync::mpsc;
/// Spawn and then run workers to completion, handling errors
pub struct Workers<E> {
count: usize,
tx: mpsc::UnboundedSender<Result<(), E>>,
rx: mpsc::UnboundedReceiver<Result<(), E>>,
}
impl<E: Send + 'static> Workers<E> {
/// Create a new Workers value
pub fn new() -> Self {
let (tx, rx) = mpsc::unbounded_channel();
Workers { count: 0, tx, rx }
}
/// Spawn a new task to run inside this Workers
pub fn spawn<T>(&mut self, task: T)
where
// Make sure we can run the task
T: Future + Send + 'static,
// And a weird trick: make sure that the output
// from the task is Result<(), E>
// Equality constraints would make this much nicer
// See: https://github.com/rust-lang/rust/issues/20041
T::Output: Is<Type = Result<(), E>>,
{
// Get a new copy of the send side
let tx = self.tx.clone();
// Spawn a new task
tokio::spawn(async move {
// Run the provided task and get its result
let res = task.await;
// Send the task to the channel
// This should never fail, so we panic if something goes wrong
match tx.send(res.into_val()) {
Ok(()) => (),
// could use .unwrap, but that would require Debug constraint
Err(_) => panic!("Impossible happend! tx.send failed"),
}
});
// One more worker to wait for
self.count += 1;
}
/// Finish running all of the workers, exiting when the first one errors or all of them complete
pub async fn run(mut self) -> Result<(), E> {
// Make sure we don't wait for ourself here
std::mem::drop(self.tx);
// How many workers have completed?
let mut i = 0;
loop {
match self.rx.recv().await {
None => {
assert_eq!(i, self.count);
break Ok(());
}
Some(Ok(())) => {
assert!(i < self.count);
}
Some(Err(e)) => {
assert!(i < self.count);
return Err(e);
}
}
i += 1;
}
}
}
Now in src/main.rs
, we’re going to get to focus on just our business logic… and error handling. Have a look at the new contents:
// Indicate that we have another module
mod workers;
use std::io::BufRead;
/// Create a new error type to handle the two ways errors can happen.
#[derive(Debug)]
enum AppError {
IO(std::io::Error),
Reqwest(reqwest::Error),
}
// And now implement some boilerplate From impls to support ? syntax
impl From<std::io::Error> for AppError {
fn from(e: std::io::Error) -> Self {
AppError::IO(e)
}
}
impl From<reqwest::Error> for AppError {
fn from(e: reqwest::Error) -> Self {
AppError::Reqwest(e)
}
}
#[tokio::main]
async fn main() -> Result<(), AppError> {
let file = std::fs::File::open("urls.txt")?;
let buffile = std::io::BufReader::new(file);
println!("URL,Status");
let client = reqwest::Client::new();
let mut workers = workers::Workers::new();
for line in buffile.lines() {
let line = line?;
let client = client.clone();
// Use workers.spawn, and no longer worry about results
// ? works just fine inside!
workers.spawn(async move {
let resp = client.get(&line).send().await?;
println!("{},{}", line, resp.status().as_u16());
Ok(())
})
}
// Wait for the workers to complete
workers.run().await
}
There’s more noise around error handling, but overall the code is easier to understand. Now that we have that out of the way, we’re finally ready to tackle the last piece of this…
Let’s review again at a high level how we do error handling with workers. We set up a channel to allow each worker task to send its results to a single receiver, the main task. We used mpsc
, or “multi-producer single-consumer.” That matches up with what we just described, right?
OK, a job queue is kind of similar. We want to have a single task that reads lines from the file and feeds them into a channel. Then, we want multiple workers to read values from the channel. This is “single-producer multi-consumer.” Unfortunately, tokio
doesn’t provide such a channel out of the box. After I asked on Twitter, I was recommended to use async-channel, which provides a “multi-producer multi-consumer.” That works for us!
Thanks to our work before with the Workers
struct
refactor, this is now pretty easy. Let’s have a look at the modified main
function:
#[tokio::main]
async fn main() -> Result<(), AppError> {
let file = std::fs::File::open("urls.txt")?;
let buffile = std::io::BufReader::new(file);
println!("URL,Status");
// Feel free to define to any numnber (> 0) you want
// At a value of 4, this could comfortably fit in OS threads
// But tasks are certainly up to the challenge, and will scale
// up more nicely for large numbers and more complex applications
const WORKERS: usize = 4;
let client = reqwest::Client::new();
let mut workers = workers::Workers::new();
// Buffers double the size of the number of workers are common
let (tx, rx) = async_channel::bounded(WORKERS * 2);
// Spawn the task to fill up the queue
workers.spawn(async move {
for line in buffile.lines() {
let line = line?;
tx.send(line).await.unwrap();
}
Ok(())
});
// Spawn off the individual workers
for _ in 0..WORKERS {
let client = client.clone();
let rx = rx.clone();
workers.spawn(async move {
loop {
match rx.recv().await {
// uses Err to represent a closed channel due to tx being dropped
Err(_) => break Ok(()),
Ok(line) => {
let resp = client.get(&line).send().await?;
println!("{},{}", line, resp.status().as_u16());
}
}
}
})
}
// Wait for the workers to complete
workers.run().await
}
And just like that, we have a concurrent job queue! It’s everything we could have wanted!
I’ll admit, when I wrote the post last week, I didn’t think I’d be going this deep into the topic. But once I started playing with solutions, I decided I wanted to implement a full job queue for this.
I hope you found this topic interesting! If you want more Rust content, please hit me up on Twitter. Also, feel free to check out some of our other Rust content:
Subscribe to our blog via email
Email subscriptions come from our Atom feed and are handled by Blogtrottr. You will only receive notifications of blog posts, and can unsubscribe any time.