2014-09-02 106 views
2

是否有異步運算符來獲取由兩個異步值(Async < _>)返回的值?選擇第一個異步結果

例如,給定兩個Async < _>值,其中一個A1在1秒後返回,A2在2秒後返回,則我需要A1的結果。

原因是我想要實現用於異步序列的交織的功能,所以,如果有「定義」像這樣兩個異步序列(與空間表示時間與大理石圖):

S1 = -+-----+------------+----+ 
S2 = ---+-------+----------+-----+ 

然後我想產生的作用就像這一個新的異步序列:

S3 = -+-+---+---+--------+-+--+--+ 

交錯S1,S2,S3 =

但有兩個這樣做,我可能需要一種異步選擇運算符來選擇選擇值。

我認爲這將會像Go中的「select」一樣,您可以從兩個通道獲取第一個可用值。

TPL有一個叫做Task.WhenAny的函數 - 我可能在這裏需要類似的東西。

+1

我不確定你可以非常容易地從異步選擇中真正構建異步交錯(因爲你所描述的選擇必須運行A1和A2並丟棄A2,而你想使用它)。什麼是你的異步序列的類型? – 2014-09-02 16:13:03

+1

您是否可以將消費者實現爲[MailboxProcessor](http://msdn.microsoft.com/en-us/library/ee370357.aspx),並讓S1和S2發佈消息給它? MailboxProcessor將以S3順序獲取消息。 – 2014-09-02 16:31:27

+0

類型定義在這裏:http://tomasp.net/blog/async-sequences.aspx/。你是對的,我不想扔掉其他計算,我只是想按照他們完成的順序使用他們的結果。 – 2014-09-02 16:34:25

回答

3

我不認爲該操作符在F#庫中可用。要將這與現有操作結合使用,您可以使用Async.StartAsTask,然後使用現有的Task.WhenAny運算符。但是,我不完全確定在取消方面會有怎樣的表現。

另一種選擇是使用在F# Snippets web site上實施的Async.Choose運營商。這不是特別優雅,但它應該做的伎倆!爲了使答案獨立,下面附上代碼。

/// Creates an asynchronous workflow that non-deterministically returns the 
/// result of one of the two specified workflows (the one that completes 
/// first). This is similar to Task.WaitAny. 
static member Choose(a, b) : Async<'T> = 
    Async.FromContinuations(fun (cont, econt, ccont) -> 
     // Results from the two 
     let result1 = ref (Choice1Of3()) 
     let result2 = ref (Choice1Of3()) 
     let handled = ref false 
     let lockObj = new obj() 
     let synchronized f = lock lockObj f 

     // Called when one of the workflows completes 
     let complete() = 
     let op = 
      synchronized (fun() -> 
      // If we already handled result (and called continuation) 
      // then ignore. Otherwise, if the computation succeeds, then 
      // run the continuation and mark state as handled. 
      // Only throw if both workflows failed. 
      match !handled, !result1, !result2 with 
      | true, _, _ -> ignore 
      | false, (Choice2Of3 value), _ 
      | false, _, (Choice2Of3 value) -> 
       handled := true 
       (fun() -> cont value) 
      | false, Choice3Of3 e1, Choice3Of3 e2 -> 
       handled := true; 
       (fun() -> 
        econt (new AggregateException 
           ("Both clauses of a choice failed.", [| e1; e2 |]))) 
      | false, Choice1Of3 _, Choice3Of3 _ 
      | false, Choice3Of3 _, Choice1Of3 _ 
      | false, Choice1Of3 _, Choice1Of3 _ -> ignore) 
     op() 

     // Run a workflow and write result (or exception to a ref cell 
     let run resCell workflow = async { 
     try 
      let! res = workflow 
      synchronized (fun() -> resCell := Choice2Of3 res) 
     with e -> 
      synchronized (fun() -> resCell := Choice3Of3 e) 
     complete() } 

     // Start both work items in thread pool 
     Async.Start(run result1 a) 
     Async.Start(run result2 b)) 
3

托馬斯已經回答了精確的問題。不過,您可能有興趣知道我的F#的Hopac庫直接支持Concurrent ML風格的一階高階選擇性事件,名爲alternatives,它直接提供choose -combinator並提供比Go更具表現力的併發抽象機制選擇語句。

關於交叉兩個異步序列的更具體的問題,我最近開始嘗試關於如何使用Hopac完成Rx風格編程的想法。我提出的一種可能的方法是定義一種短暫的事件流。你可以在這裏找到了實驗代碼:

正如你所看到的,一個事件定義流的操作是merge。你在尋找的內容可能在語義上略有不同,但是使用Hopac風格的替代方案(或併發ML風格的事件)可能很簡單。

+0

這看起來有趣的Vesa。我將不得不對此進行試驗。 – 2014-09-04 16:52:28