0
我正在使用RxJava和RxNetty爲Apache Mesos的新HTTP計劃程序API編寫客戶端。建模RxJava中的事件需要onComplete/onError的事件
我已經成功地創建了與RxNetty的連接,並從生成的分塊流中創建Observable<Event>
。
現在我在努力,可用於爲了送話費回Mesos要求/下降資源計劃書,承認任務狀態更新水槽等
消息模型的點將發送到發送到Mesos是一個Call
,我需要能夠提供一個onCompleted
或onError
爲每個Call
進入接收器。這是由於Mesos在發送給它的Call
上執行同步驗證。
我基本上是試圖允許以下:
final MesosSchedulerClient client = new MesosSchedulerClient();
final Observable<Event> events = client.openEventStream(subscribeCall);
final Observable<Observable<Call>> ackCalls = events
.filter(event -> event.getType() == Event.Type.UPDATE && event.getUpdate().getStatus().hasUuid())
.zipWith(frameworkIDObservable, (Event e, AtomicReference<FrameworkID>> fwId) -> {
final TaskStatus status = e.getUpdate().getStatus();
final Call ackCall = ackUpdate(fwId.get(), status.getUuid(), status.getAgentId(), status.getTaskId());
return Observable.just(ackCall)
.doOnComplete(() -> { ... })
.doOnError((e) -> { ... });
});
client.sink(ackCalls);
現在我已經想出了一個自定義對象[1]延伸主題,並指定onCompleted
和Action1<Throwable>
的Call
和Action0
爲onError
。儘管如此,如果可能的話,我寧願使用RxJava中的現有構造。我提出的示例用法[2]。
任何指導將不勝感激。
我對mesos不熟悉,不完全明白你想達到什麼目的。由於您不接受訂閱者,擴展主題似乎是不必要的;相反,您可能想使用Subscribers.create()或擴展Subscriber。 – akarnokd
感謝您的指導@akarnokd我將調查創建訂閱者。 –
感謝@akarnokd我切換到使用訂閱者。 –