2016-09-12 131 views
3

在我的應用程序中,我有一項服務跟蹤用戶位置,然後使用RxJava將其發送到服務器。如果請求成功,我收到插入的ID,然後我可以從我的本地數據庫中刪除它們。使RxJava異步任務線程安全

  1. 查詢了點數據庫發送
  2. 如果不是空的,我張貼從數據庫
  3. 所有的聚集點。如果請求成功,我從數據庫中刪除張貼的點

我的問題是,我在前一個任務結束前快速調用該observable,以便服務器接收重複點(兩個請求都是相同的點)。我需要在單線程上執行Observable以避免在前一個任務結束之前讓另一個任務查詢數據庫。我創建了一個Looper線程,但仍然,我發送重複,我不知道爲什麼。服務器請求現在似乎要等到它在執行下一個請求之前結束,但仍然在下一個請求中發送相同的點! Gahh

final StoreChangeEvent finalEvent = event; 
    Observable 
      .defer(() -> Observable.just(database.getAllPoints())) 
      .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList) 
        .map(result -> deletePoint(result)) 
        .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage()))) 
        .doOnCompleted(() -> emitStoreChange(finalEvent)) 
        .observeOn(AndroidSchedulers.mainThread()) 
        .subscribeOn(AndroidSchedulers.from(backgroundLooper))) 
      .subscribe(); 

好像database.getAllPoints()被稱爲太快......我應該添加.blocking()?

比方說,我有5點發布到服務器(A,B,C,d

  1. 我查詢數據庫和發送ABCD服務器
  2. 我reveive另一點從該裝置(點Ë
  3. 我查詢數據庫和發送(ABCDE
  4. 我成功接收來自服務器,然後我從L刪除ABCD OCAL DB
  5. 我收到第二請求成功,那麼我從本地DB

結果刪除ABCDE:ABCD是有兩倍的服務器數據庫上因爲有兩個請求具有相同點

+1

不太清楚你的意思。你可能會多次訂閱,並希望確定它們隨後被執行? –

+0

是的,只有在前一個Observable完成時才發送點,然後才查詢數據庫...我在問題底部添加了詳細信息 – Jaythaking

回答

2

選項1 - 除

您可以嘗試使用Subject - 這既充當觀察者和觀察到的事情。

訂閱它,並通知其他地方的新活動(onNext())。用戶將隨後處理事件。

我用SerializedSubject萬一你會從不同的線程調用notifyNewEvent(),否則你可以使用BehaviourSubject

SerializedSubject<StoreChangeEvent, StoreChangeEvent> subject = new SerializedSubject(BehaviorSubject.create()) 

public void initialize() { 
    // Since you access your incoming event from doOnCompleted, 
    // need this extra flatMap function so that you can access your event 
    // outside rx java chain. 
    subject.flatMap(new Func1() { 
     @Override 
     public Observable call(StoreChangeEvent event) { 
      return Observable 
        .just(database.getAllPoints()) 
        .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList) 
          .map(result -> deletePoint(result)) 
          .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage()))) 
          .doOnCompleted(() -> emitStoreChange(finalEvent))); 
     } 
    }) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribeOn(AndroidSchedulers.from(backgroundLooper)) 
      .subscribe(); 
} 

public void notifyNewEvent(StoreChangeEvent event) { 
    subject.onNext(event); 
} 

選擇2 - 執行人

如果你不訪問UI,爲什麼還要用科目和observeOn,subscribeOn打擾。用一個線程創建一個執行程序(隨後執行所有任務),並將任務提交給它,從而充分利用RxJava的有用功能。

ExecutorService executorService = Executors.newSingleThreadExecutor(); 

public void notifyNewEvent(final StoreChangeEvent event) { 
    executorService.execute(new Runnable() { 
     public void run() { 
      Observable.just(database.getAllPoints()) 
       // Blocking, so that the thread doesn't exit 
       // and blocks on subscribe() till completion. 
       .toBlocking() 
       .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList) 
        .map(result -> deletePoint(result)) 
        .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage()))) 
        .doOnCompleted(() -> emitStoreChange(finalEvent))) 
       .subscribe(); 
     } 
    }); 
} 
+0

傳入事件不是來自.doOnComplete()。它來自我有一個調度員班,當從GPS獲取一個定位點時。對不起,這並不清楚我的代碼 – Jaythaking

+0

@Jaythaking當然,但你可以在'doOnComplete'中訪問它。沒有保存在某個地方,你無法訪問它,因爲在鏈中已經丟失(轉換)。 –

+0

我有點迷失在這裏我不得不承認......我嘗試了兩種都失敗了,我必須在請求完成後通知視圖,從而訪問UI ...我使用StoreEventChange通知我的視圖體系結構 – Jaythaking