2016-07-29 47 views
1

簡而言之,是否有任何解決方案可以解決RxJava中的背壓問題,而無需藉助丟棄物品,序列化操作或無限制緩衝?處理背壓而不丟棄物品或在RxJava中序列化

考慮以下任務作爲何時可能有用的示例。從磁盤

  1. 將數據讀入存儲器
  2. 壓縮數據
  3. 通過網絡

的直接的方法是做在單個後臺線程的所有任務順序地傳輸壓縮數據,作爲在:

observeBlocksOfFileContents(file). 
    .subscribeOn(backgroundScheduler) 
    .map(compressBlock) 
    .subscribe(transmitBlock); 

雖然這工作沒有問題,從性能Ë角度看,它是次優的運行時間是所有三個業務的總和,因爲它在並行運行的,而不是最大的人:

observeBlocksOfFileContents(file). 
    .subscribeOn(diskScheduler) 
    .observeOn(cpuScheduler) 
    .map(compressBlock) 
    .observeOn(networkScheduler) 
    .subscribe(transmitBlock); 

如果數據是從讀這可以但是會由於背壓磁盤比它能被壓縮和傳輸的速度快。通常的反壓解決方案是不可取的,原因如下:

  1. 掉落物品:文件必須完全無缺件
  2. 序列化在單個線程傳輸:流水線性能的改善丟失
  3. 調用棧阻塞:not supported in RxJava
  4. 增加observeOn緩衝區:內存消耗可能會成爲幾十倍的文件大小
  5. 重新實現observeOn沒有MissingBackpressureException:大量的工作,打破流暢API

還有其他解決方案嗎?或者這是從根本上不適合ReactiveX可觀察模型的東西?

+0

我不明白如果從'physicalFileContents'發出'compress'的數據來源是從物理的角度來看的話。此操作無法並行運行。 – Divers

+0

我應該指出,該文件被觀察爲許多內容塊,而不是一塊,在這種情況下,是的,它不能並行運行。我已經更新了這個問題來反映這一點。 –

回答

1

6)實現observeBlocksOfFileContents,使其支持背壓。

文件系統已經拉式(InputStream.read()發生在你想讓它和你不拋出),所以認爲合理的塊大小和讀取,在每個請求:

Observable.create(SyncOnSubscribe.createStateful(
    () -> new FileInputStream("file.dat") 
    (in, out) -> { 
     byte[] buf = new byte[4096]; 
     int r = in.read(buf); 
     if (r < 0) { 
      out.onCompleted(); 
     } else { 
      if (r == buf.length) { 
       out.onNext(buf); 
      } else { 
       byte[] buf2 = new byte[r]; 
       System.arraycopy(buf, 0, buf2, 0, r); 
       out.onNext(buf2); 
      } 
     } 

    }, 
    in -> in.close() 
)); 

(爲簡潔起見,省略了try-catch。)