2012-07-23 70 views
4

CancellationTokenSource對象的CancellationTokenSource成員「傳遞取消請求」,我認爲這意味着它是火併忘記,並且不會等到取消完成爲止(例如,所有異常處理程序已經運行)。這很好,但我需要等到一個出色的異步完全取消之後再創建另一個異步。有沒有簡單的方法來完成這個?等待取消異步工作流程

回答

4

我不認爲有任何直接的辦法,使用標準庫函數從F#異步庫。其中,當工作流運行的回調最近的操作我們Async.TryCancelled(實際上可以)取消,但發送從回調的消息到開始工作流的代碼必須由手工完成。

這是比較容易的使用事件和從F#異步擴展,我寫(也包含在FSharpX包)的擴展來解決 - 擴展名是GuardedAwaitObservable可用於等待一個事件的發生(這可通過一些操作立即觸發)。

以下Async.StartCancellable方法採用異步工作流程,並返回Async<Async<unit>>。當您綁定在外工作流程,它開始參數(如Async.StartChild),當您綁定在返回的內部工作流程,它取消了計算和等待,直到它實際上是取消:

open System.Threading 

module Async = 
    /// Returns an asynchronous workflow 'Async<Async<unit>>'. When called 
    /// using 'let!', it starts the workflow provided as an argument and returns 
    /// a token that can be used to cancel the started work - this is an 
    /// (asynchronously) blocking operation that waits until the workflow 
    /// is actually cancelled 
    let StartCancellable work = async { 
    let cts = new CancellationTokenSource() 
    // Creates an event used for notification 
    let evt = new Event<_>() 
    // Wrap the workflow with TryCancelled and notify when cancelled 
    Async.Start(Async.TryCancelled(work, ignore >> evt.Trigger), cts.Token) 
    // Return a workflow that waits for 'evt' and triggers 'Cancel' 
    // after it attaches the event handler (to avoid missing event occurrence) 
    let waitForCancel = Async.GuardedAwaitObservable evt.Publish cts.Cancel 
    return async.TryFinally(waitForCancel, cts.Dispose) } 

編輯包裹結果TryFinally由Jon的建議處置CancellationTokenSource的。我認爲這應該足以確保它被正確處置。

下面是一個使用該方法的例子。功能是一個簡單的工作流程,我用於測試。的代碼的其餘部分啓動它,等待5.5秒,然後取消它:

/// Sample workflow that repeatedly starts and stops long running operation 
let loop = async { 
    for i in 0 .. 9999 do 
    printfn "Starting: %d" i 
    do! Async.Sleep(1000) 
    printfn "Done: %d" i } 

// Start the 'loop' workflow, wait for 5.5 seconds and then 
// cancel it and wait until it finishes current operation 
async { let! cancelToken = Async.StartCancellable(loop) 
     printfn "started" 
     do! Async.Sleep(5500) 
     printfn "cancelling" 
     do! cancelToken 
     printfn "done" } 
|> Async.Start 

爲了完整起見,從用FSharpX必要的定義中,樣品是here on F# snippets

+1

你應該處理'CancellationTokenSource'嗎? – 2012-07-23 13:23:19

+0

我認爲這很重要。我曾經寫過關於'FSharp.Core'中的泄漏,我認爲這是由於完全相同的問題導致的,而不是處置CTS:http://t0yv0.blogspot.com/2011/12/solving-f-asyncstartchild-leak- futures.html – t0yv0 2012-07-23 14:17:10

+0

@JonHarrop這是一個很好的觀點。我不確定在這種情況下是否會導致泄漏,但最好調用Dispose。在計算被取消(並且取消完成)之後,我編輯了在終結器中調用「Dispose」的答案。 – 2012-07-23 14:44:26

4

這應該不難給予使用方便同步原語。我特別喜歡一次性寫入「邏輯」的變量:

type Logic<'T> = 
    new : unit -> Logic<'T> 
    member Set : 'T -> unit 
    member Await : Async<'T> 

這是很容易包裹一個異步設置完成時邏輯變量,然後等待就可以了,例如:

type IWork = 
    abstract member Cancel : unit -> Async<unit> 

let startWork (work: Async<unit>) = 
    let v = Logic<unit>() 
    let s = new CancellationTokenSource() 
    let main = async.TryFinally(work, fun() -> s.Dispose(); v.Set()) 
    Async.Start(main, s.Token) 
    { 
     new IWork with 
      member this.Cancel() = s.Cancel(); v.Await 
    } 

一種可能的邏輯變量的實現可能是:

type LogicState<'T> = 
    | New 
    | Value of 'T 
    | Waiting of ('T -> unit) 

[<Sealed>] 
type Logic<'T>() = 
    let lockRoot = obj() 
    let mutable st = New 
    let update up = 
     let k = 
      lock lockRoot <| fun() -> 
       let (n, k) = up st 
       st <- n 
       k 
     k() 

    let wait (k: 'T -> unit) = 
     update <| function 
      | New -> (Waiting k, ignore) 
      | Value value as st -> (st, fun() -> k value) 
      | Waiting f -> (Waiting (fun x -> f x; k x), ignore) 

    let await = 
     Async.FromContinuations(fun (ok, _, _) -> wait ok) 

    member this.Set<'T>(value: 'T) = 
     update <| function 
      | New -> (Value value, ignore) 
      | Value _ as st -> (st, ignore) 
      | Waiting f as st -> (Value value, fun() -> f value) 

    member this.Await = await