2017-01-02 90 views
0

我想並行加載用戶對象。RxJava:減少不按預期工作

final User user = new User(); 
    final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS") 
      .flatMap(field -> getOrchestrator(user, field)) 
      .scan(new User(), (finalUser, event) -> { 
       finalUser = event; 
       return finalUser; 
      }); 

掃描確實發出三個用戶對象,因爲reduce根本不發射任何項目?我在這裏做錯了什麼。

final User user = new User(); 
    final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS") 
      .flatMap(field -> getOrchestrator(user, field)) 
      .reduce(new User(), (finalUser, event) -> { 
       finalUser = event; 
       return finalUser; 
      }); 

getOrchestrator返回Observable。任何幫助將不勝感激。

下面是完整的代碼片斷

public class Orchestrator { 
    private String userId; 

    public Orchestrator(final String userId) { 
     this.userId = userId; 
    } 

    public static void main(final String[] args) throws Exception { 
     final User user = new User(); 
     final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS") 
       .flatMap(field -> getOrchestrator(user, field)) 
       .scan(new User(), (finalUser, event) -> { 
        finalUser = event; 
        return finalUser; 
       }); 

     userObs.subscribeOn(Schedulers.io()).subscribe(result -> { 
      System.out.println(result.toString()); 
     }); 

     TimeUnit.SECONDS.sleep(10); 

    } 

    private static Observable<User> getOrchestrator(final User user, final String fieldName) { 
     switch (fieldName) { 
      case "CURRENT_ADDRESS": 
       return new AddressOrchestrator().getCurrentAddress(user.getUserId()) 
         .map(currentAddress -> { 
          user.setAddress(currentAddress); 
          try { 
           TimeUnit.MILLISECONDS.sleep(200); 
          } 
          catch (final InterruptedException e) { 

          } 
          return user; 
         }); 
      case "ADDRESSES": 
       return new AddressOrchestrator().getAddresses(user.getUserId()) 
         .map(addresses -> { 
          user.setAddresses(addresses); 
          try { 
           TimeUnit.MILLISECONDS.sleep(200); 
          } 
          catch (final InterruptedException e) { 

          } 
          return user; 
         }); 

      case "NAMES": 
       return new NameOrchestrator().getNames(user.getUserId()) 
         .map(names -> { 
          user.setNames(names); 
          try { 
           TimeUnit.MILLISECONDS.sleep(200); 
          } 
          catch (final InterruptedException e) { 

          } 
          return user; 
         }); 
     } 
     return null; 
    } 

    public User getUser() { 
     final Random r = new Random(); 
     if (r.nextInt(3) % 2 == 0) { 
      return new User(); 
     } 
     throw new RuntimeException(); 
    } 
} 

每個配器返回觀測。您創建一個使用Observable.create(大紅旗,除非你真的知道自己在做什麼)不終止

public class AddressOrchestrator { 

    public Observable<List<Address>> getAddresses(final String userId) { 
     return Observable.create(s -> { 
      final Address currentAddress = this.getBaseAddress(userId); 
      final Address anotherAddress = this.getBaseAddress(userId); 
      anotherAddress.setState("NE"); 
      s.onNext(Arrays.asList(currentAddress, anotherAddress)); 
     }); 

    } 

    public Observable<Address> getCurrentAddress(final String userId) { 
     return Observable.create(s -> s.onNext(this.getBaseAddress(userId))); 
    } 

    public Address getBaseAddress(final String userId) { 
     final Address address = new Address(); 
     address.setLine1("540 Caddo Lake Dr"); 
     address.setCity("Georgetown"); 
     address.setCountry("USA"); 
     address.setState("TX"); 

     return address; 
    } 
} 


public class NameOrchestrator { 

    public Observable<List<Name>> getNames(final String userId) { 
     return Observable.create(s -> { 
      final Name name = new Name(); 
      name.setName("Vanchi"); 
      final Name formerName = new Name(); 
      formerName.setName("Vanchinathan"); 
      s.onNext(Arrays.asList(name, formerName)); 
     }); 
    } 
} 
+1

顯示完整的示例。對於我們所知道的您的訂閱是錯誤的。 – weston

+0

看來你的流是無限的。 –

+0

使用掃描時,流確實結束。使用減少時,它不會發射一個項目。 @weston添加了完整的代碼段 –

回答

2

你的業務流程。這導致無限的流(從某種意義上說,一個完整的事件永遠不會被髮射)。你需要的東西有點像

public Observable<List<Address>> getAddresses(final String userId) { 
     return Observable.create(s -> { 
      final Address currentAddress = this.getBaseAddress(userId); 
      final Address anotherAddress = this.getBaseAddress(userId); 
      anotherAddress.setState("NE"); 
      s.onNext(Arrays.asList(currentAddress, anotherAddress)); 
      s.onCompleted(); 
     }); 

注意被調用的onCompleted。這將解決你的膚淺問題,但你最好擺脫Observable.create首先,並使用像Observable.defer

+0

您能否解釋創建是如何變得糟糕以及延遲如何提供幫助? –

+0

總之,當你使用'Observable'時,有很多方法搞砸了。創造'這就是爲什麼一般不建議如果你有其他選擇。 'Observable.defer'[這裏]有一篇很好的博客文章(http://blog.danlew.net/2014/10/08/grokking-rxjava-part-4)。 – JohnWowUs

1

發生這種情況是因爲scan發出所有上游項目,而reduce只有在流完成時纔會發出。

更換

public Observable<Address> getCurrentAddress(final String userId) { 
    return Observable.create(s -> s.onNext(this.getBaseAddress(userId))); 
} 

public Observable<Address> getCurrentAddress(final String userId) { 
    return Observable.fromCallable(() -> this.getBaseAddress(userId)); 
} 

public Observable<List<Address>> getAddresses(final String userId) { 
    return Observable.create(s -> { 
     final Address currentAddress = this.getBaseAddress(userId); 
     final Address anotherAddress = this.getBaseAddress(userId); 
     anotherAddress.setState("NE"); 
     s.onNext(Arrays.asList(currentAddress, anotherAddress)); 
    }); 
} 

public Observable<List<Address>> getAddresses(final String userId) { 
    return Observable.fromCallable(() -> this.getBaseAddress(userId)) 
      .zipWith(Observable.fromCallable(() -> this.getBaseAddress(userId)) 
        .map(address -> address.setState("NE")), 
        (currentAddress, anotherAddress) -> Arrays.asList(currentAddress, anotherAddress)); 
} 

,改變你的setAddress()方法本

public Address setState(String state) { 
    this.state = state; 
    return this; 
} 

fromCallable()是一種更好的方式來創建一個Observable發射至多有一個元素,因爲它處理錯誤和背壓爲您服務。