2
我試圖編寫一個函數(OnceAsync f),它確保在服務器(即多線程環境)上只運行一次異步函數。我認爲這很容易,但它很快變得複雜(鎖,忙等待!!)OnceAsync:一次運行f#異步函數一次
這是我的解決方案,但我認爲它是過度設計的;一定會有更好的辦法。這應該在FSI工作:
let locked_counter init =
let c = ref init
fun x -> lock c <| fun() ->
c := !c + x
!c
let wait_until finished = async {
while not(finished()) do
do! Async.Sleep(1000)
}
let OnceAsync f =
// - ensure that the async function, f, is only called once
// - this function always returns the value, f()
let mutable res = None
let lock_inc = locked_counter 0
async {
let count = lock_inc 1
match res, count with
| None, 1 -> // 1st run
let! r = f
res <- Some r
| None, _ -> // nth run, wait for 1st run to finish
do! wait_until (fun() -> res.IsSome)
| _ ->() // 1st run done, return result
return res.Value
}
您可以使用此代碼來測試是否OnceAsync是正確的:
let test() =
let mutable count = 0
let initUser id = async {
do! Async.Sleep 1000 // simulate work
count <- count + 1
return count
}
//let fmem1 = (initUser "1234")
let fmem1 = OnceAsync (initUser "1234")
async {
let ps = Seq.init 20 (fun i -> fmem1)
let! rs = ps |> Async.Parallel
printfn "rs = %A" rs // outputs: [|1; 1; 1; 1; 1; ....; 1|]
}
test() |> Async.Start
不錯!我希望擺脫鎖定和等待,但這比我所擁有的要好得多。 – Ray
@Ray我添加了一個'MailboxProcessor'示例,但是IIRC它確實在引擎蓋下使用了鎖,所以我不知道是否可以完全脫離它們。 –
看起來我無法擺脫鎖。 TaskCompletionSource解決方案起作用。雖然我不能使用Interlocked.Increment;我只會製作我自己的版本。 – Ray