This is the second of four posts in a series on combining web and gRPC services into a single service using Tower, Hyper, Axum, and Tonic. I recommend checking out the first post in the series if you haven’t already.
- Tower provides a Service trait, which is basically an asynchronous function from requests to responses
- Service is parameterized on the request type, and has an associated type for Response
- It also has an associated Error type, and an associated Future type
- Service allows async behavior in both checking whether the service is ready to accept a request, and for handling the request
With that in mind, let’s look at Hyper.
Now that we’ve got Tower under our belts a bit, it’s time to dive into the specific world of Hyper. Much of what we saw above will apply directly to Hyper. But Hyper has a few additional curveballs to deal with:
- The Request and Response types are parameterized over the representation of the request/response bodies
In place of the run function we had in our previous fake server example, Hyper follows a builder pattern for initializing HTTP servers. After providing configuration values, you create an active Server value from your Builder with the serve method. Just to get it out of the way now, this is the type signature of serve from the public docs:
pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
where
    I: Accept,
    I::Error: Into<Box<dyn StdError + Send + Sync>>,
    I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    S: MakeServiceRef<I::Conn, Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
    E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
That’s a lot of requirements, and not all of them are clear from the docs. Hopefully we can bring some clarity to this. But for now, let’s start off with something simpler: the “Hello world” example from the Hyper homepage:
use std::{convert::Infallible, net::SocketAddr};
use hyper::{Body, Request, Response, Server};
use hyper::service::{make_service_fn, service_fn};

async fn handle(_: Request<Body>) -> Result<Response<Body>, Infallible> {
    Ok(Response::new("Hello, World!".into()))
}

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

    let make_svc = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(handle))
    });

    let server = Server::bind(&addr).serve(make_svc);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}
This follows the same pattern we established above:
- handle is an async function from a Request to a Response, which may fail with an Infallible value.
  - Both Request and Response are parameterized with Body, a default HTTP body representation.
- handle gets wrapped up in service_fn to produce a Service<Request<Body>>. This is like app_fn above.
- We use make_service_fn, like app_factory_fn above, to produce the Service<&AddrStream> (we’ll get to that &AddrStream shortly).
  - We don’t need the &AddrStream value, so we ignore it
  - The closure passed to make_service_fn must return a Future, so we wrap with async
  - The output of that Future must be a Result, so we wrap with an Ok
  - We annotate the error type as Infallible, otherwise the compiler won’t know the type of the Ok(service_fn(handle)) expression
Using this level of abstraction for writing a normal web app is painful for (at least) three different reasons:
- Wiring the Service pieces together manually is a pain
So we’ll be more than happy to move on from Hyper to Axum a bit later. But for now, let’s continue exploring things at the Hyper layer.
Without service_fn and make_service_fn
What I found most helpful when trying to grok Hyper was implementing a simple app without service_fn and make_service_fn. So let’s go through that ourselves here. We’re going to create a simple counter app (I’m nothing if not predictable). We’ll need two different data types: one for the “app factory”, and one for the app itself. Let’s start with the app itself:
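// Imports shared by the snippets in this section
use std::convert::Infallible;
use std::future::Ready;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::task::Poll;
use hyper::server::conn::AddrStream;
use hyper::service::Service;
use hyper::{Body, Request, Response, Server};

struct DemoApp {
    counter: Arc<AtomicUsize>,
}

impl Service<Request<Body>> for DemoApp {
    type Response = Response<Body>;
    type Error = hyper::http::Error;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    // Always ready to accept another request
    fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, _req: Request<Body>) -> Self::Future {
        // fetch_add returns the value from before the increment
        let counter = self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let res = Response::builder()
            .status(200)
            .header("Content-Type", "text/plain; charset=utf-8")
            .body(format!("Counter is at: {}", counter).into());
        std::future::ready(res)
    }
}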
This implementation uses the std::future::Ready struct to create a Future which is immediately ready. In other words, our application doesn’t perform any async actions. I’ve set the Error associated type to hyper::http::Error. This error would be generated if, for example, you provided invalid strings to the header method call, such as non-ASCII characters. As we’ve seen multiple times, poll_ready just advertises that it’s always ready to handle another request.
The implementation of DemoAppFactory isn’t terribly different:
struct DemoAppFactory {
    counter: Arc<AtomicUsize>,
}

impl Service<&AddrStream> for DemoAppFactory {
    type Response = DemoApp;
    type Error = Infallible;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, conn: &AddrStream) -> Self::Future {
        println!("Accepting a new connection from {:?}", conn);
        std::future::ready(Ok(DemoApp {
            counter: self.counter.clone()
        }))
    }
}
We have a different parameter to Service, this time &AddrStream. I did initially find the naming here confusing. In Tower, a Service takes some Request. And with our DemoApp, the Request it takes is a Hyper Request<Body>. But in the case of DemoAppFactory, the Request it’s taking is a &AddrStream. Keep in mind that a Service is really just a generalization of fallible, async functions from input to output. The input may be a Request<Body>, or may be a &AddrStream, or something else entirely.
Similarly, the “response” here isn’t an HTTP response, but a DemoApp. I again find it easier to use the terms “input” and “output” to avoid the name overloading of request and response.
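To make the input/output framing concrete, here is a minimal sketch that uses only the standard library. The Service trait below is a simplified stand-in for Tower’s (it drops poll_ready and renames Response to Output), and EchoApp, EchoAppFactory, and the tiny block_on helper are hypothetical names invented for this illustration; none of them come from Hyper or Tower.

```rust
use std::convert::Infallible;
use std::future::{ready, Future, Ready};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Simplified stand-in for Tower's Service trait (assumption: no poll_ready).
trait Service<Input> {
    type Output;
    type Error;
    type Future: Future<Output = Result<Self::Output, Self::Error>>;
    fn call(&mut self, input: Input) -> Self::Future;
}

// The "app": its input is a request path, its output is a response body.
struct EchoApp;

impl Service<String> for EchoApp {
    type Output = String;
    type Error = Infallible;
    type Future = Ready<Result<String, Infallible>>;
    fn call(&mut self, path: String) -> Self::Future {
        ready(Ok(format!("You asked for {}", path)))
    }
}

// The "app factory": its input is connection info, its output is an app.
struct EchoAppFactory;

impl<'a> Service<&'a str> for EchoAppFactory {
    type Output = EchoApp;
    type Error = Infallible;
    type Future = Ready<Result<EchoApp, Infallible>>;
    fn call(&mut self, peer: &'a str) -> Self::Future {
        println!("Accepting a new connection from {}", peer);
        ready(Ok(EchoApp))
    }
}

// Minimal executor: polls in a loop with a waker that does nothing.
// Good enough for futures that are ready immediately, as they are here.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// One connection, then one request on that connection.
fn demo() -> String {
    let mut factory = EchoAppFactory;
    let mut app = block_on(factory.call("127.0.0.1:50000")).unwrap();
    block_on(app.call("/hello".to_string())).unwrap()
}

fn main() {
    println!("{}", demo());
}
```

Both types implement the same trait; the only difference is what they treat as input and output, which is the point of the “input”/“output” terminology.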
Finally, our main function looks much the same as the original from the “Hello world” example:
#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([0, 0, 0, 0], 3000));

    let factory = DemoAppFactory {
        counter: Arc::new(AtomicUsize::new(0)),
    };

    let server = Server::bind(&addr).serve(factory);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}
If you’re looking to extend your understanding here, I’d recommend extending this example to perform some async actions within the app. How would you modify Future? If you use a trait object, how exactly do you pin?
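One possible answer to the trait object question, sketched with only the standard library: make the Future associated type a pinned, boxed trait object and build it with Box::pin around an async block. The Service trait here is again a simplified stand-in for Tower’s, and AsyncCounterApp, describe, and block_on are hypothetical names used for illustration. With real Hyper types the shape of the Future type would be similar, but this sketch does not depend on Hyper.

```rust
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Simplified stand-in for Tower's Service trait (assumption: no poll_ready).
trait Service<Input> {
    type Output;
    type Error;
    type Future: Future<Output = Result<Self::Output, Self::Error>>;
    fn call(&mut self, input: Input) -> Self::Future;
}

struct AsyncCounterApp {
    counter: Arc<AtomicUsize>,
}

// Hypothetical async helper standing in for real async work.
async fn describe(count: usize) -> String {
    format!("Counter is at: {}", count)
}

impl Service<()> for AsyncCounterApp {
    type Output = String;
    type Error = Infallible;
    // A trait object: the concrete async block type cannot be named,
    // so we box it and pin the box.
    type Future = Pin<Box<dyn Future<Output = Result<String, Infallible>> + Send>>;

    fn call(&mut self, _input: ()) -> Self::Future {
        // Clone the Arc so the future owns its data and does not borrow self.
        let counter = self.counter.clone();
        Box::pin(async move {
            let count = counter.fetch_add(1, Ordering::SeqCst);
            let body = describe(count).await;
            Ok::<_, Infallible>(body)
        })
    }
}

// Minimal executor for the demo: polls in a loop with a do-nothing waker.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn demo() -> Vec<String> {
    let mut app = AsyncCounterApp {
        counter: Arc::new(AtomicUsize::new(0)),
    };
    let first = block_on(app.call(())).unwrap();
    let second = block_on(app.call(())).unwrap();
    vec![first, second]
}

fn main() {
    for line in demo() {
        println!("{}", line);
    }
}
```

The trade-off is one heap allocation per call in exchange for not having to name the future type.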
But now it’s time to take a dive into a topic I’ve avoided for a while.
Let’s refresh our memory from above on the signature of serve:
pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
where
    I: Accept,
    I::Error: Into<Box<dyn StdError + Send + Sync>>,
    I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    S: MakeServiceRef<I::Conn, Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
    E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
Up until preparing this blog post, I had never tried to take a deep dive into understanding all of these bounds. So this will be an adventure for us all! (And perhaps it should end up with some documentation PRs by me…) Let’s start off with the type variables. Altogether, we have four: two on the impl block itself, and two on this method:
- I represents the incoming stream of connections.
- E represents the executor.
- S is the service we’re going to run. Using our terminology from above, this would be the “app factory.” Using Tower/Hyper terminology, this is the “make service.”
- B is the choice of response body the service returns (the “app”, not the “app factory”, using nomenclature above).
I: Accept
I needs to implement the Accept trait, which represents the ability to accept a new connection from some source. The only implementation out of the box is for AddrIncoming, which can be created from a SocketAddr. And in fact, that’s exactly what Server::bind does.
Accept has two associated types. Error must be something that can be converted into an error object, or Into<Box<dyn StdError + Send + Sync>>. This is the requirement of (almost?) every associated error type we look at, so from now on I’ll just skip over them. We need to be able to convert whatever error happened into a uniform representation.
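As a small illustration of that “uniform representation,” the sketch below uses only the standard library to funnel three unrelated error values through the same Into<Box<dyn StdError + Send + Sync>> bound. The BoxError alias, the into_box_error helper, and the HandshakeFailed type are hypothetical names chosen for this example; they are not Hyper APIs.

```rust
use std::error::Error as StdError;
use std::fmt;
use std::io;

// The uniform representation used in the bounds above.
type BoxError = Box<dyn StdError + Send + Sync>;

// Accepts anything that satisfies the same kind of bound as I::Error or S::Error.
fn into_box_error<E>(err: E) -> BoxError
where
    E: Into<BoxError>,
{
    err.into()
}

// A hypothetical custom error type. Implementing StdError (plus being
// Send + Sync) is enough for the standard library's blanket conversion.
#[derive(Debug)]
struct HandshakeFailed;

impl fmt::Display for HandshakeFailed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "handshake failed")
    }
}

impl StdError for HandshakeFailed {}

fn collect_errors() -> Vec<String> {
    let errors: Vec<BoxError> = vec![
        // An I/O error, as a listener might produce
        into_box_error(io::Error::new(io::ErrorKind::Other, "connection reset")),
        // A plain string slice also converts
        into_box_error("plain string message"),
        // And so does our own error type
        into_box_error(HandshakeFailed),
    ];
    errors.iter().map(|e| e.to_string()).collect()
}

fn main() {
    for message in collect_errors() {
        println!("{}", message);
    }
}
```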
The Conn associated type represents an individual connection. In the case of AddrIncoming, the associated type is AddrStream. This type must implement AsyncRead and AsyncWrite for communication, Send and 'static so it can be sent to different threads, and Unpin. The requirement for Unpin bubbles up from deeper in the stack, and I honestly don’t know what drives it.
S: MakeServiceRef
MakeServiceRef is one of those traits that doesn’t appear in the public documentation. This seems to be intentional. Reading the source:
Just a sort-of “trait alias” of MakeService, not to be implemented by anyone, only used as bounds.
Were you confused as to why we were receiving a reference with &AddrStream? This is the trait that powers that transformation. Overall, the trait bound S: MakeServiceRef<I::Conn, Body, ResBody = B> means:
- S must be a Service
- S will accept input of type &I::Conn
- It will in turn produce another Service as output
- That Service will accept Request<Body> as input, and produce Response<ResBody> as output
And while we’re talking about it: that ResBody has the restriction that it must implement HttpBody. As you might guess, the Body struct mentioned above implements HttpBody. There are a number of other implementations too. When we get to Tonic and gRPC, we’ll see that there are, in fact, other response bodies we have to deal with.
NewSvcExec and ConnStreamExec
The default value for the E parameter is Exec, which does not appear in the generated docs. But of course you can find it in the source. The concept of Exec is to specify how tasks are spawned off. By default, it leverages tokio::spawn.
I’m not entirely certain how all of this plays out, but I believe the two traits in the heading allow for different handling of spawning for the connection service (app factory) versus the request service (app).
Axum is the new web framework that kicked off this whole blog post. Instead of dealing directly with Hyper like we did above, let’s reimplement our counter web service using Axum. We’ll be using axum = "0.2". The crate docs provide a great overview of Axum, and I’m not going to try to replicate that information here. Instead, here’s my rewritten code. We’ll analyze a few key pieces below:
use axum::extract::Extension;
use axum::handler::get;
use axum::{AddExtensionLayer, Router};
use hyper::{HeaderMap, Server, StatusCode};
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

#[derive(Clone, Default)]
struct AppState {
    counter: Arc<AtomicUsize>,
}

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([0, 0, 0, 0], 3000));

    let app = Router::new()
        .route("/", get(home))
        .layer(AddExtensionLayer::new(AppState::default()));

    let server = Server::bind(&addr).serve(app.into_make_service());

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

async fn home(state: Extension<AppState>) -> (StatusCode, HeaderMap, String) {
    let counter = state
        .counter
        .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    let mut headers = HeaderMap::new();
    headers.insert("Content-Type", "text/plain; charset=utf-8".parse().unwrap());
    let body = format!("Counter is at: {}", counter);
    (StatusCode::OK, headers, body)
}
The first thing I’d like to get out of the way is this whole AddExtensionLayer/Extension bit. This is how we’re managing shared state within our application. It’s not directly relevant to our overall analysis of Tower and Hyper, so I’ll simply point to the Axum docs, which demonstrate how this works. Interestingly, you may notice that this implementation relies on middleware, which does in fact leverage Tower, so it’s not completely separate.
Anyway, back to our point at hand. Within our main function, we’re now using this Router concept to build up our application:
let app = Router::new()
    .route("/", get(home))
    .layer(AddExtensionLayer::new(AppState::default()));
This says, essentially, “please call the home function when you receive a request for /, and add a middleware that does that whole extension thing.” The home function uses an extractor to get the AppState, and returns a value of type (StatusCode, HeaderMap, String) to represent the response. In Axum, any implementation of the appropriately named IntoResponse trait can be returned from handler functions.
Anyway, our app value is now a Router. But a Router cannot be directly run by Hyper. Instead, we need to convert it into a MakeService (a.k.a. an app factory). Fortunately, that’s easy: we call app.into_make_service(). Let’s look at that method’s signature:
impl<S> Router<S> {
    pub fn into_make_service(self) -> IntoMakeService<S>
    where
        S: Clone;
}
And going down the rabbit hole a bit further:
pub struct IntoMakeService<S> { /* fields omitted */ }

impl<S: Clone, T> Service<T> for IntoMakeService<S> {
    type Response = S;
    type Error = Infallible;
    // other stuff omitted
}
The type Router<S> is a value that can produce a service of type S. IntoMakeService<S> will take some kind of connection info, T, and produce that service S asynchronously. And since Error is Infallible, we know it can’t fail. But as much as we say “asynchronously”, looking at the implementation of Service for IntoMakeService, we see a familiar pattern:
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    Poll::Ready(Ok(()))
}

fn call(&mut self, _target: T) -> Self::Future {
    future::MakeRouteServiceFuture {
        future: ready(Ok(self.service.clone())),
    }
}
Also, notice how that T value for connection info doesn’t actually have any bounds or other information. IntoMakeService just throws away the connection information. (If you need it for some reason, see into_make_service_with_connect_info.) In other words:
- Router<S> is a type that lets us add routes and middleware layers
- A Router<S> can be converted into an IntoMakeService<S>
- IntoMakeService<S> is really just a fancy wrapper around an S to appease the Hyper requirements around app factories
- So the service that actually does the work is S
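The “fancy wrapper” idea can be sketched without Axum or Hyper at all. In the standard-library-only example below, IntoMake is a hypothetical wrapper (deliberately not Axum’s real type) that implements a simplified Service trait for every input type T, ignores that input, and hands back a clone of the inner value. Greeter and block_on are likewise made-up names for illustration.

```rust
use std::convert::Infallible;
use std::future::{ready, Future, Ready};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Simplified stand-in for Tower's Service trait (assumption: no poll_ready).
trait Service<Input> {
    type Output;
    type Error;
    type Future: Future<Output = Result<Self::Output, Self::Error>>;
    fn call(&mut self, input: Input) -> Self::Future;
}

// Hypothetical inner value playing the role of S.
#[derive(Clone)]
struct Greeter {
    greeting: &'static str,
}

impl Greeter {
    fn greet(&self) -> String {
        format!("{}, world", self.greeting)
    }
}

// Hypothetical wrapper playing the role of the app factory.
struct IntoMake<S> {
    service: S,
}

// T has no bounds: whatever connection info arrives is simply dropped.
impl<S: Clone, T> Service<T> for IntoMake<S> {
    type Output = S;
    type Error = Infallible;
    type Future = Ready<Result<S, Infallible>>;

    fn call(&mut self, _target: T) -> Self::Future {
        ready(Ok(self.service.clone()))
    }
}

// Minimal executor for the demo: polls in a loop with a do-nothing waker.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn demo() -> (String, String) {
    let mut make = IntoMake {
        service: Greeter { greeting: "hello" },
    };
    // Two unrelated "connection info" types; both are accepted and ignored.
    let from_addr = block_on(make.call("127.0.0.1:4000")).unwrap();
    let from_port = block_on(make.call(8080u16)).unwrap();
    (from_addr.greet(), from_port.greet())
}

fn main() {
    let (a, b) = demo();
    println!("{}\n{}", a, b);
}
```

Because the factory only clones, any per-connection state has to live inside the cloned value itself, for example behind an Arc as in the counter examples.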
So where does that S type come from? It’s built up by all the route and layer calls you make. For example, check out the get function’s signature:
pub fn get<H, B, T>(handler: H) -> OnMethod<H, B, T, EmptyRouter>
where
    H: Handler<B, T>,

pub struct OnMethod<H, B, T, F> { /* fields omitted */ }

impl<H, B, T, F> Service<Request<B>> for OnMethod<H, B, T, F>
where
    H: Handler<B, T>,
    F: Service<Request<B>, Response = Response<BoxBody>, Error = Infallible> + Clone,
    B: Send + 'static,
{
    type Response = Response<BoxBody>;
    type Error = Infallible;
    // and more stuff
}
get returns an OnMethod value. And OnMethod is a Service that takes a Request<B> and returns a Response<BoxBody>. There’s some funny business at play regarding the representations of bodies, which we’ll eventually dive into a bit more. But with our newfound understanding of Tower and Hyper, the types at play here are no longer inscrutable. In fact, they may even be scrutable!
And one final note on the example above. Axum works directly with a lot of the Hyper machinery. And that includes the Server type. While the axum crate reexports many things from Hyper, you can use those types directly from Hyper instead if so desired. In other words, Axum is pretty close to the underlying libraries, simply providing some convenience on top. It’s one of the reasons I’m pretty excited to get a bit deeper into my experiments with Axum.
So to sum up at this point:
Next step on our journey: let’s look at another library for building Hyper services. We’ll follow up on this in our next post.