2017-01-23 45 views
1

我正在嘗試使用Future.rs來管理一個單獨的過程中的一些任務。我看到了如何等待每個創建的未來以及如何一個接一個地處理它們,但我無法在執行過程中調查未來以瞭解其狀態。我總是有錯誤:如何在未等待的情況下輪詢未來狀態?

thread 'main' panicked at 'no Task is currently running'

我想在將來的處理過程中做某件事,直到它完成。也許我沒有正確地使用它?我設法通過使用頻道使其工作,但我認爲應該有可能輪詢未來,以及何時可以得到結果。 我用它來測試它的代碼是:

fn main() { 
    println!("test future"); 
    let thread_pool = CpuPool::new(4); 
    let mut future_execution_list = vec![]; 
    let mutex = Arc::new(AtomicUsize::new(0)); 
    //create the future to process 
    for _ in 0..10 { 
     let send_mutex = mutex.clone(); 
     let future = thread_pool.spawn_fn(move || { 
      //Simulate long processing 
      thread::sleep(time::Duration::from_millis(10)); 
      let num = send_mutex.load(Ordering::Relaxed); 
      send_mutex.store(num + 1, Ordering::Relaxed); 
      let res: Result<usize,()> = Ok(num); 
      res 

     }); 
     future_execution_list.push(future); 
    } 
    // do the job 
    loop { 
     for future in &mut future_execution_list { 
      match future.poll() { 
       Ok(Async::NotReady) =>(), //do nothing 
       Ok(Async::Ready(num)) => { 
        //update task status 
        println!(" future {:?}", num); 
       } 
       Err(_) => { 
        //log error and set task status to err 
        () 
       } 
      }; 
     } 
     //do something else 
    } 
} 

所以我完成Shepmaster答案後,我的問題。您的評論非常有趣,但我仍無法找到解決我的問題的方案。我會添加一些關於我的問題的信息。我想在自動化計劃任務上同時管理多個任務。有一個循環,其中管理事件並計算任務計劃。當一個任務被安排時,它就產生了。當任務結束時,新的調度完成。在任務執行期間,管理事件。一個speudo代碼可以是:

loop { 
    event.try_recv() { ...} //manage user command for exemple 
    if (schedule) { 
     let tasks_to_spawn = schedule_task(); 
     let futures = tasks_to_spawn.map(|task| { 
      thread_pool.spawn_fn(....)}); 
     let mut one = future::select_all(futures); 
     while let Ok((value, _idx, remaining)) = one.wait() {..} //wait here 
    } 
    //depend on task end state and event set schedule to true or false. 

} 

我可以聯合調度,並像在未來的任務:

let future = schedule.and_them(|task| execute_task); 

但我仍然需要等待第一任務執行結束。 我可以把所有事情放在未來(事件管理,時間表,任務),並等待第一個像你提議的結束。我嘗試了,但是我沒有看到如何用不同的Item和Error類型來製作未來的vec。有了這個概念,我必須在線程之間管理更多的數據。事件管理和調度不必在不同的線程中執行。

我看到另一個問題,select_all採取vec的所有權。如果在執行另一個任務期間需要新的任務,我該如何改變vec並添加新的未來?

不知道你是否有一個簡單的解決方案。我在想,使用isDone()等方法在執行過程中未來的狀態很容易,而不用等待。也許是有計劃的,我沒有看到有關這方面的公關。 如果你有一個簡單的解決方案,它會很好,否則我會重新思考我的概念。

+0

有一個99.9%的機會,你做**不**要以這種方式使用原子變量。相反,你希望'fetch_add'和絕大多數人不需要'Relaxed'排序。 – Shepmaster

+0

您的權利,我只是複製/粘貼一些代碼,以表明我想從未來的執行中得到一個結果,這取決於其他期貨的執行情況。 –

+0

這是[*非常糟糕的形式*在您收到答案後更改您的問題**,特別是當更改使這些答案無效時](http://meta.stackoverflow.com/q/309237/155423)。從一開始就提出一個包含任何相關細節的好問題是提問者的問題。 – Shepmaster

回答

2

要投票Future您必須有Task。要獲得Task,您可以輪詢Future傳遞給futures::executor::spawn()。如果你重寫你的例子的loop像這樣:

futures::executor::spawn(futures::lazy(|| { 
    // existing loop goes here 
})).wait_future(); 

它運行。

至於爲什麼 a Future只能在任務中進行輪詢,我相信這樣可以讓輪詢可以撥打Task::Unpark

0

I want to do something during the future processing

據我瞭解,這就是期貨 - 東西,可以並行發生。如果你想做其他事情,那麼再創造一個未來並投入其中!

你基本上已經在做這個 - 你的每個線程都是「做別的事情」。

poll the future and when it's ready get the result

使用future::select_all,您可以合併多個期貨並獲得取其完成第一。然後由你決定等待下一個。

一種可能實現:

extern crate rand; 
extern crate futures; 
extern crate futures_cpupool; 

use rand::Rng; 
use futures::{future, Future}; 
use futures_cpupool::CpuPool; 

use std::{thread, time}; 

fn main() { 
    let thread_pool = CpuPool::new(4); 

    let futures = (0..10).map(|i| { 
     thread_pool.spawn_fn(move || -> Result<usize,()> { 
      let mut rng = rand::thread_rng(); 
      // Simulate long processing 
      let sleep_time = rng.gen_range(10, 100); 
      let sleep_time = time::Duration::from_millis(sleep_time); 
      for _ in 0..10 { 
       println!("Thread {} sleeping", i); 
       thread::sleep(sleep_time); 
      } 
      Ok(i) 
     }) 
    }); 

    let mut one = future::select_all(futures); 
    while let Ok((value, _idx, remaining)) = one.wait() { 
     println!("Future #{} finished", value); 
     if remaining.is_empty() { 
      break; 
     } 
     one = future::select_all(remaining); 
    } 
} 

在通話過程中,以wait,多事情發生的同時!這可以通過對交織的輸出中可以看出:

Thread 2 sleeping 
Thread 0 sleeping 
Thread 3 sleeping 
Thread 1 sleeping 
Thread 3 sleeping 
Thread 0 sleeping 
Thread 1 sleeping 
Thread 2 sleeping 
Thread 3 sleeping 

可以驗證的東西被並行由睡眠時間設定爲1秒爲每個線程和定時的整體方案發生。由於有10個期貨,需要1秒,並行度爲4,整個程序需要3秒才能運行。


紅利代碼審查:

  1. 不拆裝載並設置一個原子變量來實現遞增 - 存儲的值可能已經由兩個電話之間另一個線程改變。使用fetch_add
  2. 您真的在使用它們之前應該知道what the orderings mean。我不知道他們,所以我總是使用SeqCst
  3. 由於這個例子並不重要,我完全刪除了原子變量。
  4. 總是喜歡收集到Vec而不是在循環內推入。這允許更優化的分配。
  5. 在這種情況下,根本不需要Vec,因爲select_all接受任何可以變成迭代器的東西。
相關問題