2017-04-04 81 views
0

我有一個特定的功能,應該發射物品,無論何時有人訂閱或不。此外,這個功能應該只執行一次,如果有人訂閱它,它不應該被執行如何只創建一次observable?

Observable<CharSequence> observable = Observable.create(subscriber -> { 

      try { 

       sseEventSource.connect(); //this should be called once when created; 

       final SseEventReader sseEventReader = sseEventSource.getEventReader(); 
       SseEventType type = sseEventReader.next(); 
       while (type != SseEventType.EOS) { 
        if (type != null && type.equals(SseEventType.DATA) && sseEventReader.getData() != null) { 
         CharSequence data = sseEventReader.getData(); 
         if (!subscriber.isUnsubscribed()) { 
          subscriber.onNext(data); 
         } 
        } 
        type = sseEventReader.next(); 
       } 
       sseEventSource.close(); 
       Log.d("SseService", "closed"); 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onCompleted(); 
       } 
      } catch (URISyntaxException | IOException e) { 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onError(e); 
       } 
      } 

回答

0

可以完成,使用Subjects,例如:

public static void main(String[] args) { 
    Main m = new Main(); 
    m.getChanges().subscribe(x -> { 
     //Data 
    }, e -> { 
     //Errors 
    }); 
    m.connect(); 
} 

private PublishSubject<CharSequence> ps = PublishSubject.create(); 

public void connect() { 
    sseEventSource.connect(); 
} 

public Observable<CharSequence> getChanges() { 
    try { 
     final SseEventReader sseEventReader = sseEventSource.getEventReader(); 
     SseEventType type = sseEventReader.next(); 
     while (type != SseEventType.EOS) { 
      if (type != null && type.equals(SseEventType.DATA) && sseEventReader.getData() != null) { 
       CharSequence data = sseEventReader.getData(); 
       ps.onNext(data); 
      } 
      type = sseEventReader.next(); 
     } 
     sseEventSource.close(); 
     ps.onComplete(); 
    } catch (URISyntaxException | IOException e) { 
     ps.onError(e); 
    } 
    return ps; 
} 

這樣,你可以訂閱/取消訂閱需要的主題更改並僅建立一次連接。如果您想收到緩衝更改,請考慮使用ReplaySubject

+0

我也忘了提及這個connect()和其他的東西應該在IO線程內執行。 –

+0

因此,在IO線程中運行 – Divers

+0

有沒有什麼辦法可以創建一些即使沒有人訂閱也會發射項目的Hot觀察?所有這些代碼應該在一個線程中一次執行。你要我在rx鏈裏面執行一部分,在其他地方執行一個peice。 –

1

嘗試運行share()

Observable<CharSequence> observable = Observable.create(subscriber -> { 

     try { 

      sseEventSource.connect(); //this should be called once when created; 

      final SseEventReader sseEventReader = sseEventSource.getEventReader(); 
      SseEventType type = sseEventReader.next(); 
      while (type != SseEventType.EOS) { 
       if (type != null && type.equals(SseEventType.DATA) && sseEventReader.getData() != null) { 
        CharSequence data = sseEventReader.getData(); 
        if (!subscriber.isUnsubscribed()) { 
         subscriber.onNext(data); 
        } 
       } 
       type = sseEventReader.next(); 
      } 
      sseEventSource.close(); 
      Log.d("SseService", "closed"); 
      if (!subscriber.isUnsubscribed()) { 
       subscriber.onCompleted(); 
      } 
     } catch (URISyntaxException | IOException e) { 
      if (!subscriber.isUnsubscribed()) { 
       subscriber.onError(e); 
      } 
     }).share() ; 

如果你真心希望一個熱點觀察到的使用publish()connect()發起觀察到,即使沒有人認購。