2017-10-19 36 views
1

我想寫一個TCP客戶端來打印傳入的消息。我想出了下面的代碼:如何在不使用tokio_proto箱子的情況下從tokio TCP連接讀取數據?

extern crate bytes; 
extern crate futures; 
extern crate tokio_core; 
extern crate tokio_io; 

use futures::Future; 
use tokio_core::net::TcpStream; 
use tokio_core::reactor::Core; 
use tokio_io::AsyncRead; 
use bytes::BytesMut; 

fn main() { 
    let mut core = Core::new().unwrap(); 
    let handle = core.handle(); 

    let connection = TcpStream::connect(&"127.0.0.1:8081".parse().unwrap(), &handle); 

    let server = connection.and_then(move |mut stream| { 
     let mut buf = BytesMut::with_capacity(1000); 
     stream 
      .read_buf(&mut buf) 
      .map(|buf| print!("Buffer {:?}", buf)) 
      .map_err(|e| eprintln!("Error: {}", e)); 
     Ok(()) 
    }); 

    core.run(server).unwrap(); 
} 

它編譯,但它無法用Buffer NotReady錯誤。

+0

爲什麼如果你想運行一個服務器使用'TcpStream :: connect'? – Shepmaster

+0

@Shepmaster現在我正在嘗試讀取任何東西。這樣更方便 - 我只是用'ncat'運行監聽器。我認爲閱讀套接字應該類似於服務器和客戶端。 – Sergey

回答

2

鏽病是編譯型語言,這意味着你應該注意的警告,編譯器生成:

warning: unused `std::result::Result` which must be used 
    --> src/main.rs:20:9 
    | 
20 |/  stream 
21 | |    .read_buf(&mut buf) 
22 | |    .map(|buf| print!("Buffer {:?}", buf)) 
23 | |    .map_err(|e| eprintln!("Error: {}", e)); 
    | |____________________________________________________^ 
    | 
    = note: #[warn(unused_must_use)] on by default 

此外,tokio has an entire chapter dedicated to low-level IO我會假設你已經讀不煩你詳情你已經知道了。

首先我們拿connectionFuture並將其轉換爲Stream。一個流可以產生多個值 - 在這種情況下,我們爲每次成功讀取返回一個值。我們創建了AsWeGetIt來實現這個最簡單的實現。

然後我們使用Stream::for_each打印出每個數據流的值。方便的是,這將執行相應的轉換回Future,這是and_then需要的。

extern crate bytes; 
extern crate futures; 
extern crate tokio_core; 
extern crate tokio_io; 

use futures::{Future, Poll, Stream}; 
use tokio_core::net::TcpStream; 
use tokio_core::reactor::Core; 
use tokio_io::AsyncRead; 
use bytes::BytesMut; 

struct AsWeGetIt<R>(R); 

impl<R> Stream for AsWeGetIt<R> 
where 
    R: AsyncRead, 
{ 
    type Item = BytesMut; 
    type Error = std::io::Error; 

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { 
     let mut buf = BytesMut::with_capacity(1000); 

     self.0 
      .read_buf(&mut buf) 
      .map(|async| async.map(|_| Some(buf))) 
    } 
} 

fn main() { 
    let mut core = Core::new().unwrap(); 
    let handle = core.handle(); 

    let address = "127.0.0.1:8081".parse().expect("Unable to parse address"); 
    let connection = TcpStream::connect(&address, &handle); 

    let client = connection 
     .and_then(|tcp_stream| { 
      AsWeGetIt(tcp_stream).for_each(|buf| { 
       println!("Buffer {:?}", buf); 
       Ok(()) 
      }) 
     }) 
     .map_err(|e| eprintln!("Error: {}", e)); 

    core.run(client).expect("Unable to run the event loop"); 
} 
相關問題