2015-08-28 30 views
1

給定一個隊列像這樣:Scalaz流分塊多達N

val queue: Queue[Int] = async.boundedQueue[Int](1000) 

欲拉斷該隊列和它傳輸到下游水槽,在UP的組塊100。

queue.dequeue.chunk(100).to(downstreamConsumer) 

作品之類的,但如果我有說101個的消息就不會清空隊列。剩下1條消息,除非另有99個消息被推入。我希望儘可能多地從隊列中抽取100個消息,這與我的下游過程可以處理的速度一樣快。

有一個現有的組合子可用?

回答

0

對於這一點,你可能需要從它出隊時,監視隊列的大小。那麼,如果大小達到0,你就不會再等待更多的元素。實際上,您可以根據隊列的大小實施elastic批量調整。即:

val q = async.unboundedQueue[String] 

val deq:Process[Task,(String,Int)] = q.dequeue zip q.size 
val elasticChunk: Process1[(String,Int), Vector[String]] = ??? 
val downstreamConsumer : Sink[Task,Vector[String]] = ??? 

deq.pipe(elasticChunk) to downstreamConsumer 
+0

你將如何實現elasticChunk? –

+0

我實際上使用方便的q.dequeueBatch方法解決了這個問題。不知道它存在。 –