2017-04-13 75 views
3

我已經使用RxJava(聚合服務)編寫了一個Spring Boot微服務來實現以下簡化用例。大的情況是,當教師上傳課程內容文檔時,應該生成並保存一組問題。使用RxJava並行創建對象

  • 用戶上傳文件到系統。
  • 系統調用文檔服務將文檔轉換爲文本。
  • 然後它會調用另一個生成服務的問題來生成給定上述文本內容的一組問題。
  • 最後,將這些問題發佈到基本的CRUD微服務中進行保存。

當用戶上傳文檔時,會產生很多問題(可能會有幾百個問題)。這裏的問題是我按順序逐個發佈問題,以便CRUD服務保存它們。這會由於IO密集型網絡調用而急劇減慢運行速度,因此完成整個過程大約需要20秒。這裏是目前的代碼,假設所有的問題都是公式化的。

questions.flatMapIterable(list -> list).flatMap(q -> createQuestion(q)).toList(); 

private Observable<QuestionDTO> createQuestion(QuestionDTO question) { 
    return Observable.<QuestionDTO> create(sub -> { 
     QuestionDTO questionCreated = restTemplate.postForEntity(QUESTIONSERVICE_API, 
       new org.springframework.http.HttpEntity<QuestionDTO>(question), QuestionDTO.class).getBody(); 
     sub.onNext(questionCreated); 
     sub.onCompleted(); 
    }).doOnNext(s -> log.debug("Question was created successfully.")) 
      .doOnError(e -> log.error("An ERROR occurred while creating a question: " + e.getMessage())); 
} 

現在我的要求是將所有問題並行發佈到CRUD服務,並在完成時合併結果。另請注意,CRUD服務一次只能接受一個問題對象,且不能更改。我知道我可以使用Observable.zip運算符來達到這個目的,但是我不知道如何在這種情況下應用它,因爲問題的實際數量不是預先確定的。如何更改第1行中的代碼,以便我可以提高應用程序的性能。任何幫助表示讚賞。

+1

http://stackoverflow.com/a/42823151/7045114 –

+0

不要使用create,而應該使用fromCallable/defer。如果您使用create操作符的方式不是預期的,那麼您將終止背壓(rx1) –

回答

1

默認情況下,flatMap中的observalbes在您訂閱它的同一個調度程序上運行。爲了並行運行你的觀察對象,你必須在計算調度器上訂閱它們。

questions.flatMapIterable(list -> list) 
     .flatMap(q -> createQuestion(q).subscribeOn(Schedulers.computation())) 
     .toList(); 

檢查this article的完整說明。

+0

非常感謝您的迴應。我在閱讀那篇文章的同時嘗試了一些例子,其中一些沒有按預期工作。無論如何,我現在有另一個問題。 createQuestion是異步執行的。以前它是在一個線程中調用的。隨着新的修改,它將在並行線程中調用。在每個問題的兩種情況下,都會有一個單獨的線索來處理封面背後的創作。由於這種變化會有巨大的收益嗎?如果是這樣如何? –

+0

@RavindraRanwala所以flatMap的工作就是爲每個物品創建一個新的Observable,並將這些observable合併爲一個。所以如果你的物品發射很快(這是你的列表),你的createQuestion工作真的很慢。然後你的createQuestion將以「並行」的方式工作。 –

+0

@PhoenixWang謝謝,我會試一試,讓你發佈性能增益。離開我的辦公室需要幾天的時間。 –