2017-08-07 38 views
0

我想連接用戶和反應堆通量。然而我的小例子並不產生任何輸出:如何將訂戶與reactor.core.publisher.Flux連接?

public static void main(String[] args) throws InterruptedException { 
    Flux.just("a", "b", "c") 
      .subscribe(new BaseSubscriber<String>() { 
       @Override 
       protected void hookOnSubscribe(Subscription ignored) { 
       } 

       @Override 
       protected void hookOnNext(String value) { 
        System.out.print(value); 
       } 
      }); 
    Thread.sleep(1000); 
} 

我嘗試同樣有org.reactivestreams.Subscriber

Flux.just("a", "b", "c") 
      .subscribe(new Subscriber<String>() { 
       @Override 
       public void onSubscribe(Subscription s) { 
       } 

       @Override 
       public void onNext(String s) { 
        System.out.println(s); 
       } 

       @Override 
       public void onError(Throwable t) { 
       } 

       @Override 
       public void onComplete() { 
       } 
      }); 

但同樣,沒有輸出。

如果我嘗試從最簡單的:

Flux.just("a", "b", "c").subscribe(System.out::println); 

...這將產生預期的輸出:

a 
b 
c 

我如何使用用戶從流量接收值?

回答

1

關鍵是request從源頭獲得更多值。直到那個電話沒有發生。

實施例與BaseSubscriber

Flux<String> source = Flux.just("a", "b", "c"); 
source.subscribe(new BaseSubscriber<String>() { 
      @Override 
      protected void hookOnSubscribe(Subscription subscription) { 
       request(1); // <-- here 
      } 

      @Override 
      protected void hookOnNext(String value) { 
       request(1); // <-- here 
       System.out.println(value); 
      } 
     }); 

並且用標準Subscriber相同:

Flux<String> source = Flux.just("a", "b", "c"); 
source.subscribe(new Subscriber<String>() { 
    private Subscription subscription; 

    @Override 
    public void onSubscribe(Subscription subscription) { 
     this.subscription = subscription; 
     subscription.request(1); // <-- here 
    } 

    @Override 
    public void onNext(String s) { 
     subscription.request(1); // <-- here 
     System.out.println(s); 
    } 

    @Override 
    public void onError(Throwable t) { 
    } 

    @Override 
    public void onComplete() { 
    } 
}); 
+1

這是正確的。另外,如果你希望源儘可能快地發出,你可以在'BaseSubscriber.hookOnSubscribe'中做'requestUnbounded()'(之後不需要更多的'request') –