2014-03-07 115 views
7

我真的很喜歡F#的async工作流程,但就我而言,它有一個嚴重問題:它不允許創建不應超過某個特定時間段的工作流。F#異步工作流程

使其更清晰,這裏有一個簡單的功能,我寫我自己:

let withTimeout operation timeout = async { 
    try 
     return Some <| Async.RunSynchronously (operation, timeout) 
    with :? TimeoutException -> return None 
} 

即簽名是

val withTimeout : operation:Async<'a> -> timeout:int -> Async<'a option> 

用法示例這裏:

let op = async { 
    do! Async.Sleep(1000) 
    return 1 
} 
#time 
withTimeout op 2000 |> Async.RunSynchronously;; 
// Real: 00:00:01.116, CPU: 00:00:00.015, GC gen0: 0, gen1: 0, gen2: 0 
// val it : unit option = Some 1 
withTimeout op 2000 |> Async.RunSynchronously;; 
// Real: 00:00:01.004, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0 
// val it : unit option = Some 1 
withTimeout op 500 |> Async.RunSynchronously;; 
// Real: 00:00:00.569, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0 
// val it : unit option = None  

你可以看到,它按預期工作。這非常好,但它也有點尷尬,我不確定它是否會出現安全問題和其他問題。也許我正在重新發明輪子,並且有非常簡潔的方式來編寫這樣的工作流程?

+1

這種方法的問題是,工作流將繼續超時後執行。你應該看看是否使用取消,然後在超時期限之後用信號通知取消令牌。這需要您的工作流程中的各個階段對取消通知做出響應。 – Lee

+0

執行'Async.RunSynchronously'時,如果底層的異步操作受I/O限制,則會阻止當前線程並失去效率。我想我還沒有看到在野外的工作實現,但我記得使用一個醜陋的黑客來做到這一點,將異步comps轉換爲observables,然後合併它們。 – MisterMetaphor

+0

@MisterMetaphor這就是爲什麼我把'RunSynchronously'換成另一個'async'的原因。我想這將允許保持併發。但這很醜,我同意。我更關心@李的話 - 我並沒有真正殺死正在運行的任務。但目前我不知道如何解決這個問題 – Rustam

回答

4

請參閱的執行this,這應該表現得類似於Task.WhenAny

通過使用它,你同樣可以實現withTimeout它是如何爲Taskhere實現:

let withTimeout dueTime comp = 
    let success = async { 
     let! x = comp 
     return (Some x) 
    } 
    let timeout = async { 
     do! Async.Delay(dueTime) 
     return None 
    } 
    Async.WhenAny(success, timeout) 

我不相信,當第一個完成,將取消其他的計算,但至少這個實現不會不必要地阻塞一個線程。

+0

+1感謝您的考慮。我會考慮如何擴展它以照顧取消 – Rustam

7

UPD:此刻的最佳選擇是Vesa A.J.K這裏提出:https://stackoverflow.com/a/26230245/1554463。隨着我的編輯的那樣:

type Async with 
    static member WithTimeout (timeout : int option) operation = 
     match timeout with 
     | Some time when time > 0 -> 
      async { 
       let! child = Async.StartChild (operation, time) 
       try 
        let! result = child 
        return Some result 
       with :? TimeoutException -> return None 
      } 
     | _ -> 
      async { 
       let! result = operation 
       return Some result 
      } 

這裏的另一種選擇:

type Async with 
    static member WithCancellation (token:CancellationToken) operation = 
     async { 
      try 
       let task = Async.StartAsTask (operation, cancellationToken = token) 
       task.Wait() 
       return Some task.Result 
      with 
       | :? TaskCanceledException -> return None 
       | :? AggregateException -> return None 
     } 

    static member WithTimeout (timeout:int option) operation = 
     match timeout with 
     | Some(time) -> 
      async { 
       use tokenSource = new CancellationTokenSource (time) 
       return! operation |> Async.WithCancellation tokenSource.Token 
      } 

     | _ -> 
      async { 
       let! res = operation 
       return Some res 
      } 

這裏我使用的.Net任務和CancellationToken

3

只需使用Async.StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>

let with_timeout timeout action = 
    async { 
    let! child = Async.StartChild(action, timeout) 
    return! child 
    }