2016-11-12 31 views
2

RxJava中的背壓並不是真正的反壓,而只是忽略了一些元素。RxJava中REAL背壓的最佳實施

但是如果我不能釋放任何元素而且我需要以某種方式減慢情緒?

RxJava不會影響元素意志,所以開發者需要自己實現它。但是如何?

想到最簡單的方法就是使用一些計數器,在完成時遞增和遞減。

就像是:

public static void sleep(int ms) { 
    try { 
     Thread.sleep(ms); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

public static void main(String[] args) throws InterruptedException { 

    AtomicInteger counter = new AtomicInteger(); 

    Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1)); 
    Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5)); 

    Observable.create(s -> { 
     while (!s.isUnsubscribed()) { 
      if (counter.get() < 100) { 
       s.onNext(Math.random()); 
       counter.incrementAndGet(); 
      } else { 
       sleep(100); 
      } 
     } 
    }).subscribeOn(sA) 
      .flatMap(r -> 
        Observable.just(r) 
          .subscribeOn(sB) 
          .doOnNext(x -> sleep(1000)) 
          .doOnNext(x -> counter.decrementAndGet()) 
      ) 
      .subscribe(); 
} 

但我覺得這樣很可憐。有更好的解決方案嗎?

回答

-1

正如你自己提到的,這實際上與RxJava無關。
如果你最終必須處理所有的事件,但要做到這一點,在自己的節奏,用隊列:

ExecutorService emiter = Executors.newSingleThreadExecutor(); 
    ScheduledExecutorService workers = Executors.newScheduledThreadPool(4); 
    BlockingQueue<String> events = new LinkedBlockingQueue<>(); 


    emiter.submit(() -> { 
     System.out.println("I'll send 100 events as fast as I can"); 

     for (int i = 0; i < 100; i++) { 
      try { 
       events.put(UUID.randomUUID().toString()); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    }); 

    workers.scheduleWithFixedDelay(
      () -> { 
       String result = null; 
       try { 
        result = events.take(); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 

       System.out.println(String.format("I don't care, got %s only now", result)); 
      }, 0, 1, TimeUnit.SECONDS 
    ); 
+1

隊列沒有被反壓。背壓點是向生產者發出信號以減緩排放。這可能不總是可能的,但在絕大多數情況下,這是可能的。這就是爲什麼在JDK9的新反應流規範中顯式反壓。 https://github.com/reactive-streams/reactive-streams-jvm – kaqqao

2

好了,背壓RxJava是不是真實的背壓

RxJava的背壓通過請求通道實現後續生產者和消費者之間的非阻塞合作。消費者通過request()請求一定數量的元素,並且製作者通過onNext創建/生成/發出至多該項目的數量,有時延遲在onNext之間。

但只忽略了一些元素集。

只有當您明確告訴RxJava刪除任何溢出時,纔會發生這種情況。

RxJava不會影響元素意志,所以開發者需要自己實現它。但是如何?

使用Observable.create需要有關如何實施非阻塞反壓的實用知識,實際上不建議圖書館用戶使用。 RxJava有很多方法可以讓你啓用背壓,流無併發症:

Observable.range(1, 100) 
.map(v -> Math.random()) 
.subscribeOn(sA) 
.flatMap(v -> 
    Observable.just(v).subscribeOn(sB) 
    .doOnNext(x -> sleep(1000)) 
) 
.subscribe(); 

Observable.create(SyncOnSubscribe.createStateless(
    o -> o.onNext(Math.random()) 
) 
.subscribeOn(sA) 
... 
+0

我讀過有關背壓的話題。我正在尋求減慢我的例子的元素選擇的方式。假設我正在通過光標從數據庫中讀取記錄,並且我需要放慢它,導致系統進程記錄速度較慢,然後生成器發出它們。 – corvax

+0

這是基於拉動的,並且拉動速度與下游可以消耗的速度一樣快。 – akarnokd