我正在嘗試阿卡流,但我無法得到背壓在我簡單的例子中工作。我承認沒有經歷akka(流),所以可能我錯過了一些很大的東西。瞭解阿卡流中的背壓Source.queue
我正在生產(提供隊列)整數比消耗它們更快,所以我認爲backpressure會踢。 我的目標是總是消耗最近放入隊列中的項目(這就是爲什麼我有bufferSize = 1和 源隊列上的OverflowStrategy.dropHead())。
public class SimpleStream {
public static void main(String[] argv) throws InterruptedException {
final ActorSystem system = ActorSystem.create("akka-streams");
final Materializer materializer = ActorMaterializer.create(system);
final Procedure<Integer> slowConsumer = (i) -> {
System.out.println("consuming [" + i + "]");
ThreadUtils.sleepQuietly(1000, TimeUnit.MILLISECONDS);
};
final SourceQueue<Integer> q = Sink
.<Integer>foreach(slowConsumer)
.runWith(Source.<Integer>queue(1, OverflowStrategy.dropHead()), materializer);
final AtomicInteger i = new AtomicInteger(0);
final Thread t = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
int n = i.incrementAndGet();
q.offer(n);
System.out.println("produced: [" + n + "]");
ThreadUtils.sleepQuietly(500, TimeUnit.MILLISECONDS);
}
});
t.setName("ticking");
t.start();
// run some time... to observe the effects.
ThreadUtils.sleepQuietly(1, TimeUnit.HOURS);
t.interrupt();
t.join();
// eventually shutdown akka here...
}
}
不過這是結果:
produced: [1]
consuming [1]
produced: [2]
produced: [3]
consuming [2] <-- Expected to be consuming 3 here.
produced: [4]
produced: [5]
consuming [3] <-- Expected to be consuming 5 here.
produced: [6]
produced: [7]
請在此處忽略線程的東西,那裏只是從外部 源假獲取數據(例如,如果我不得不用這個會發生這種事一個真正的項目)。
任何想法我失蹤?
背壓不適用於'Source.queue'。您可以儘可能多地調用其「提供」。你需要檢查什麼'offer'返回。你很可能希望生產者獨立於消費者隊列。看看'MergeHub'。也許它會對你更好。 – expert