2016-04-21 16 views
1

我是Rust的初學者。如何通過Rest api實現一個長時間運行的進程,在Rust中可以使用?

我有一個長期運行的IO綁定進程,我想通過REST API產生和監視。我選擇了Iron,繼此tutorial。監測意味着取得進展和最終結果。

當我產卵時,我給它一個id並將該id映射到可以獲取進度的資源。我不必確切地瞭解進展;我可以從5秒前報告進度。

我的第一次嘗試是有一個通道,通過它發送進度請求並接收狀態。我卡在存儲接收器的地方,因爲在我的理解中,它只屬於一個線程。我想把它放在請求的上下文中,但這不會工作,因爲有不同的線程處理後續請求。

在Rust裏做到這一點的慣用方法是什麼?

我有一個sample project

後來編輯

這裏是一個自包含的例子如下樣品原則回答,即地圖,每個線程更新其進度:

extern crate iron; 
extern crate router; 
extern crate rustc_serialize; 

use iron::prelude::*; 
use iron::status; 
use router::Router; 
use rustc_serialize::json; 
use std::io::Read; 
use std::sync::{Mutex, Arc}; 

use std::thread; 
use std::time::Duration; 
use std::collections::HashMap; 

#[derive(Debug, Clone, RustcEncodable, RustcDecodable)] 
pub struct Status { 
    pub progress: u64, 
    pub context: String 
} 

#[derive(RustcEncodable, RustcDecodable)] 
struct StartTask { 
    id: u64 
} 

fn start_process(status: Arc<Mutex<HashMap<u64, Status>>>, task_id: u64) { 
    let c = status.clone(); 
    thread::spawn(move || { 
     for i in 1..100 { 
      { 
       let m = &mut c.lock().unwrap(); 
       m.insert(task_id, Status{ progress: i, context: "in progress".to_string()}); 
      } 
      thread::sleep(Duration::from_secs(1)); 
     } 
     let m = &mut c.lock().unwrap(); 
     m.insert(task_id, Status{ progress: 100, context: "done".to_string()}); 
    }); 
} 

fn main() { 
    let status: Arc<Mutex<HashMap<u64, Status>>> = Arc::new(Mutex::new(HashMap::new())); 
    let status_clone: Arc<Mutex<HashMap<u64, Status>>> = status.clone(); 

    let mut router = Router::new(); 

    router.get("/:taskId", move |r: &mut Request| task_status(r, &status.lock().unwrap())); 
    router.post("/start", move |r: &mut Request| 
     start_task(r, status_clone.clone())); 

    fn task_status(req: &mut Request, statuses: & HashMap<u64,Status>) -> IronResult<Response> { 
     let ref task_id = req.extensions.get::<Router>().unwrap().find("taskId").unwrap_or("/").parse::<u64>().unwrap(); 
     let payload = json::encode(&statuses.get(&task_id)).unwrap(); 
     Ok(Response::with((status::Ok, payload))) 
    } 

    // Receive a message by POST and play it back. 
    fn start_task(request: &mut Request, statuses: Arc<Mutex<HashMap<u64, Status>>>) -> IronResult<Response> { 
     let mut payload = String::new(); 
     request.body.read_to_string(&mut payload).unwrap(); 
     let task_start_request: StartTask = json::decode(&payload).unwrap(); 
     start_process(statuses, task_start_request.id); 
     Ok(Response::with((status::Ok, json::encode(&task_start_request).unwrap()))) 
    } 

    Iron::new(router).http("localhost:3000").unwrap(); 
} 

回答

1

一種可能性是使用全局的HashMap將每個工號與進程(和結果)相關聯。這裏是簡單的例子(沒有休息的東西)

#[macro_use] 
extern crate lazy_static; 

use std::sync::Mutex; 
use std::collections::HashMap; 
use std::thread; 
use std::time::Duration; 

lazy_static! { 
    static ref PROGRESS: Mutex<HashMap<usize, usize>> = Mutex::new(HashMap::new()); 
} 

fn set_progress(id: usize, progress: usize) { 
    // insert replaces the old value if there was one. 
    PROGRESS.lock().unwrap().insert(id, progress); 
} 

fn get_progress(id: usize) -> Option<usize> { 
    PROGRESS.lock().unwrap().get(&id).cloned() 
} 

fn work(id: usize) { 
    println!("Creating {}", id); 
    set_progress(id, 0); 
    for i in 0..100 { 
     set_progress(id, i + 1); 
     // simulates work 
     thread::sleep(Duration::new(0, 50_000_000)); 
    } 
} 

fn monitor(id: usize) { 
    loop { 
     if let Some(p) = get_progress(id) { 
      if p == 100 { 
       println!("Done {}", id); 
       // to avoid leaks, remove id from PROGRESS. 
       // maybe save that the task ends in a data base. 
       return 
      } else { 
       println!("Progress {}: {}", id, p); 
      } 
     } 
     thread::sleep(Duration::new(1, 0)); 
    } 
} 

fn main() { 
    let w = thread::spawn(|| work(1)); 
    let m = thread::spawn(|| monitor(1)); 
    w.join().unwrap(); 
    m.join().unwrap(); 
} 
+0

謝謝你的回答。正如你在我的更新問題中看到的那樣,我最終做的就是這樣。我最初的想法與演員類似。我在Scala中用演員完成了一些類似的事情,開始一個長時間運行的過程意味着產生一個演員(實際上2,做一個工作,另一個保持進度),我在隨後的進度報告請求上查詢。 –

0

您需要註冊一個通道每請求線程,因爲如果克隆Receiver是可能的,如果兩個請求同時運行,則響應可能/將以錯誤線程結束。

而不是讓你的線程創建一個回答請求的通道,使用future。 A future允許您有一個對象的句柄,其中該對象尚不存在。您可以將輸入通道更改爲接收Promise,然後執行,不需要輸出通道。

+0

我不知道我明白我怎麼可以使用未來,因爲我不需要一個結果。長期運行的過程必須在要求時提供進展,而不僅僅是最終結果。 –

+0

我誤解了。 –

相關問題