Bhargav Voleti
// The producer
extern crate beanstalkd;
use beanstalkd::Beanstalkd;
fn main() {
let mut beanstalkd = Beanstalkd::localhost().unwrap();
let _ = beanstalkd.put("Hello World", 0, 0, 10000);
}
// The consumer
let mut beanstalkd = Beanstalkd::localhost().unwrap();
let (id, body) = beanstalkd.reserve().unwrap();
println!("{}", body);
let _ = beanstalkd.delete(id);
pub trait Future {
type Item;
type Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}
Beanstalkd::connect(&"127.0.0.1:11300".parse()
.unwrap())
.and_then(|bean| {
bean.put(0, 1, 100, &b"update:42"[..])
.inspect(|(_, response)| {
response.as_ref().unwrap();
})
.and_then(|(bean, _)| {
// Use a particular tube
bean.using("notifications")
}).and_then(|(bean, _)| bean.put(..))
})
pub trait Stream {
type Item;
type Error;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error>;
}
pub trait Sink {
type SinkItem;
type SinkError;
fn start_send(
&mut self,
item: Self::SinkItem
) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError>;
fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError>;
}
pub trait Decoder {
type Item;
type Error: From<Error>;
fn decode(
&mut self,
src: &mut BytesMut
) -> Result<Option<Self::Item>, Self::Error>;
}
pub trait Encoder {
type Item;
type Error: From<Error>;
fn encode(
&mut self,
item: Self::Item,
dst: &mut BytesMut
) -> Result<(), Self::Error>;
}
impl<T, U> Framed<T, U>
where
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
Beanstalkd::connect(&"127.0.0.1:11300".parse()
.unwrap())
.and_then(|bean| {
bean.put(0, 1, 100, &b"update:42"[..])
.inspect(|(_, response)| {
response.as_ref().unwrap();
})
.and_then(|(bean, _)| {
// Use a particular tube
bean.using("notifications")
}).and_then(|(bean, _)| bean.put(...))
}),
fn setup(stream: tokio::net::TcpStream) -> Self {
let bean = Framed::new(stream, proto::CommandCodec::new())
Beanstalkd { connection: bean }
}
pub fn put<D>(self, priority: u32, delay: u32, ttr: u32, data: D)
-> impl Future<Item = (Self, Result<Id, Put>), Error = failure::Error>
where
D: Into<Cow<'static, [u8]>>,
{
let data = data.into();
self.connection
.send(Request::Put { .. } )
//
}
impl Encoder for CommandCodec {
type Item = Request;
type Error = failure::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) ->
Result<(), Self::Error> {
// serialize request
}
}
impl Decoder for CommandCodec {
type Item = AnyResponse;
type Error = Decode;
fn decode(&mut self, src: &mut BytesMut) ->
Result<Option<Self::Item>, Self::Error> {
// parse bytes into Response
}
}
pub fn put<D>(self, priority: u32, delay: u32, ttr: u32, data: D)
-> impl Future<Item = (Self, Result<Id, Put>), Error = failure::Error>
where
D: Into<Cow<'static, [u8]>>,
{
// After making the request
.send(Request::Put { .. } )
.and_then(|conn| { handle_response!(...) })
}
macro_rules! handle_response {
($input:ident, $mapping:tt, $error_mapping:tt) => {
$input.into_future().then(|val| match val {
Ok((Some(val), conn)) => // handle success case,
// None is only returned when the stream is closed
Ok((None, _)) => // stream closed unexpectedly,
Err((e, conn)) => // some error occurred.,
})
};
}