有很多例子來說明如何在F#中異步執行的任務,如F#如何運行幾個異步任務並等待第一次完成的結果?
[dowork 1; work 2]
|> Async.Parallel
|> Async.RunSynchronously
但我怎麼能異步等待只有第一個結果?
例如,如果我想運行一些平行查找任務,並且希望在獲得第一個成功結果時進行更深入的搜索。
有很多例子來說明如何在F#中異步執行的任務,如F#如何運行幾個異步任務並等待第一次完成的結果?
[dowork 1; work 2]
|> Async.Parallel
|> Async.RunSynchronously
但我怎麼能異步等待只有第一個結果?
例如,如果我想運行一些平行查找任務,並且希望在獲得第一個成功結果時進行更深入的搜索。
最簡單的實現,我能想到的看起來是這樣的:
open FSharp.Control
let getOneOrOther() =
let queue = BlockingQueueAgent(1)
let async1 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
do! queue.AsyncAdd(1) } |> Async.Start
let async2 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
do! queue.AsyncAdd(2) } |> Async.Start
queue.Get()
for i in 1..10 do
printfn "%d" <| getOneOrOther()
Console.ReadLine() |> ignore
它依賴於從FSharpx項目,你可能會想其他原因阻塞隊列的實現。但是如果你不想要任何依賴關係,System.Collections.Concurrent
也包含一個阻塞隊列,並且有一個不太好的接口。
對於內置取消的更一般版本,以下版本需要Seq<unit -> Async<'T>>
並返回第一個結果,取消所有其他結果。
open FSharp.Control
open System.Threading
let async1() = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 1 }
let async2() = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 2 }
let getFirst asyncs =
let queue = BlockingQueueAgent(1)
let doWork operation = async {
let! result = operation()
do! queue.AsyncAdd(result) }
let start work =
let cts = new CancellationTokenSource()
Async.Start(work, cts.Token)
cts
let cancellationTokens =
asyncs
|> Seq.map doWork
|> Seq.map start
let result = queue.Get()
cancellationTokens
|> Seq.iter (fun cts -> cts.Cancel(); cts.Dispose())
result
for i in 1..10 do
printfn "%A" <| getFirst [async1;async2]
Console.ReadLine() |> ignore
通用解決方案可以在下面的代碼片斷中找到:http://fssnip.net/dN
Async.Choice
可以嵌入任何異步工作流,就像Async.Parallel
。可選的輸出類型編碼了一個子計算可以完成而沒有令人滿意的結果的可能性。
我很驚訝沒有包含在Async中的東西。感謝您的鏈接。 –
看來,這種解決方案是很簡單的,無阻塞和適合我的情況下
let any (list: Async<'T>[])=
let tcs = new TaskCompletionSource<'T>()
list |> Array.map (fun wf->Async.Start (async{
let! res=wf
tcs.TrySetResult (res) |> ignore
}))
|> ignore
Async.AwaitTask tcs.Task
let async1 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 1 }
let async2 = async {
do! Async.Sleep (System.Random().Next(1000, 2000))
return 2 }
printfn "%d" <| ([|async1;async2|] |> any |> Async.RunSynchronously)
基於事件的另一種實現:
let Choice (asyncs: seq<Async<'T>>) : Async<'T> =
async {
let e = Event<'T>()
let cts = new System.Threading.CancellationTokenSource()
do Async.Start(
asyncs
|> Seq.map (fun a -> async { let! x = a in e.Trigger x })
|> Async.Parallel
|> Async.Ignore,
cts.Token)
let! result = Async.AwaitEvent e.Publish
cts.Cancel()
return result
}
我會使用類似:
let any asyncs =
async {
let t =
asyncs
|> Seq.map Async.StartAsTask
|> System.Threading.Tasks.Task.WhenAny
return t.Result.Result }
雖然這不是封鎖嗎?如何將'WhenAny'的結果輸入'Async.AwaitTask',使用'let! t'然後返回't.Result'? – nphx
您是否期待其他任務被中止,或只是讓他們在後臺進行? – mavnn
我不想放棄它們,但可能在將來我想將此方法擴展到WaitForAnySuccessfullAndFaiIfAllFail之類的東西。我是f#中的新成員。在斯卡拉世界,用期貨和承諾來實現這一點非常簡單。 –