2010-02-11 46 views
4

目前,我正在使用RX Framework來實現類似工作流的消息處理管道。本質上,我有一個消息生產者(反序列化網絡消息和調用一個主題的OnNext()),我有幾個消費者。RX IObservable作爲管道

注意:如果和變換是擴展方法,我編碼只是返回一個IObservable。

消費者做類似如下:

var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x) 
           .Where(y => y.Value > 5) 
           .Select(y => y.ComplexObject) 
           .If(z => z.IsPaid, respond(z)) 
           .Do(z => SendError(z)); 

commerceRequest然後由另一個類似的管道消耗,這種情況持續下去,直到它與人的最終管道調用Subscribe()結束頂部。我遇到的問題是來自基地的消息不會傳播,除非直接在消息上直接調用訂閱。

如何將消息推送到堆棧頂部?我知道這是一種非正統的方法,但我覺得它使得代碼非常容易理解消息的發生。任何人都可以提出另一種方式來做同樣的事情,如果你覺得這是一個完全可怕的想法?

+0

是的......在我看來,這種一般風格的面向對象的任何東西都會看起來有點流水。這是一件好事! – kyoryu 2010-02-15 04:48:17

回答

1

如果沒有訂戶,他們爲什麼要經過管道?如果您的某個中間步驟對其副作用很有用(即使沒有其他用戶,您也希望它們運行),您應該將副作用操作重寫爲用戶。

如果您想繼續連鎖,您也可以作爲傳遞操作(或tee,如果您願意的話)做副作用。