我和我的同事有爭議。我們正在編寫處理大量數據的.NET應用程序。它接收數據元素,根據某些標準將它們的子集分組成塊,並處理這些塊。我應該在我的接口公開的IObservable <T>?
比方說,我們有Foo
抵達的一些源類型的數據項(從網絡,例如)一個接一個。我們希望收集亞Foo
類型的相關對象的,從Bar
類型的每個這樣的子集和處理的對象構建Bar
類型的對象。
我們中提出了以下設計。它的主題是直接從我們組件的接口暴露IObservable<T>
對象。
// ********* Interfaces **********
interface IFooSource
{
// this is the event-stream of objects of type Foo
IObservable<Foo> FooArrivals { get; }
}
interface IBarSource
{
// this is the event-stream of objects of type Bar
IObservable<Bar> BarArrivals { get; }
}
/********* Implementations *********
class FooSource : IFooSource
{
// Here we put logic that receives Foo objects from the network and publishes them to the FooArrivals event stream.
}
class FooSubsetsToBarConverter : IBarSource
{
IFooSource fooSource;
IObservable<Bar> BarArrivals
{
get
{
// Do some fancy Rx operators on fooSource.FooArrivals, like Buffer, Window, Join and others and return IObservable<Bar>
}
}
}
// this class will subscribe to the bar source and do processing
class BarsProcessor
{
BarsProcessor(IBarSource barSource);
void Subscribe();
}
// ******************* Main ************************
class Program
{
public static void Main(string[] args)
{
var fooSource = FooSourceFactory.Create();
var barsProcessor = BarsProcessorFactory.Create(fooSource) // this will create FooSubsetToBarConverter and BarsProcessor
barsProcessor.Subscribe();
fooSource.Run(); // this enters a loop of listening for Foo objects from the network and notifying about their arrival.
}
}
其他建議,它的主題是用我們自己的發佈/訂閱接口和僅在需要時使用的Rx的實現內的另一個設計。
//********** interfaces *********
interface IPublisher<T>
{
void Subscribe(ISubscriber<T> subscriber);
}
interface ISubscriber<T>
{
Action<T> Callback { get; }
}
//********** implementations *********
class FooSource : IPublisher<Foo>
{
public void Subscribe(ISubscriber<Foo> subscriber) { /* ... */ }
// here we put logic that receives Foo objects from some source (the network?) publishes them to the registered subscribers
}
class FooSubsetsToBarConverter : ISubscriber<Foo>, IPublisher<Bar>
{
void Callback(Foo foo)
{
// here we put logic that aggregates Foo objects and publishes Bars when we have received a subset of Foos that match our criteria
// maybe we use Rx here internally.
}
public void Subscribe(ISubscriber<Bar> subscriber) { /* ... */ }
}
class BarsProcessor : ISubscriber<Bar>
{
void Callback(Bar bar)
{
// here we put code that processes Bar objects
}
}
//********** program *********
class Program
{
public static void Main(string[] args)
{
var fooSource = fooSourceFactory.Create();
var barsProcessor = barsProcessorFactory.Create(fooSource) // this will create BarsProcessor and perform all the necessary subscriptions
fooSource.Run(); // this enters a loop of listening for Foo objects from the network and notifying about their arrival.
}
}
你認爲哪一個更好?揭露IObservable<T>
,使我們的組件創建新的事件流正在從RX運營商,或定義我們自己的發佈/訂閱接口,如果需要,內部使用的Rx?
這裏有一些事情要考慮有關設計:
在第一次設計了接口的消費者有Rx的整個電力在他/她的指尖,可以執行任何的Rx運營商。我們中的一個人聲稱這是一個優勢,另一個人聲稱這是一個缺點。
第二個設計允許我們使用任何發佈/訂閱架構引擎蓋下。第一個設計將我們與Rx聯繫在一起。
如果我們希望使用Rx的功率,它需要在第二個設計中更多的工作,因爲我們需要翻譯的自定義發佈/訂閱實施的Rx和背部。它需要爲每個希望進行事件處理的類編寫膠水代碼。
我喜歡你開始這個問題的方式。 「我和我的同事有爭議。」 +1。 – 2012-07-09 11:40:23
爲什麼不公開所有IObservable東西作爲*擴展方法*來處理所有「膠水代碼」。保持所有IObs與您的對象模型分離,同時提供選項。 '公共IObservable AsObservable(這個IPublisher 發佈者)'或類似的東西 –
Will
2012-07-09 12:24:04
你做得很好,以平衡的方式提出問題。 – 2012-07-09 12:46:42