2017-08-02 80 views
2

我正在Scala編寫應用程序,我正在使用Akka流。Akka流 - 按流中元素的數量進行過濾

在某一點上,我需要過濾掉N個元素少於N個的流。因此,例如,與N=5

Source(List(1,2,3)).via(myFilter)  // => List() 
Source(List(1,2,3,4)).via(myFilter)  // => List() 

將成爲空流,

Source(List(1,2,3,4,5)).via(myFilter) // => List(1,2,3,4,5) 
Source(List(1,2,3,4,5,6)).via(myFilter) // => List(1,2,3,4,5,6) 

將保持不變。

當然,我們無法知道流中元素的數量,直到它結束爲止,並且在推送之前等到最後纔可能不是最好的主意。

所以,相反,我已經考慮過下面的算法:

  1. 用於第一N-1的元素,只是緩衝它們,而無需進一步傳遞;
  2. 如果輸入流在到達第N個元素之前完成,則輸出空流;
  3. 如果輸入流到達第N個元素,輸出緩衝的N-1個元素,然後輸出第N個元素,然後傳遞所有後面的元素。

但是,我不知道如何構建一個實現它的Flow元素。是否有一些可以使用的內置Akka元素?

編輯:

好了,我打了昨天,我想出了這樣的事情:

Flow[Int]. 
    prefixAndTail(N). 
    flatMapConcat { 
    case (prefix, tail) if prefix.length == N => 
     Source(prefix).concat(tail) 
    case _ => 
     Source.empty[Int] 
    } 

將它做我想做什麼?

回答

1

這可能是其中一個小「狀態」可以走很長的路。即使解決方案不是「純粹的功能」,更新狀態將被系統的其他部分隔離和無法訪問。我認爲這是斯卡拉的美麗之一:當FP解決方案不明顯時,您可以始終以孤立的方式回覆命令......

完成的Flow將是多個子部分的組合。第一流只會組的元素融入尺寸N的序列:

現在的非功能部分,一個過濾器,只能通過當第一順序是正確的尺寸允許分組Seq值:

val minSizeRequirement : Int => Seq[Int] => Boolean = 
    (minSize) => { 
    var isFirst : Boolean = True 

    var passedMinSize : Boolean = False 

    (testSeq) => { 
     if(isFirst) { 
     isFirst = False 
     passedMinSize = testSeq.size >= minSize 
     passedMinSize 
     } 
     else 
     passedMinSize 
     } 
    } 
    } 

val minSizeFilter : Int => Flow[Seq[Int], Seq[Int], _] = 
    (minSize) => Flow[Seq[Int]].filter(minSizeRequirement(minSize)) 

的最後一步是將Seq[Int]值轉換回Int值:

val flatten = Flow[Seq[Int]].flatMapConcat(l => Source(l)) 

最後,結合它們放在一起:

val combinedFlow : Int => Flow[Int, Int, _] = 
    (minSize) => 
    group(minSize) 
     .via(minSizeFilter(minSize)) 
     .via(flatten) 
0

也許statefulMapConcat可以幫助你:

import akka.actor.ActorSystem 
import akka.stream.scaladsl.{Sink, Source} 
import akka.stream.{ActorMaterializer, Materializer} 

import scala.collection.mutable.ListBuffer 
import scala.concurrent.ExecutionContext 

object StatefulMapConcatExample extends App { 

    implicit val system: ActorSystem = ActorSystem() 
    implicit val materializer: Materializer = ActorMaterializer() 
    implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global 

    def filterLessThen(threshold: Int): (Int) => List[Int] = { 
    var buffering = true 
    val buffer: ListBuffer[Int] = ListBuffer() 
    (elem: Int) => 
     if (buffering) { 
     buffer += elem 
     if (buffer.size < threshold) { 
      Nil 
     } else { 
      buffering = false 
      buffer.toList 
     } 
     } else { 
     List(elem) 
     } 
    } 

    //Nil 
    Source(List(1, 2, 3)).statefulMapConcat(() => filterLessThen(5)) 
    .runWith(Sink.seq).map(println) 

    //Nil 
    Source(List(1, 2, 3, 4)).statefulMapConcat(() => filterLessThen(5)) 
    .runWith(Sink.seq).map(println) 

    //Vector(1,2,3,4,5) 
    Source(List(1, 2, 3, 4, 5)).statefulMapConcat(() => filterLessThen(5)) 
    .runWith(Sink.seq).map(println) 

    //Vector(1,2,3,4,5,6) 
    Source(List(1, 2, 3, 4, 5, 6)).statefulMapConcat(() => filterLessThen(5)) 
    .runWith(Sink.seq).map(println) 
}