2017-05-28 64 views
2

我正在嘗試阿卡流,但我無法得到背壓在我簡單的例子中工作。我承認沒有經歷akka(流),所以可能我錯過了一些很大的東西。瞭解阿卡流中的背壓Source.queue

我正在生產(提供隊列)整數比消耗它們更快,所以我認爲backpressure會踢。 我的目標是總是消耗最近放入隊列中的項目(這就是爲什麼我有bufferSize = 1和 源隊列上的OverflowStrategy.dropHead())。

public class SimpleStream { 
    public static void main(String[] argv) throws InterruptedException { 
     final ActorSystem system = ActorSystem.create("akka-streams"); 
     final Materializer materializer = ActorMaterializer.create(system); 

     final Procedure<Integer> slowConsumer = (i) -> { 
      System.out.println("consuming [" + i + "]"); 
      ThreadUtils.sleepQuietly(1000, TimeUnit.MILLISECONDS); 
     }; 

     final SourceQueue<Integer> q = Sink 
       .<Integer>foreach(slowConsumer) 
       .runWith(Source.<Integer>queue(1, OverflowStrategy.dropHead()), materializer); 

     final AtomicInteger i = new AtomicInteger(0); 
     final Thread t = new Thread(() -> { 
      while (!Thread.currentThread().isInterrupted()) { 
       int n = i.incrementAndGet(); 
       q.offer(n); 
       System.out.println("produced: [" + n + "]"); 
       ThreadUtils.sleepQuietly(500, TimeUnit.MILLISECONDS); 
      } 
     }); 
     t.setName("ticking"); 
     t.start(); 

     // run some time... to observe the effects. 
     ThreadUtils.sleepQuietly(1, TimeUnit.HOURS); 
     t.interrupt(); 
     t.join(); 

     // eventually shutdown akka here... 
    } 
} 

不過這是結果:

produced: [1] 
consuming [1] 
produced: [2] 
produced: [3] 
consuming [2] <-- Expected to be consuming 3 here. 
produced: [4] 
produced: [5] 
consuming [3] <-- Expected to be consuming 5 here. 
produced: [6] 
produced: [7] 

請在此處忽略線程的東西,那裏只是從外部 源假獲取數據(例如,如果我不得不用這個會發生這種事一個真正的項目)。

任何想法我失蹤?

+0

背壓不適用於'Source.queue'。您可以儘可能多地調用其「提供」。你需要檢查什麼'offer'返回。你很可能希望生產者獨立於消費者隊列。看看'MergeHub'。也許它會對你更好。 – expert

回答

0

Source.queue終止背壓信號。這就是爲什麼Source.queue方法需要一個OverflowStrategy。如果背壓可以通過隊列向上遊發送,則不需要處理隊列可能溢出的情況。但是由於背壓不會傳播到隊列中,所以需要定義一個策略來處理比消費者更快的生產者。

對於典型的流,最終的SourceSink收到需求以產生更多結果。但是,從Source.queue創建的流,「最終來源」是一個隊列。這個隊列只能排空內容,如果有的話。由於上游位於offer方法的另一側,因此它不能向上遊發信號以產生更多結果。

+0

好的,有意義的是,「背壓」不能向上傳播超過隊列。在我的問題中,我稱之爲「背壓」,即通過隊列實施溢出策略。如果這個溢出策略儘管名稱起作用(即它是反壓),那也沒有問題。 –