我已經使用RxJava(聚合服務)編寫了一個Spring Boot微服務來實現以下簡化用例。大的情況是,當教師上傳課程內容文檔時,應該生成並保存一組問題。使用RxJava並行創建對象
- 用戶上傳文件到系統。
- 系統調用文檔服務將文檔轉換爲文本。
- 然後它會調用另一個生成服務的問題來生成給定上述文本內容的一組問題。
- 最後,將這些問題發佈到基本的CRUD微服務中進行保存。
當用戶上傳文檔時,會產生很多問題(可能會有幾百個問題)。這裏的問題是我按順序逐個發佈問題,以便CRUD服務保存它們。這會由於IO密集型網絡調用而急劇減慢運行速度,因此完成整個過程大約需要20秒。這裏是目前的代碼,假設所有的問題都是公式化的。
questions.flatMapIterable(list -> list).flatMap(q -> createQuestion(q)).toList();
private Observable<QuestionDTO> createQuestion(QuestionDTO question) {
return Observable.<QuestionDTO> create(sub -> {
QuestionDTO questionCreated = restTemplate.postForEntity(QUESTIONSERVICE_API,
new org.springframework.http.HttpEntity<QuestionDTO>(question), QuestionDTO.class).getBody();
sub.onNext(questionCreated);
sub.onCompleted();
}).doOnNext(s -> log.debug("Question was created successfully."))
.doOnError(e -> log.error("An ERROR occurred while creating a question: " + e.getMessage()));
}
現在我的要求是將所有問題並行發佈到CRUD服務,並在完成時合併結果。另請注意,CRUD服務一次只能接受一個問題對象,且不能更改。我知道我可以使用Observable.zip
運算符來達到這個目的,但是我不知道如何在這種情況下應用它,因爲問題的實際數量不是預先確定的。如何更改第1行中的代碼,以便我可以提高應用程序的性能。任何幫助表示讚賞。
http://stackoverflow.com/a/42823151/7045114 –
不要使用create,而應該使用fromCallable/defer。如果您使用create操作符的方式不是預期的,那麼您將終止背壓(rx1) –