so you want to write a protocol client in Rust

Bhargav Voleti

A bit about me

  • CS grad student at UIC
  • Systems, networks and distributed systems
  • Using Rust seriously since over an year

What this talk is not about

  • How futures work
  • How tokio works
  • Major differences between futures 0.1 and 0.3
  • async/await

what this talk is about

  • What is Beanstalkd?
  • I/O in tokio
  • What is tokio-codec
  • Bringing it all together

Beanstalkd

A simple work queue

  • A simple work queue
  • Jobs
  • Tubes
  • Job priorities.
  • Delayed jobs

beanstalkd protocol

 A simple example

// The producer
extern crate beanstalkd;

use beanstalkd::Beanstalkd;

fn main() {
    let mut beanstalkd = Beanstalkd::localhost().unwrap();
    let _ = beanstalkd.put("Hello World", 0, 0, 10000);
}

// The consumer
let mut beanstalkd = Beanstalkd::localhost().unwrap();
let (id, body) = beanstalkd.reserve().unwrap();
println!("{}", body);
let _ = beanstalkd.delete(id);

Futures

Represents a value in the future

pub trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

Combinators

Beanstalkd::connect(&"127.0.0.1:11300".parse()
    .unwrap())
    .and_then(|bean| {
        bean.put(0, 1, 100, &b"update:42"[..])
            .inspect(|(_, response)| {
                response.as_ref().unwrap();
            })
            .and_then(|(bean, _)| {
                // Use a particular tube
                bean.using("notifications")
            }).and_then(|(bean, _)| bean.put(..))
    })

useful combinators

  • and_then
  • map
  • then
  • map_err
  • inspect

Learn more

Tokio

asynchronous runtime

  • Scheduler
  • Reactor (event loop)
  • I/O
  • TCP & UDP
  • Timers

stream and sink

  • Traits
  • Working with IO
  • Wrap an IO object

Stream

pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error>;
}
  • A stream of futures
  • Can be thought of as an iterator
  • Wraps over IO object.

Sink

pub trait Sink {
    type SinkItem;
    type SinkError;
    fn start_send(
        &mut self, 
        item: Self::SinkItem
    ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError>;
    fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError>;
}
  • A place to write values to
  • Values written asynchronously
  • Done in a two phase way

tokio codec

decoder

pub trait Decoder {
    type Item;
    type Error: From<Error>;
    fn decode(
        &mut self, 
        src: &mut BytesMut
    ) -> Result<Option<Self::Item>, Self::Error>;
}
  • Decodes frames
  • Works with BytesMut buffer
  • Bytes crate is awesome!

Encoder

pub trait Encoder {
    type Item;
    type Error: From<Error>;
    fn encode(
        &mut self, 
        item: Self::Item, 
        dst: &mut BytesMut
    ) -> Result<(), Self::Error>;
}
  • Encodes frames
  • Similar to Decoder

codec

  • A type implementing Decoder and Encoder
  • The core of your protocol handling
  • Encodes Requests
  • Decodes Responses

framed

impl<T, U> Framed<T, U> 
where
    T: AsyncRead + AsyncWrite,
    U: Decoder + Encoder,
  • Wraps over an IO object and a Codec
  • Provides a Sink and a Stream
  • Unified way of writing to and reading from an IO object.

Bringing it all together

tokio-beanstalkd

Api

Beanstalkd::connect(&"127.0.0.1:11300".parse()
    .unwrap())
    .and_then(|bean| {
        bean.put(0, 1, 100, &b"update:42"[..])
            .inspect(|(_, response)| {
                response.as_ref().unwrap();
            })
            .and_then(|(bean, _)| {
                // Use a particular tube
                bean.using("notifications")
            }).and_then(|(bean, _)| bean.put(...))
    }),
  • Commands as methods
  • Heavy use of combinators.

using framed

fn setup(stream: tokio::net::TcpStream) -> Self {
    let bean = Framed::new(stream, proto::CommandCodec::new())
    Beanstalkd { connection: bean }
}
  • Use the TcpStream and Codec
  • Now we can send on the Sink and read from the Stream

the request

pub fn put<D>(self, priority: u32, delay: u32, ttr: u32, data: D) 
    -> impl Future<Item = (Self, Result<Id, Put>), Error = failure::Error>
    where
        D: Into<Cow<'static, [u8]>>,
    {
        let data = data.into();
        self.connection
        .send(Request::Put { .. } )
        //
}
  • Types ensure that erroneous commands are not possible. 
  • Send on the sink

Encoding

impl Encoder for CommandCodec {
    type Item = Request;
    type Error = failure::Error;

    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> 
        Result<(), Self::Error> {
        // serialize request
    }
}
  • When requests are written to the sink
  • Implemented for CommandCodec

Decoding

impl Decoder for CommandCodec {
    type Item = AnyResponse;
    type Error = Decode;

    fn decode(&mut self, src: &mut BytesMut) -> 
        Result<Option<Self::Item>, Self::Error> {
        // parse bytes into Response
    }
}
  • Called when reading from Framed using the Stream interface
  • Implemented on CommandCodec

response

pub fn put<D>(self, priority: u32, delay: u32, ttr: u32, data: D) 
    -> impl Future<Item = (Self, Result<Id, Put>), Error = failure::Error>
    where
        D: Into<Cow<'static, [u8]>>,
    {
        // After making the request
        .send(Request::Put { .. } )
        .and_then(|conn| { handle_response!(...) })
}
  • Get Response from server
  • Automatically decoded using CommandCodec
  • Option<Result<Response, Error>>

Response - cont

macro_rules! handle_response {
    ($input:ident, $mapping:tt, $error_mapping:tt) => {
        $input.into_future().then(|val| match val {
            Ok((Some(val), conn)) => // handle success case,
            // None is only returned when the stream is closed
            Ok((None, _)) => // stream closed unexpectedly,
            Err((e, conn)) => // some error occurred.,
        })
    };
}
  • into_response converts stream into future
  • Handle three possible cases

Errors

  • ProtocolErrors -> Internal
  • CommandErrors -> Custom Error Types
  • General Errors -> failure::Error

in closing

tokio-codec

  • Is awesome
  • Makes writing protocols far simpler.
  • Uses a Codec to Encode Requests and Decode Responses.
  • We can provide a simple API to use.

further reading

 Fin - questions?