2017-07-19 213 views
10

關於Java9中新的Flow相關接口,我來到了article。從那裏示例代碼:流程編程:訂閱者和發佈者跟蹤計數?

public class MySubscriber<T> implements Subscriber<T> { 
    private Subscription subscription;  
    @Override 
    public void onSubscribe(Subscription subscription) { 
    this.subscription = subscription; 
    subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded 
    }   
    @Override 
    public void onNext(T item) { 
    System.out.println("Got : " + item); 
    subscription.request(1); //a value of Long.MAX_VALUE may be considered as effectively unbounded 
    } 

正如你所看到的,onNext()請求一個新項目推。

現在我想知道:

  • 如果onSubscribe()要求,說5個項目
  • 並且第一個項目被交付後,request(1)被稱爲像上面

目前預計到服務器發送

  • 5項(5請求-1發送+1必要相關捐資)
  • 或一個項目(因爲以前的請求被通過新的請求「丟棄」)

換句話說:當request()叫了幾次,做這些數字加起來;或之前的請求是否被「丟棄」?

通向問題標題 - 用戶是否需要跟蹤收到的項目,以避免在某個時候請求「太多」項目。

+6

似乎將增加:[_Adds給定的數字'N'項目,以目前的** ** unfulfille爲d這subscription_需求(http://download.java.net/java/jdk9/docs/ api/java/util/concurrent/Flow.Subscription.html#request-long-) –

+0

@SotiriosDelimanolis我認爲你應該寫更多的答案,單獨評論真的很有幫助。在別處表達我的感激之情;-) – GhostCat

回答

6

由於索蒂里奧斯points out,則request method's Javadoc國家(重點煤礦):

添加給定的項目爲這個訂閱目前未實現的需求數量n。如果n小於或等於零,則訂閱服務器將收到帶有IllegalArgumentException參數的onError信號。否則,訂戶將收到最多n附加上的下一個調用(或更少,如果終止)。

因此,答案顯然是肯定的,用戶需要跟蹤項目的。實際上,這就是機制的全部重點。一些背景:request方法意在允許訂戶應用backpressure,通知上游組件它已經過載並且「需要中斷」。因此,訂戶的(和只有它的)任務仔細審查何時以及要求多少新項目。在這一行中,它不能「重新考慮」並減少要接收的項目數量。

降低數量也將使在這個意義上,完全請求的項目數量可能會突然降低(因爲它代表它只能增加),發佈者和訂閱「非單調」之間的通信。這不僅在抽象意義上令人討厭,而且帶來了具體的一致性問題:當訂戶突然將請求的項目數量減少到1時,發佈商可能正在交付幾件商品 - 現在呢?

3

雖然Java 9沒有實現Reactive Streams API,但它提供了幾乎相同的API並定義了相應的行爲。

而在Reactive Streams specification這個相加由以下定義:

1.1對於出版者:

onNext信號的由發佈者發送到訂戶的總數必須小於或等於該訂戶在任何時間所請求的元素總數。

以及用於3.8認購:

雖然認購沒有被取消,Subscription.request(長N)必須註冊的附加元件給定數量的待產生的相應的用戶。

所以Java堅持那是無流API中給出的規範。

相關問題