2013-12-11 18 views
9

有很多例子來說明如何在F#中異步執行的任務,如F#如何運行幾個異步任務並等待第一次完成的結果?

[dowork 1; work 2] 
|> Async.Parallel 
|> Async.RunSynchronously 

但我怎麼能異步等待只有第一個結果?

例如,如果我想運行一些平行查找任務,並且希望在獲得第一個成功結果時進行更深入的搜索。

+0

您是否期待其他任務被中止,或只是讓他們在後臺進行? – mavnn

+0

我不想放棄它們,但可能在將來我想將此方法擴展到WaitForAnySuccessfullAndFaiIfAllFail之類的東西。我是f#中的新成員。在斯卡拉世界,用期貨和承諾來實現這一點非常簡單。 –

回答

4

最簡單的實現,我能想到的看起來是這樣的:

open FSharp.Control 

let getOneOrOther() = 
    let queue = BlockingQueueAgent(1) 
    let async1 = async { 
      do! Async.Sleep (System.Random().Next(1000, 2000)) 
      do! queue.AsyncAdd(1) } |> Async.Start 
    let async2 = async { 
      do! Async.Sleep (System.Random().Next(1000, 2000)) 
      do! queue.AsyncAdd(2) } |> Async.Start 

    queue.Get() 

for i in 1..10 do 
    printfn "%d" <| getOneOrOther() 

Console.ReadLine() |> ignore 

它依賴於從FSharpx項目,你可能會想其他原因阻塞隊列的實現。但是如果你不想要任何依賴關係,System.Collections.Concurrent也包含一個阻塞隊列,並且有一個不太好的接口。

對於內置取消的更一般版本,以下版本需要Seq<unit -> Async<'T>>並返回第一個結果,取消所有其他結果。

open FSharp.Control 
open System.Threading 

let async1() = async { 
     do! Async.Sleep (System.Random().Next(1000, 2000)) 
     return 1 } 
let async2() = async { 
     do! Async.Sleep (System.Random().Next(1000, 2000)) 
     return 2 } 

let getFirst asyncs = 
    let queue = BlockingQueueAgent(1) 
    let doWork operation = async { 
      let! result = operation() 
      do! queue.AsyncAdd(result) } 
    let start work = 
     let cts = new CancellationTokenSource() 
     Async.Start(work, cts.Token) 
     cts 
    let cancellationTokens = 
     asyncs 
     |> Seq.map doWork 
     |> Seq.map start 
    let result = queue.Get() 
    cancellationTokens 
    |> Seq.iter (fun cts -> cts.Cancel(); cts.Dispose()) 
    result 

for i in 1..10 do 
    printfn "%A" <| getFirst [async1;async2] 

Console.ReadLine() |> ignore 
3

通用解決方案可以在下面的代碼片斷中找到:http://fssnip.net/dN

Async.Choice可以嵌入任何異步工作流,就像Async.Parallel。可選的輸出類型編碼了一個子計算可以完成而沒有令人滿意的結果的可能性。

+0

我很驚訝沒有包含在Async中的東西。感謝您的鏈接。 –

2

看來,這種解決方案是很簡單的,無阻塞和適合我的情況下

let any (list: Async<'T>[])= 
    let tcs = new TaskCompletionSource<'T>() 

    list |> Array.map (fun wf->Async.Start (async{ 
       let! res=wf 
       tcs.TrySetResult (res) |> ignore 
      })) 
     |> ignore 

    Async.AwaitTask tcs.Task 

let async1 = async { 
     do! Async.Sleep (System.Random().Next(1000, 2000)) 
     return 1 } 
let async2 = async { 
     do! Async.Sleep (System.Random().Next(1000, 2000)) 
     return 2 } 

printfn "%d" <| ([|async1;async2|] |> any |> Async.RunSynchronously) 
1

基於事件的另一種實現:

let Choice (asyncs: seq<Async<'T>>) : Async<'T> = 
    async { 
     let e = Event<'T>() 
     let cts = new System.Threading.CancellationTokenSource() 
     do Async.Start(
      asyncs 
      |> Seq.map (fun a -> async { let! x = a in e.Trigger x }) 
      |> Async.Parallel 
      |> Async.Ignore, 
      cts.Token) 
     let! result = Async.AwaitEvent e.Publish 
     cts.Cancel() 
     return result 
    } 
7

我會使用類似:

let any asyncs = 
    async { 
     let t = 
      asyncs 
      |> Seq.map Async.StartAsTask 
      |> System.Threading.Tasks.Task.WhenAny 
     return t.Result.Result } 
+1

雖然這不是封鎖嗎?如何將'WhenAny'的結果輸入'Async.AwaitTask',使用'let! t'然後返回't.Result'? – nphx

相關問題