2017-06-01 50 views
2

我試圖編寫一個函數(OnceAsync f),它確保在服務器(即多線程環境)上只運行一次異步函數。我認爲這很容易,但它很快變得複雜(鎖,忙等待!!)OnceAsync:一次運行f#異步函數一次

這是我的解決方案,但我認爲它是過度設計的;一定會有更好的辦法。這應該在FSI工作:

let locked_counter init = 
    let c = ref init 
    fun x -> lock c <| fun() -> 
     c := !c + x 
     !c 
let wait_until finished = async { 
    while not(finished()) do 
     do! Async.Sleep(1000) 
} 

let OnceAsync f = 
    // - ensure that the async function, f, is only called once 
    // - this function always returns the value, f() 
    let mutable res = None 
    let lock_inc = locked_counter 0 

    async { 
     let count = lock_inc 1 

     match res, count with 
     | None, 1 -> // 1st run 
      let! r = f 
      res <- Some r 
     | None, _ -> // nth run, wait for 1st run to finish 
      do! wait_until (fun() -> res.IsSome) 
     | _ ->()  // 1st run done, return result 

     return res.Value 
    } 

您可以使用此代碼來測試是否OnceAsync是正確的:

let test() = 
    let mutable count = 0 

    let initUser id = async { 
     do! Async.Sleep 1000 // simulate work 
     count <- count + 1 
     return count 
    } 

    //let fmem1 = (initUser "1234") 
    let fmem1 = OnceAsync (initUser "1234") 

    async { 
     let ps = Seq.init 20 (fun i -> fmem1) 
     let! rs = ps |> Async.Parallel 
     printfn "rs = %A" rs  // outputs: [|1; 1; 1; 1; 1; ....; 1|] 
    } 

test() |> Async.Start 

回答

3

如果它適合,最簡單的方法整體是使用Async.StartChild。與你的解決方案不同,即使結果從未被實際使用過,也會導致函數運行,例如,在Seq.init 0的情況下。

//let fmem1 = OnceAsync (initUser "1234") 

async { 
    let! fmem1 = Async.StartChild (initUser "1234") 
    let ps = Seq.init 20 (fun i -> fmem1) 
    let! rs = ps |> Async.Parallel 
    printfn "rs = %A" rs  // outputs: [|1; 1; 1; 1; 1; ....; 1|] 
} |> Async.RunSynchronously 

最相似的你最簡單的方法是使用一個TaskCompletionSource如下:

let OnceAsync f = 
    let count = ref 0 
    let tcs = TaskCompletionSource<_>() 
    async { 
    if Interlocked.Increment(count) = 1 then 
     let! r = f 
     tcs.SetResult r 
    return! Async.AwaitTask tcs.Task 
    } 

一個功能更強大的方法是使用一個MailboxProcessor並將它後第一次運行緩存的結果,隨後迴應所有後續請求。

let OnceAsync f = 
    let handler (agent: MailboxProcessor<AsyncReplyChannel<_>>) = 
    let rec run resultOpt = 
     async { 
     let! chan = agent.Receive() 
     let! result = 
      match resultOpt with 
      | None -> f 
      | Some result -> async.Return result 
     chan.Reply result 
     return! run (Some result) 
     } 
    run None 
    let mbp = MailboxProcessor.Start handler 
    async { return! mbp.PostAndAsyncReply id } 
+1

不錯!我希望擺脫鎖定和等待,但這比我所擁有的要好得多。 – Ray

+1

@Ray我添加了一個'MailboxProcessor'示例,但是IIRC它確實在引擎蓋下使用了鎖,所以我不知道是否可以完全脫離它們。 –

+1

看起來我無法擺脫鎖。 TaskCompletionSource解決方案起作用。雖然我不能使用Interlocked.Increment;我只會製作我自己的版本。 – Ray