2011-07-24 41 views
2

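Joining on the first finished thread?

I'm writing a series of graph-searching algorithms in F# and thought it would be nice to take advantage of parallelization. I want to execute several threads in parallel and take the result of the first one to finish. I have an implementation, but it's not pretty.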

Two questions: is there a standard name for this kind of function? Not Join or JoinAll, but JoinFirst? Second, is there a more idiomatic way to do this?

//implementation
open System.Threading
open System.Threading.Tasks

// Wraps a unit of work so that, when it finishes, it stores its result
// in the shared cell and wakes up whoever is waiting on the lock
let makeAsync (locker:obj) (shared:'a option ref) (f:unit->'a) =
    async {
        let result = f()
        Monitor.Enter locker
        shared := Some result
        Monitor.Pulse locker
        Monitor.Exit locker
    }

let firstFinished test work =
    let result = ref Option.None
    let locker = new obj()
    let cancel = new CancellationTokenSource()
    work
    |> List.map (makeAsync locker result)
    |> List.map (fun a -> Async.StartAsTask(a, TaskCreationOptions.None, cancel.Token))
    |> ignore
    // Wait until some result has been stored and it passes the test
    Monitor.Enter locker
    while (result.Value.IsNone || (not <| test result.Value.Value)) do
        Monitor.Wait locker |> ignore
    Monitor.Exit locker
    cancel.Cancel()
    match result.Value with
    | Some x -> x
    | None -> failwith "Don't pass in an empty list"
//end implementation

//testing
let delayReturn (ms:int) value =
    fun () ->
        Thread.Sleep ms
        value

let test() =
    let work = [ delayReturn 1000 "First!"; delayReturn 5000 "Second!" ]
    let result = firstFinished (fun _ -> true) work
    printfn "%s" result
+0

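If you have to store the result in shared state, why not have each worker thread periodically check whether a result has already been produced and, if so, stop? Then you could use ['Async.Parallel'](http://msdn.microsoft.com/en-us/library/ee353779.aspx), and your code should be relatively simple. – Daniel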

+0

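Two reasons, architectural and practical. Architecturally, my search algorithms shouldn't need to know the details of the parallelization, or even whether they are running in parallel. Practically, graph search is heavily recursive; the shared state is specific to the children of a single node. Sharing that state across all nodes would be bad. – zmj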

+0

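I'd recommend choosing either 'async' or 'Task' and sticking with it, rather than mixing the two. They are two ways of doing the same thing. In F#, 'async' is more idiomatic. – Daniel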

Answers

2

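With the .NET 4 Task Parallel Library, this is called WaitAny. For example, the following snippet creates 10 tasks and waits for any one of them to complete: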

open System.Threading

Array.init 10 (fun _ ->
    Tasks.Task.Factory.StartNew(fun () ->
        Thread.Sleep 1000))
|> Tasks.Task.WaitAny
+0

I think the WaitAny method only works with Task, not with the Task<T> type. But that shouldn't be hard to implement. Will try it and post. – Ankur

+0

WaitAny doesn't cancel the other tasks :) – Ankur

+0

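This seems to be the closest thing in the framework, but I'd need to add a loop that checks whether the result of the newly finished thread passes the test function, and then cancel the other tasks. Hopefully this matures a bit and gets implemented on the async side. – zmj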
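The loop described in the comment above could be sketched roughly as follows. This is only an illustration, not code from the thread: `firstPassing` and this variant of `delayReturn` are hypothetical names, and the sketch assumes each work item accepts a `CancellationToken` so that it can stop cooperatively (cancelling the token does not interrupt work that never looks at it). A task that throws will surface its exception when `.Result` is read.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

/// Hypothetical helper: returns the first result that passes `test`,
/// or None if every task finishes without passing it.
let firstPassing (test: 'a -> bool) (work: (CancellationToken -> 'a) list) : 'a option =
    let cts = new CancellationTokenSource()
    let token = cts.Token
    // Start one task per work item; the explicit Func avoids overload ambiguity
    let tasks : Task<'a> list =
        work |> List.map (fun f -> Task.Factory.StartNew(Func<'a>(fun () -> f token)))
    let rec loop (pending: Task<'a> list) =
        match pending with
        | [] -> None
        | _ ->
            // WaitAny expects an array of the non-generic Task type
            let arr = pending |> List.map (fun t -> t :> Task) |> List.toArray
            let i = Task.WaitAny(arr)
            let finished = pending.[i]
            if test finished.Result then
                cts.Cancel()            // ask the remaining work to stop
                Some finished.Result
            else
                // Drop the finished task and keep waiting on the others
                pending
                |> List.indexed
                |> List.filter (fun (j, _) -> j <> i)
                |> List.map snd
                |> loop
    loop tasks

/// Hypothetical cancellable variant of delayReturn: waits `ms` milliseconds,
/// but wakes up early if the token is cancelled.
let delayReturn (ms: int) value (token: CancellationToken) =
    token.WaitHandle.WaitOne ms |> ignore
    value

let result =
    firstPassing (fun _ -> true) [ delayReturn 1000 "First!"; delayReturn 5000 "Second!" ]
```

With these delays, `result` should be `Some "First!"`, and the second work item returns early because its wait is interrupted by the cancellation.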

3

Would it work to pass the CancellationTokenSource and test to each async, and have the first one that computes a valid result cancel the others?

open System.Threading

// Repeats f until its result passes the test or another worker has cancelled
let makeAsync (cancel:CancellationTokenSource) test f =
    let rec loop() =
        async {
            if cancel.IsCancellationRequested then
                return None
            else
                let result = f()
                if test result then
                    cancel.Cancel()
                    return Some result
                else return! loop()
        }
    loop()

let firstFinished test work =
    match work with
    | [] -> invalidArg "work" "Don't pass in an empty list"
    | _ ->
        let cancel = new CancellationTokenSource()
        work
        |> Seq.map (makeAsync cancel test)
        |> Seq.toArray
        |> Async.Parallel
        |> Async.RunSynchronously
        |> Array.pick id

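This approach makes several improvements: 1) it uses only async (it isn't mixed with Task, which is an alternative way of doing the same thing; async is more idiomatic in F#); 2) there is no shared state other than the CancellationTokenSource, which was designed for this purpose; 3) the clean function-chaining style makes it easy to add extra logic or transformations to the pipeline, including trivially enabling or disabling parallelism.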

2

If you can use Reactive Extensions (Rx) in your project, the joinFirst method can be implemented as:

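open System.Threading
open System.Reactive.Linq   // Rx namespace that provides Observable

let joinFirst (f : (unit->'a) list) =
    let c = new CancellationTokenSource()
    let o =
        f
        |> List.map (fun i ->
            let j = fun () -> Async.RunSynchronously (async { return i() }, -1, c.Token)
            Observable.Defer(fun () -> Observable.Start(j)))
        |> Observable.Amb   // propagate whichever observable produces a value first
    let r = o.First()
    c.Cancel()
    r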

Usage example:

[20..30] |> List.map (fun i -> fun() -> Thread.Sleep(i*100); printfn "%d" i; i) 
|> joinFirst |> printfn "Done %A" 
Console.Read() |> ignore 

Update:

Using a MailboxProcessor:

type WorkMessage<'a> =
    | Done of 'a
    | GetFirstDone of AsyncReplyChannel<'a>

let joinFirst (f : (unit->'a) list) =
    let c = new CancellationTokenSource()
    let m = MailboxProcessor<WorkMessage<'a>>.Start(
                fun mbox -> async {
                    // Once a result is known, answer the first GetFirstDone request with it
                    let afterDone a m =
                        match m with
                        | GetFirstDone rc ->
                            rc.Reply(a)
                            Some (async { return () })
                        | _ -> None
                    // On the first Done message, cancel the remaining work
                    let getDone m =
                        match m with
                        | Done a ->
                            c.Cancel()
                            Some (async {
                                do! mbox.Scan(afterDone a)
                            })
                        | _ -> None
                    do! mbox.Scan(getDone)
                    return ()
                })
    // Note: Async.RunSynchronously blocks, so the work items run one after another
    f
    |> List.iter (fun t ->
        try
            Async.RunSynchronously (async {
                let out = t()
                m.Post(Done out)
                return () }, -1, c.Token)
        with
        | _ -> ())
    m.PostAndReply(fun rc -> GetFirstDone rc)
2

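Unfortunately, Async provides no built-in operation for this, but I would still use F# asyncs, because they support cancellation directly. When you start a workflow using Async.Start, you can pass it a cancellation token, and the workflow will stop automatically if the token is cancelled.

This means that you have to start the workflows explicitly (instead of using Async.Parallel), so the synchronization must be written by hand. Here is a simple version of an Async.Choice method that does that (at the moment, it doesn't handle exceptions):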

open System.Threading

type Microsoft.FSharp.Control.Async with
    /// Takes several asynchronous workflows and returns
    /// the result of the first workflow that successfully completes
    static member Choice(workflows) =
        Async.FromContinuations(fun (cont, _, _) ->
            let cts = new CancellationTokenSource()
            let completed = ref false
            let lockObj = new obj()
            let synchronized f = lock lockObj f

            /// Called when a result is available - the function uses locks
            /// to make sure that it calls the continuation only once
            let completeOnce res =
                let run =
                    synchronized(fun () ->
                        if completed.Value then false
                        else completed := true; true)
                if run then cont res

            /// Workflow that will be started for each argument - run the
            /// operation, cancel pending workflows and then return result
            let runWorkflow workflow = async {
                let! res = workflow
                cts.Cancel()
                completeOnce res }

            // Start all workflows using cancellation token
            for work in workflows do
                Async.Start(runWorkflow work, cts.Token))

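Once we have written this operation (which is a bit complicated, but only has to be written once), solving the problem is quite easy. You can write your operations as asynchronous workflows, and they will be cancelled automatically when the first one completes: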

let delayReturn n s = async { 
    do! Async.Sleep(n) 
    printfn "returning %s" s 
    return s } 

Async.Choice [ delayReturn 1000 "First!"; delayReturn 5000 "Second!" ] 
|> Async.RunSynchronously 

When you run it, it prints only "returning First!", because the second workflow is cancelled.
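For readers on newer toolchains: FSharp.Core 4.0 and later ship a built-in `Async.Choice`, which takes a sequence of `Async<'T option>` values, returns the first `Some` result, and cancels the rest. The sketch below shows how the question's `test` predicate might be layered on top of it. The names `passing` and `firstPassingAsync` are hypothetical, and the snippet assumes the built-in `Async.Choice` is the one in scope rather than a hand-written extension with the same name.

```fsharp
/// Hypothetical helper: turns a workflow into one that yields Some only
/// when its result passes `test`, which is the shape Async.Choice expects.
let passing (test: 'a -> bool) (workflow: Async<'a>) : Async<'a option> =
    async {
        let! res = workflow
        return (if test res then Some res else None)
    }

/// Hypothetical helper: runs all workflows and returns the first passing
/// result, or None if no workflow produces one.
let firstPassingAsync (test: 'a -> bool) (workflows: seq<Async<'a>>) : 'a option =
    workflows
    |> Seq.map (passing test)
    |> Async.Choice               // built-in since FSharp.Core 4.0
    |> Async.RunSynchronously

let delayReturn (ms: int) (s: string) = async {
    do! Async.Sleep ms            // Async.Sleep honours cancellation
    printfn "returning %s" s
    return s }

let first =
    firstPassingAsync (fun _ -> true)
        [ delayReturn 1000 "First!"; delayReturn 5000 "Second!" ]
```

Here `first` should be `Some "First!"`. Results that fail the predicate are simply skipped, so a later workflow can still win.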
