2017-02-11 22 views
0

我意識到30491785只討論了這一點,但也就是爲什麼產生差錯的沒有真正的解釋,涉及外來代碼的例子。我想用一個5行的例子來問這個問題。'有人能理解java.lang.IllegalStateException:只有一個允許觀察員在這短短的例子嗎?

問題是什麼正在發生的事情是不會與swindows(比其他事實的類型是不同的),並沒有任何變通gwindows?

public class OneObservableError { 

    public static void main(String[] args) throws Exception { 
     Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS); 
     Observable<Observable<Long>> swindows = source.window(source, s -> Observable.interval(3, TimeUnit.SECONDS)); 
     Observable<GroupedObservable<Long, Long>> groups = source.groupBy(x -> x % 4); 
     Observable<Observable<Observable<Long>>> gwindows 
       = groups.map(g -> g.window(g, i -> Observable.interval(3, TimeUnit.SECONDS))); 
     //swindows.flatMap(gw->gw).subscribe(System.out::println); //Works 
     gwindows.flatMap(gw -> gw).subscribe(System.out::println); //Fails with Only one Observable allowed  
     sleep(10000); 
    } 
+0

http://stackoverflow.com/questions/30491785/rxjava-java-lang-illegalstateexception-only-one-subscriber-allowed – akarnokd

回答

0

GroupedObservable是一種單播類型的源,您不能多次使用它。問題出在你的代碼在g.window(g,...)窗口操作符嘗試訂閱兩次。使用發佈(功能)共享單個使用G:

g.publish(gs -> gs.window(gs, ...).flatMap(gw -> gw))... 
+0

是的,這個作品。 (我用g.share(),而不是g.publish因爲我無法弄清楚如何連接組。我會很感激知道該怎麼做。)還有一個問題,請。團隊有什麼特別之處?例如,如果不是使用GROUPBY我使用過濾器進行分區我流,然後在過濾的流,我不碰上錯誤操作。爲什麼過濾的流不會受到同樣的問題? –

+0

'groupBy'創建子流,'filter'不創建子流。 – akarnokd

相關問題