2017-10-20 26 views
1

我正在研究將我的(Android)應用程序中的某些邏輯轉換爲使用RxJava,但是我正在努力想出一種方法來執行一些更高級的邏輯。使用RxJava更新並保留現有數據

用例如下:我想向用戶展示一種Feed。 Feed包含來自不同來源的項目,例如消息,文章等。由於API限制,應用程序本身必須收集各個資源並將其顯示在一個列表中。是

例如,假設我在飼料項目如下:

class FeedItem { 
    Type feedItem; // Type of the item, e.g. article, message, etc. 
    ... 
} 

目前,該飼料是建立在一個單獨的線程,並且UI使用監聽器當飼料已更新通知。爲了讓你瞭解它是如何完成的,下面是一些(僞)Java代碼(線程和其他管理代碼爲了清楚起見而被省略)。

class FeedProducer { 
    List<FeedItem> currentData = new ArrayList(); 

    public void refreshData() { 
     for (FeedSource source: getFeedSources()) { 
      Type sourceType = source.getType(); 
      // Remove existing items 
      currentData.removeIf(item -> item.feedItem.equals(sourceType)); 
      List<FeedItem> newItems = source.produceItems(); 
      // Add the new items 
      currentData.addAll(newItems); 
      // Notify the UI things have changed 
      notifyDataChanged(currentData); 
     } 
     // Notify the UI we are done loading 
     notifyLoadingComplete(); 
    } 
} 

這種方法refreshData()將在每次用戶想要刷新數據時被調用。這樣,可以僅更新一些源,而其他源保持不變(例如通過更改返回值getFeedSources())。

這些來源也在應用程序的其他部分單獨使用;我已經將它們轉換爲Observables那裏。這使事情變得更容易,例如如果數據庫發生更改,則Observable簡單地將更改推送到UI。

因此,我的問題是如何(優雅地)將這些可觀察的源代碼合併到一個Observable中,但是前面的結果存在一個「全局」狀態。我研究過各種結合運營商,但還沒有找到我需要的。我很抱歉,如果我忽略了一些顯而易見的事情,因爲我對RxJava相當陌生。

回答

1

天真的辦法是,來電保存最後一個列表,並給它作爲參數,當您正在請求新的數據:

public class ReactiveMultipleSources { 

    // region Classes 
    public enum SourceType { 
     TYPE_ARTICLE, 
     TYPE_MESSAGE, 
     TYPE_VIDEO 
    } 

    public static class Feed { 
     private SourceType sourceType; 
     private String content; 

     Feed(SourceType sourceType, String content) { 
      this.sourceType = sourceType; 
      this.content = content; 
     } 

     SourceType getSourceType() { 
      return sourceType; 
     } 
    } 
    // endregion 

    public static void main(String[] args) throws InterruptedException { 
     final List<Feed>[] currentList = new List[]{new ArrayList()}; 

     // Simulate refresh 
     refreshContent(currentList[0]) 
       .subscribe(feeds -> { 
        currentList[0] = feeds; 

        for (int i = 0; i < currentList[0].size(); i++) { 
         System.out.println(currentList[0].get(i).content); 
        } 
       }); 
     Thread.sleep(2000); 
     System.out.println(); 

     // Simulate refresh 
     refreshContent(currentList[0]) 
       .subscribe(feeds -> { 
        currentList[0] = feeds; 

        for (int i = 0; i < currentList[0].size(); i++) { 
         System.out.println(currentList[0].get(i).content); 
        } 
       }); 
     Thread.sleep(2000); 


    } 

    private static Observable<List<Feed>> refreshContent(@NotNull List<Feed> currentFeed) { 
     return Observable.fromIterable(getSourceTypes()) 
       .observeOn(Schedulers.io()) 
       // Get List<Feed> forEach sourceType 
       .concatMap(ReactiveMultipleSources::getFeedItemsBySourceType) 
       .observeOn(Schedulers.computation()) 
       // Get list of "List of Feed for sourceType", = List<List<Feed>> 
       .toList() 
       .map(lists -> { 
        for (List<Feed> list : lists) { 
         SourceType sourceType = list.get(0).getSourceType(); 
         // Remove items of currentFeed whose sourceType has new List<Feed> 
         currentFeed.removeIf(temp -> temp.getSourceType() == sourceType); 
         // Add new items 
         currentFeed.addAll(list); 
        } 
        return currentFeed; 
       }) 
       .toObservable(); 
    } 

    // region Helper 
    private static List<SourceType> getSourceTypes() { 
     return new ArrayList<>(Arrays.asList(SourceType.values())); 
    } 

    private static Observable<List<Feed>> getFeedItemsBySourceType(SourceType sourceType) { 
     String content; 
     if (sourceType == SourceType.TYPE_ARTICLE) 
      content = "article "; 
     else if (sourceType == SourceType.TYPE_MESSAGE) 
      content = "message "; 
     else if (sourceType == SourceType.TYPE_VIDEO) 
      content = "video "; 
     else 
      content = "article "; 

     Feed feed1 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed2 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed3 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed4 = new Feed(sourceType, content + createRandomInt()); 

     return Observable.just(Arrays.asList(feed1, feed2, feed3, feed4)); 
    } 

    // For simulating different items each time List<Feed> is required 
    private static int createRandomInt() { 
     return ThreadLocalRandom.current().nextInt(0, 21); 
    } 
    // endregion 
} 

輸出示例:

article 19 
article 15 
article 18 
article 18 
message 3 
message 2 
message 9 
message 1 
video 19 
video 17 
video 18 
video 11 

article 0 
article 4 
article 18 
article 15 
message 11 
message 16 
message 16 
message 4 
video 1 
video 7 
video 20 
video 2 
1

如果您有3個單獨的任務返回[0, 1],[10, 11][20, 21],您希望將它們合併到一個列表中。在這種情況下,您可以使用zip操作。

public class TestRx { 
    public static void main(String[] args) { 
     // some individual observables. 
     Observable<List<Integer>> observable1 = Observable.just(Arrays.asList(0, 1)); 
     Observable<List<Integer>> observable2 = Observable.just(Arrays.asList(10, 11)); 
     Observable<List<Integer>> observable3 = Observable.just(Arrays.asList(20, 21)); 

     Observable.zip(observable1, observable2, observable3, 
       new Func3<List<Integer>, List<Integer>, List<Integer>, List<Integer>>() { 
        @Override 
        public List<Integer> call(List<Integer> list1, List<Integer> list2, List<Integer> list3) { 
         // TODO: Remove existing items 

         // merge all lists 
         List<Integer> mergedList = new ArrayList<>(); 
         mergedList.addAll(list1); 
         mergedList.addAll(list2); 
         mergedList.addAll(list3); 
         return mergedList; 
        } 
       }) 
       .subscribe(new Observer<List<Integer>>() { 
        @Override 
        public void onNext(List<Integer> mergedList) { 
         System.out.println(mergedList); 
         // TODO: notifyDataChanged(mergedList) 
        } 

        @Override 
        public void onError(Throwable throwable) { 
         System.out.println(throwable.toString()); 
         // TODO: handle exceptions 
        } 

        @Override 
        public void onCompleted() { 
         // TODO: notifyLoadingComplete() 
        } 
       }); 
    } 
} 

因此,它打印像這樣[0, 1, 10, 11, 20, 21]

相關問題