2010-12-03 74 views
6

我有,比方說,有1000個可觀察數據。現在我想將所有事件聚合到一個新的可觀察事件中,一旦所有其他事件都發送了事件,就會觸發OnNext事件。使用Rx的最佳方式是什麼?將大量的可觀察數據彙總到新的可觀察數據中

更新: 一些很好的反饋在Rx論壇上,尤其是由戴夫塞克斯頓。他展示瞭如何創建一個需要多個可觀察對象的Zip擴展方法:http://social.msdn.microsoft.com/Forums/en-US/rx/thread/daaa84db-b560-4eda-871e-e523098db20c/

+0

1000個觀察對象的所有類型都一樣嗎?你對聚合觀察值的類型是什麼? – 2010-12-03 19:28:57

+0

所有1000個觀察對象都是相同的類型,新的聚合可以是一個新類型。例如。事件變爲AggregateEvent。 – lukebuehler 2010-12-03 19:33:51

回答

2

F#中有一個MailboxProcessor ...我將在C#中使用SynchronizationContext來達到同樣的目的。給我幾分鐘,我會寫一個例子。

另外:這裏是我在F#中的代碼,它做了類似的事情......這將會花費更多的精力,但仍然可以在C#中用Rx實現。

open System.Diagnostics 

let numWorkers = 20 
let asyncDelay = 100 

type MessageForMailbox = 
    | DataMessage of AsyncReplyChannel<unit> 
    | GetSummary of AsyncReplyChannel<unit> 

let main = 
    let actor = 
     MailboxProcessor.Start(fun inbox -> 
     let rec loop acc = 
      async { 
       let! message = inbox.Receive() 
       match message with 
       | DataMessage replyChannel -> replyChannel.Reply(); return! loop acc 
       | GetSummary replyChannel -> replyChannel.Reply(); return! loop acc 
      } 

     loop 0 // seed for acc 
    ) 

    let codeBlocks = [for i in 1..numWorkers -> 
         async { 
          do! Async.Sleep asyncDelay 
          return! actor.PostAndAsyncReply DataMessage 
         } ] 

    while true do 
     printfn "Concurrent started..." 
     let sw = new Stopwatch() 
     sw.Start() 
     codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore 
     actor.PostAndReply GetSummary 
     sw.Stop() 
     printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds 
     printfn "efficiency: %d%%" (int64 (asyncDelay * 100)/sw.ElapsedMilliseconds) 

     printfn "Synchronous started..." 
     let sw = new Stopwatch() 
     sw.Start() 
     for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore 
     sw.Stop() 
     printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds 
     printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100)/sw.ElapsedMilliseconds) 

main 
相關問題