2012-07-21 38 views
1

這是我的previous問題的後續行動。如何編寫過程文件並在Scala中並行寫入結果?

假設我並行處理文件。現在我想將處理結果寫入文件。由於結果不適合內存,我不能等待所有文件的處理完成,然後寫入結果。我必須以某種方式並行處理和寫作。

例如:假設我有帶數字的文件。文件大小爲約500M。文件的數量約爲200。每個文件都適合內存,但所有文件都不適合。現在我想將這些文件中的所有甚至號碼寫入其他文件。

在斯卡拉如何做到這一點(與Futures和斯卡拉parallel collections)?

+0

線()在scalax.io的是懶洋洋地評估 也可以看看在未來的高管HTTP:// jesseeichar.github.com/scala-io-doc/0.4.0/index.html#!/core/future_exec – oluies 2012-07-21 07:01:42

回答

4

在某些時候,你必須同步寫作。如果您不想阻止其他線程,則可以使用actor將結果寫入文件。這可能是這樣的:

class FileWriterActor(path: String) extends Actor { 

    val file = ... // init FileWriter 

    // this is how you implement an akka actor 
    // plain scala actors look a bit different   
    def receive = { 
    case x: MyResult => file.write(x.toString) 
    } 

    override def postStop() = file.close() 
} 

// usage 
val result = ... // calculation stuff 
fileWriter ! result 
+1

這是一個很好的策略。這裏可能不完全清楚的是線程的位置。首先,actor表示一個活動線程:它不斷地在其輸入隊列上接收消息,並將它們按順序寫入文件。其次,所有客戶端線程使用「fileWriter!value」序列將消息發送給(單個)actor。 '!'算子是從Hoare的CSP代數中借用的,概念也是如此。作爲替代方案,可以直接使用CSP,例如,通過JCSP,以這種方式,這個演員的工作方式將更加明確。 – 2012-07-22 09:13:08

+0

在一個使用並行集合在大輸入文件的每一行執行NLP的應用程序中,我從同步的println切換到使用Actor來寫入數據。 CPU使用率從180%提高到700%,每10萬條時間從12秒延長到2.5秒。 – schmmd 2013-05-10 18:05:43

1

對於那些不熟悉阿卡:

import java.io.{File, PrintWriter} 
import akka.actor.{Actor,ActorSystem,Props} 

object AkkaWriterExample extends App{ 

    val outputPath : String = ??? 
    val system = ActorSystem("WriterSystem") 
    val writer = system.actorOf(Props(new WriterActor(new File(outputPath))), name="writer") 
    writer ! "this is a test" 
    system.shutdown() 
    system.awaitTermination() 
} 

class WriterActor(outFile: File) extends Actor { 

    val writer = new PrintWriter(outFile) 

    // this is how you implement an akka actor 
    // plain scala actors look a bit different   
    def receive = { 
    case str:String => println(str); writer.write(str); 
    } 

    override def postStop() = { 
    writer.flush(); 
    writer.close(); 
    } 
}