2012-12-12 30 views
4

假設我需要將兩個函數f: String => Ag: A => B應用於大型文本文件中的每一行,最終創建一個B的列表。在Scala中並行讀取和處理文件

由於該文件很大,fg昂貴,我想使處理併發。我可以使用「並行集合」並執行類似io.Source.fromFile("data.txt").getLines.toList.par.map(l => g(f(l))的操作,但它不會同時執行讀取文件fg

在這個例子中實現併發的最好方法是什麼?

+2

爲什麼要以這種方式分解它,而不是讓每個線程一次處理10行代碼塊的所有方面? –

+0

@RexKerr好的。假設我想一次處理10行代碼塊。我正在逐行閱讀文件,當我讀完10行時,我派生出一個演員處理它們,異步發送該塊並獲取Future。一旦我完成閱讀,我打電話給所有的未來,以獲得結果。是否有意義? – Michael

回答

3

您可以在Future使用map

val futures = io.Source.fromFile(fileName).getLines.map{ s => Future{ stringToA(s) }.map{ aToB } }.toIndexedSeq 

val results = futures.map{ Await.result(_, 10 seconds) } 
// alternatively: 
val results = Await.result(Future.sequence(futures), 10 seconds) 
+0

謝謝。然而'stringToA'和'aToB'並不是在這裏並行,是嗎? – Michael

+0

@邁克爾:他們有。 「未來」上的「地圖」會在目標完成後創建新的「未來」。所以'stringToA'和'aToB'將在不同的線程中執行。我不知道這種行爲對你是否有用。 – senia

+0

@Michael:你確定我的答案是你想要的嗎?有了'Future',你可以使用你配置好的'ExecutionContext'。未來的'map'可以讓你分割任務(它可能對線程池有幫助)。但@dhg給你一個簡單的解決方案。如果您不必配置「ExecutionContext」和分割任務,情況會更好。 – senia

12

首先,一個重要的注意事項:不要在List使用.par,因爲它需要複製所有數據(因爲List只能順序讀取)。相反,請使用Vector之類的東西,因爲.par轉換可能會發生而無需複製。

看起來你似乎認爲並行是錯誤的。這裏會發生什麼:

如果你有一個這樣的文件:

0 
1 
2 
3 
4 
5 
6 
7 
8 
9 

而且功能fg

def f(line: String) = { 
    println("running f(%s)".format(line)) 
    line.toInt 
} 

def g(n: Int) = { 
    println("running g(%d)".format(n)) 
    n + 1 
} 

然後,你可以這樣做:

io.Source.fromFile("data.txt").getLines.toIndexedSeq[String].par.map(l => g(f(l))) 

而且獲得輸出:

running f(3) 
running f(0) 
running f(5) 
running f(2) 
running f(6) 
running f(1) 
running g(2) 
running f(4) 
running f(7) 
running g(4) 
running g(1) 
running g(6) 
running g(3) 
running g(5) 
running g(0) 
running g(7) 
running f(9) 
running f(8) 
running g(9) 
running g(8) 

因此,即使整個g(f(l))操作發生在同一個線程上,您可以看到每行可能會並行處理。因此,許多fg操作可以在單獨的線程上同時發生,但fg對於特定行將按順序發生。

畢竟,這是您應該期待的方式,因爲實際上沒有辦法讀取該行,請運行f並且運行g並行。例如,如果該行尚未被讀取,它怎麼能在f的輸出上執行g