// Bhargav Voleti
// The producer
extern crate beanstalkd;
use beanstalkd::Beanstalkd;
/// Producer entry point.
///
/// Obtains a client through `Beanstalkd::localhost()` and issues a single
/// `put` with the body `"Hello World"`.
///
/// # Panics
///
/// Panics if `Beanstalkd::localhost()` returns an error, because the result
/// is unwrapped.
fn main() {
    let connection = Beanstalkd::localhost();
    let mut client = connection.unwrap();

    // The outcome of the `put` call is deliberately discarded.
    let _ = client.put("Hello World", 0, 0, 10000);
}
// The consumer
// NOTE(review): these statements are shown without an enclosing function;
// they have to be placed inside a function body (e.g. `main`) to compile.
let mut beanstalkd = Beanstalkd::localhost().unwrap();
// Reserve a job, print its body, then delete it by id.
let (id, body) = beanstalkd.reserve().unwrap();
println!("{}", body);
// The outcome of the delete is deliberately discarded.
let _ = beanstalkd.delete(id);

/// Trait with a success type, an error type and a single `poll` method.
// NOTE(review): presumably the futures 0.1 `Future` trait reproduced for
// reference -- confirm against the crate documentation.
pub trait Future {
    /// Type carried by a successful `poll`.
    type Item;
    /// Type carried by a failed `poll`.
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

// Usage example (expression fragment, shown without an enclosing function):
// connect, issue a `put`, switch tube with `using`, then `put` again.
Beanstalkd::connect(&"127.0.0.1:11300".parse().unwrap())
    .and_then(|bean| {
        bean.put(0, 1, 100, &b"update:42"[..])
            .inspect(|(_, response)| {
                // Panics if the response to the first `put` is an error.
                response.as_ref().unwrap();
            })
            .and_then(|(bean, _)| {
                // Use a particular tube
                bean.using("notifications")
            })
            // Arguments elided in the original listing.
            .and_then(|(bean, _)| bean.put(..))
    })

/// Trait whose `poll` yields `Async<Option<Self::Item>>` or `Self::Error`.
pub trait Stream {
    /// Type of the values yielded by `poll`.
    type Item;
    /// Type of the error returned by `poll`.
    type Error;
    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error>;
}

/// Trait with a `start_send` method that accepts a `SinkItem` and a
/// `poll_complete` method that reports `Async<()>`.
pub trait Sink {
    /// Type of the values handed to `start_send`.
    type SinkItem;
    /// Type of the error returned by both methods.
    type SinkError;
    fn start_send(
        &mut self,
        item: Self::SinkItem,
    ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError>;
    fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError>;
}

/// Trait for turning the bytes in a `BytesMut` into an optional `Item`.
pub trait Decoder {
    /// Type produced by `decode`.
    type Item;
    // NOTE(review): the unqualified `Error` in the bound is presumably
    // `std::io::Error` as rendered by rustdoc -- confirm.
    type Error: From<Error>;
    /// Returns `Ok(Some(item))`, `Ok(None)` or an error for the given buffer.
    fn decode(
        &mut self,
        src: &mut BytesMut,
    ) -> Result<Option<Self::Item>, Self::Error>;
}

/// Trait for writing an `Item` into a `BytesMut`.
pub trait Encoder {
    /// Type consumed by `encode`.
    type Item;
    // NOTE(review): the unqualified `Error` in the bound is presumably
    // `std::io::Error` as rendered by rustdoc -- confirm.
    type Error: From<Error>;
    fn encode(
        &mut self,
        item: Self::Item,
        dst: &mut BytesMut,
    ) -> Result<(), Self::Error>;
}

// Bounds required by this `Framed<T, U>` impl block: `T` must be readable
// and writable, `U` must be both a decoder and an encoder.
impl<T, U> Framed<T, U>
where
    T: AsyncRead + AsyncWrite,
    U: Decoder + Encoder,
{
    // Body not shown in the original listing.
}

// The same usage example as above, repeated in the original listing.
Beanstalkd::connect(&"127.0.0.1:11300".parse().unwrap())
    .and_then(|bean| {
        bean.put(0, 1, 100, &b"update:42"[..])
            .inspect(|(_, response)| {
                // Panics if the response to the first `put` is an error.
                response.as_ref().unwrap();
            })
            .and_then(|(bean, _)| {
                // Use a particular tube
                bean.using("notifications")
            })
            // Arguments elided in the original listing.
            .and_then(|(bean, _)| bean.put(..))
    })

/// Wraps `stream` in a `Framed` driven by `proto::CommandCodec` and stores
/// it as the `connection` field of a new `Beanstalkd`.
fn setup(stream: tokio::net::TcpStream) -> Self {
    let bean = Framed::new(stream, proto::CommandCodec::new());
    Beanstalkd { connection: bean }
}

/// First stage of `put`: converts `data` and sends a `Request::Put` over the
/// connection.
///
/// The declared return type is a future resolving to the client itself plus
/// `Result<Id, Put>`, with `failure::Error` as the future's error type.
pub fn put<D>(
    self,
    priority: u32,
    delay: u32,
    ttr: u32,
    data: D,
) -> impl Future<Item = (Self, Result<Id, Put>), Error = failure::Error>
where
    D: Into<Cow<'static, [u8]>>,
{
    let data = data.into();
    self.connection
        // Request fields elided in the original listing.
        .send(Request::Put { .. })
    //
}

/// Encoder half of `CommandCodec`: consumes a `Request` and writes into `dst`.
impl Encoder for CommandCodec {
    type Item = Request;
    type Error = failure::Error;

    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // serialize request
    }
}

/// Decoder half of `CommandCodec`: reads from `src` and yields `AnyResponse`.
impl Decoder for CommandCodec {
    type Item = AnyResponse;
    type Error = Decode;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        // parse bytes into Response
    }
}

/// Second stage of `put`: after the request has been sent, the response is
/// handled through the `handle_response!` macro.
pub fn put<D>(
    self,
    priority: u32,
    delay: u32,
    ttr: u32,
    data: D,
) -> impl Future<Item = (Self, Result<Id, Put>), Error = failure::Error>
where
    D: Into<Cow<'static, [u8]>>,
{
    // After making the request
    // (receiver restored from the first `put` listing; the original started
    // directly with `.send`)
    self.connection
        .send(Request::Put { .. })
        // Macro arguments elided in the original listing.
        .and_then(|conn| { handle_response!(...) })
}

/// Turns `$input` into a future of its next item and matches on the outcome.
///
/// `$mapping` and `$error_mapping` are accepted but not used in the part of
/// the body shown here.
macro_rules! handle_response {
    ($input:ident, $mapping:tt, $error_mapping:tt) => {
        $input.into_future().then(|val| match val {
            Ok((Some(val), conn)) => {
                // handle success case
            }
            // None is only returned when the stream is closed
            Ok((None, _)) => {
                // stream closed unexpectedly
            }
            Err((e, conn)) => {
                // some error occurred.
            }
        })
    };
}