阿卡流'groupBy
是一種方法。但是groupBy有一個maxSubstreams
參數,它需要你知道最大ID號範圍在前面。所以:溶液下面使用scan
來識別相同-ID塊,並splitWhen
分割成子:
object Main extends App {
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
def extractId(s: String) = {
val a = s.split(",")
a(0) -> a(1)
}
val file = new File("/tmp/example.csv")
private val lineByLineSource = FileIO.fromFile(file)
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
.map(_.utf8String)
val future: Future[Done] = lineByLineSource
.map(extractId)
.scan((false,"",""))((l,r) => (l._2 != r._1, r._1, r._2))
.drop(1)
.splitWhen(_._1)
.fold(("",Seq[String]()))((l,r) => (r._2, l._2 ++ Seq(r._3)))
.concatSubstreams
.runForeach(println)
private val reply = Await.result(future, 10 seconds)
println(s"Received $reply")
Await.ready(system.terminate(), 10 seconds)
}
extractId
分割線成ID - >的數據元組。 scan
prepends id - >具有ID開始範圍標誌的數據元組。 drop
將引物組件降至scan
。 splitwhen
爲每個範圍的開始啓動一個新的子流。 fold
將子流連接到列表並刪除ID範圍開始布爾值,以便每個子流生成一個元素。代替摺疊,你可能需要一個自定義的SubFlow
,它處理單個ID的行流,併發出一些ID範圍的結果。 concatSubstreams
合併由分割產生的每個ID範圍子流時返回到由runForEach
打印的單個流中。
與
運行:
$ cat /tmp/example.csv
ID1,some input
ID1,some more input
ID1,last of ID1
ID2,one line of ID2
ID3,2nd before eof
ID3,eof
輸出是:
(ID1,List(some input, some more input, last of ID1))
(ID2,List(one line of ID2))
(ID3,List(2nd before eof, eof))
的問題:你怎麼現在管理背壓?你是否從單個節點讀取文件?你在使用akka集羣進行處理嗎? – Aivean
我不管理背壓。我已經嘗試了一些東西,但他們都是黑客(比如長時間輪詢'處理來自處理角色的'繼續'消息,並手動迭代讀取,這非常脆弱)。所以我選擇了破解它,而不是通過給我的應用程序足夠的堆空間來消耗整個輸入文件到內存中。我不能再這樣做了,因爲我必須將其部署到共享服務器,並且我無法再吞噬每個人的內存。 – dannytoone