2016-04-15 39 views
5

我有一個文件處理作業,目前使用akka actors與手動管理backpressure來處理處理管道,但我從來沒有能夠成功地管理背壓輸入文件閱讀階段。Sink for line-by-line file IO with backpressure

這項工作需要一個輸入文件並按照每行開始時的ID號對行進行分組,然後一旦它用新的ID號碼擊中一行,它就會通過消息將分組的行推送給處理角色,然後繼續使用新的ID號碼,直到達到文件末尾。

這似乎將是一個很好的使用情況阿卡流,使用文件作爲接收器,但我仍然不知道的三兩件事:

1)我如何通過讀取文件中的行線?

2)我怎樣才能根據每行上出現的ID進行分組?我目前使用非常必要的處理,我不認爲我會在流水線中具有相同的能力。

3)如何應用背壓,以便我不會更快地將數據讀入內存中,而不是我可以在下游處理數據?

+0

的問題:你怎麼現在管理背壓?你是否從單個節點讀取文件?你在使用akka集羣進行處理嗎? – Aivean

+0

我不管理背壓。我已經嘗試了一些東西,但他們都是黑客(比如長時間輪詢'處理來自處理角色的'繼續'消息,並手動迭代讀取,這非常脆弱)。所以我選擇了破解它,而不是通過給我的應用程序足夠的堆空間來消耗整個輸入文件到內存中。我不能再這樣做了,因爲我必須將其部署到共享服務器,並且我無法再吞噬每個人的內存。 – dannytoone

回答

7

阿卡流'groupBy是一種方法。但是groupBy有一個maxSubstreams參數,它需要你知道最大ID號範圍在前面。所以:溶液下面使用scan來識別相同-ID塊,並splitWhen分割成子:

object Main extends App { 
    implicit val system = ActorSystem("system") 
    implicit val materializer = ActorMaterializer() 

    def extractId(s: String) = { 
    val a = s.split(",") 
    a(0) -> a(1) 
    } 

    val file = new File("/tmp/example.csv") 

    private val lineByLineSource = FileIO.fromFile(file) 
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024)) 
    .map(_.utf8String) 

    val future: Future[Done] = lineByLineSource 
    .map(extractId) 
    .scan((false,"",""))((l,r) => (l._2 != r._1, r._1, r._2)) 
    .drop(1) 
    .splitWhen(_._1) 
    .fold(("",Seq[String]()))((l,r) => (r._2, l._2 ++ Seq(r._3))) 
    .concatSubstreams 
    .runForeach(println) 

    private val reply = Await.result(future, 10 seconds) 
    println(s"Received $reply") 
    Await.ready(system.terminate(), 10 seconds) 
} 

extractId分割線成ID - >的數據元組。 scan prepends id - >具有ID開始範圍標誌的數據元組。 drop將引物組件降至scansplitwhen爲每個範圍的開始啓動一個新的子流。 fold將子流連接到列表並刪除ID範圍開始布爾值,以便每個子流生成一個元素。代替摺疊,你可能需要一個自定義的SubFlow,它處理單個ID的行流,併發出一些ID範圍的結果。 concatSubstreams合併由分割產生的每個ID範圍子流時返回到由runForEach打印的單個流中。

運行:

$ cat /tmp/example.csv 
ID1,some input 
ID1,some more input 
ID1,last of ID1 
ID2,one line of ID2 
ID3,2nd before eof 
ID3,eof 

輸出是:

(ID1,List(some input, some more input, last of ID1)) 
(ID2,List(one line of ID2)) 
(ID3,List(2nd before eof, eof)) 
+0

我想我在這裏遵循邏輯,但你能解釋什麼'mergeSubstreams'呢?我似乎無法在API文檔中找到它的定義。 – dannytoone

+0

除了scaladocs,我沒有找到任何文檔,但是我爲舞臺添加了評論。此外,我用'concatSubstreams'替換'mergeSubstreams',因爲在這種情況下,子流將按順序(根據ID範圍)開始和終止,並且concat將以與ID範圍相同的順序產生輸出,而不管子流中的任何異步處理。 – tariksbl

0

看起來,在不引入大的修改的情況下向系統添加「背壓」的最簡單方法是簡單地將使用Actor的輸入組的郵箱類型更改爲BoundedMailbox

  1. 變化具有高mailbox-push-timeout-time消耗你的線條,以BoundedMailbox的演員類型:

    bounded-mailbox { 
        mailbox-type = "akka.dispatch.BoundedDequeBasedMailbox" 
        mailbox-capacity = 1 
        mailbox-push-timeout-time = 1h 
    } 
    
    val actor = system.actorOf(Props(classOf[InputGroupsConsumingActor]).withMailbox("bounded-mailbox")) 
    
  2. 從文件創建迭代器,創建組合(按id)從迭代器迭代器。然後循環訪問數據,將組發送給使用Actor。注意,在這種情況下,發送者將阻止當Actor的郵箱滿了。

    def iterGroupBy[A, K](iter: Iterator[A])(keyFun: A => K): Iterator[Seq[A]] = { 
        def rec(s: Stream[A]): Stream[Seq[A]] = 
        if (s.isEmpty) Stream.empty else { 
         s.span(keyFun(s.head) == keyFun(_)) match { 
         case (prefix, suffix) => prefix.toList #:: rec(suffix) 
        } 
        } 
        rec(iter.toStream).toIterator 
    } 
    
    val lines = Source.fromFile("input.file").getLines() 
    
    iterGroupBy(lines){l => l.headOption}.foreach { 
        lines:Seq[String] => 
         actor.tell(lines, ActorRef.noSender) 
    } 
    

這就是它! 您可能希望將文件讀取內容移至單獨的線程,因爲它會阻止。通過調整mailbox-capacity,您可以調整消耗的內存量。但是,如果從文件中讀取批次總是比處理速度更快,它似乎是合理的保持能力小,如1只或2

UPDiterGroupByStream實現的,經過測試不產生StackOverflow