2015-11-06 68 views
0

我正在使用RxJava和RxNetty爲Apache Mesos的新HTTP計劃程序API編寫客戶端。建模RxJava中的事件需要onComplete/onError的事件

我已經成功地創建了與RxNetty的連接,並從生成的分塊流中創建Observable<Event>

現在我在努力,可用於爲了送話費回Mesos要求/下降資源計劃書,承認任務狀態更新水槽等

消息模型的點將發送到發送到Mesos是一個Call,我需要能夠提供一個onCompletedonError爲每個Call進入接收器。這是由於Mesos在發送給它的Call上執行同步驗證。

我基本上是試圖允許以下:

final MesosSchedulerClient client = new MesosSchedulerClient(); 
final Observable<Event> events = client.openEventStream(subscribeCall); 

final Observable<Observable<Call>> ackCalls = events 
    .filter(event -> event.getType() == Event.Type.UPDATE && event.getUpdate().getStatus().hasUuid()) 
    .zipWith(frameworkIDObservable, (Event e, AtomicReference<FrameworkID>> fwId) -> { 
     final TaskStatus status = e.getUpdate().getStatus(); 
     final Call ackCall = ackUpdate(fwId.get(), status.getUuid(), status.getAgentId(), status.getTaskId()); 
     return Observable.just(ackCall) 
      .doOnComplete(() -> { ... }) 
      .doOnError((e) -> { ... }); 
    }); 

client.sink(ackCalls); 

現在我已經想出了一個自定義對象[1]延伸主題,並指定onCompletedAction1<Throwable>CallAction0onError。儘管如此,如果可能的話,我寧願使用RxJava中的現有構造。我提出的示例用法[2]。

任何指導將不勝感激。

[1] https://github.com/BenWhitehead/mesos-rxjava/blob/sink-operation/mesos-rxjava-core/src/main/java/org/apache/mesos/rx/java/SinkOperation.java#L17

[2] https://github.com/BenWhitehead/mesos-rxjava/blob/sink-operation/mesos-rxjava-example/mesos-rxjava-example-framework/src/main/java/org/apache/mesos/rx/java/example/framework/sleepy/Main.java#L117-L124

+0

我對mesos不熟悉,不完全明白你想達到什麼目的。由於您不接受訂閱者,擴展主題似乎是不必要的;相反,您可能想使用Subscribers.create()或擴展Subscriber。 – akarnokd

+0

感謝您的指導@akarnokd我將調查創建訂閱者。 –

+0

感謝@akarnokd我切換到使用訂閱者。 –

回答

0

我結束了該解決方案是創建將處理事件流併發送請求回mesos自定義用戶。