2015-05-05 88 views
6

我正在嘗試使用RxJava編寫一個簡單的程序來生成無限序列的自然數。所以,我已經找到了兩種使用Observable.timer()Observable.interval()生成數字序列的方法。我不確定這些功能是否是解決這個問題的正確方法。我期待着一個簡單的函數,就像我們在Java 8中生成的函數一樣,可以生成無限自然數。使用RxJava生成無限序列的自然數

IntStream.iterate(1,value - > value +1).forEach(System.out :: println);

我嘗試使用IntStream與Observable但不能正常工作。它僅向第一個用戶發送無限數量的數字流。我怎樣才能正確生成無限自然數序列?

import rx.Observable; 
import rx.functions.Action1; 

import java.util.stream.IntStream; 

public class NaturalNumbers { 

    public static void main(String[] args) { 
     Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> { 
      IntStream stream = IntStream.iterate(1, val -> val + 1); 
      stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber)); 
     }); 

     Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber); 
     Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber); 
     Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber); 
     naturalNumbers.subscribe(first); 
     naturalNumbers.subscribe(second); 
     naturalNumbers.subscribe(third); 

    } 
} 

回答

3

的問題是,在naturalNumbers.subscribe(first);,在OnSubscribe你實現被調用,你是在一個無限流做forEach,因此爲什麼你的程序永遠不會終止。

你可以處理它的一種方法是在不同的線程上異步訂閱它們。很容易地看到我的成果引入到睡覺流處理:

Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> { 
    IntStream stream = IntStream.iterate(1, i -> i + 1); 
    stream.peek(i -> { 
     try { 
      // Added to visibly see printing 
      Thread.sleep(50); 
     } catch (InterruptedException e) { 
     } 
    }).forEach(subscriber::onNext); 
}); 

final Subscription subscribe1 = naturalNumbers 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(first); 
final Subscription subscribe2 = naturalNumbers 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(second); 
final Subscription subscribe3 = naturalNumbers 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe(third); 

Thread.sleep(1000); 

System.out.println("Unsubscribing"); 
subscribe1.unsubscribe(); 
subscribe2.unsubscribe(); 
subscribe3.unsubscribe(); 
Thread.sleep(1000); 
System.out.println("Stopping"); 
+0

感謝邁克你的答案。如果我在創建Observable的時候調用subscribeOn方法,而不是像上面的代碼片段所示的那樣調用它三次,它會有什麼不同。我測試了它,行爲相同但仍想確認。 – Shekhar

+0

這個問題被正確識別,但這是不好的建議 - 你不應該使用'subscribeOn'解決這個問題 - 請參閱我的答案爲什麼。 –

+1

以這種方式調用'unsubscribe'會斷開用戶連接,因此它會停止接收消息,但它不會停止發生器的循環,這會持續無限地佔用CPU的能量。請參閱我的回答,瞭解如何解決故事的兩個方面。 –

2

Observable.Generate是完全被動地來解決這一類問題的運營商。我也認爲這是一個教學示例,因爲無論如何,使用迭代器可能會更好。

您的代碼會在用戶的線程上產生整個流。由於這是一個無限流,subscribe呼叫永遠不會完成。除了這個明顯的問題,取消訂閱也會有問題,因爲你沒有在你的循環中檢查它。

您想使用調度程序來解決這個問題 - 當然不要使用subscribeOn,因爲這會給所有觀察者帶來負擔。安排每個號碼的遞送至onNext - 並且作爲每個計劃操作的最後一步,安排下一個。

本質上這就是Observable.generate給你的 - 每個迭代都在提供的調度程序上進行調度(如果不指定它,缺省爲引入併發性的調度程序)。調度程序操作可以取消並避免線程匱乏。

Rx.NET解決它像這樣的(實際上有一個async/await模型好多了,但在Java中不可據我所知):

static IObservable<int> Range(int start, int count, IScheduler scheduler) 
{ 
    return Observable.Create<int>(observer => 
    { 
     return scheduler.Schedule(0, (i, self) => 
     { 
      if (i < count) 
      { 
       Console.WriteLine("Iteration {0}", i); 
       observer.OnNext(start + i); 
       self(i + 1); 
      } 
      else 
      { 
       observer.OnCompleted(); 
      } 
     }); 
    }); 
} 

兩件事情,這裏要注意:

  • 通話到Schedule將返回一個回傳給觀察者的訂閱句柄
  • 該計劃是遞歸的 - self參數是對用於調用下一次迭代的計劃程序的引用。這允許取消訂閱取消操作。

不知道這是如何在RxJava看起來,但這個想法應該是一樣的。再次,Observable.generate可能會更簡單,因爲它旨在照顧這種情況。

1

當創建無限sequencies護理應注意:

  1. 訂閱,觀察在不同的線程;否則只有服務單個用戶
  2. 一旦訂閱終止,將停止生成值;否則失控循環將使用subscribeOn()observeOn()和各種調度吃你的CPU

的第一個問題就解決了。

第二個問題最好通過使用庫提供的方法Observable.generate()Observable.fromIterable()來解決。他們做適當的檢查。

檢查:

Observable<Integer> naturalNumbers = 
     Observable.<Integer, Integer>generate(() -> 1, (s, g) -> { 
      logger.info("generating {}", s); 
      g.onNext(s); 
      return s + 1; 
     }).subscribeOn(Schedulers.newThread()); 
Disposable sub1 = naturalNumbers 
     .subscribe(v -> logger.info("1 got {}", v)); 
Disposable sub2 = naturalNumbers 
     .subscribe(v -> logger.info("2 got {}", v)); 
Disposable sub3 = naturalNumbers 
     .subscribe(v -> logger.info("3 got {}", v)); 

Thread.sleep(100); 

logger.info("unsubscribing..."); 
sub1.dispose(); 
sub2.dispose(); 
sub3.dispose(); 
Thread.sleep(1000); 

logger.info("done");