2015-05-05 46 views
4

我有大量的向量字符串向量: 大約有50000個字符串向量, 其中每個向量包含2-15個長度爲1-20個字符的字符串。在Julia的大型常量數據結構上並行操作

MyScoringOperation是一個函數,它對一個字符串向量(數據)進行操作並返回一個10100分數的數組(作爲Float64s)。這需要大約0.01秒的運行MyScoringOperation(取決於數據的長度)

function MyScoringOperation(state:State, datum::Vector{String}) 
     ... 
     score::Vector{Float64} #Size of score = 10000 

我有一個相當於一個嵌套循環。 外環通常的做法會運行500次迭代

data::Vector{Vector{String}} = loaddata() 
for ii in 1:500 
    score_total = zeros(10100) 
    for datum in data 
     score_total+=MyScoringOperation(datum) 
    end 
end 

在一臺計算機上的3000(而不是50,000)小的情況下,測試這需要每外環100-300秒。

我有3個安裝了Julia 3.9的強大服務器(並且可以更輕鬆地獲得3個,然後在下一個規模可以獲得數百個)。


我有@parallel基本經驗,但現在看來似乎是花費了大量的時間拷貝常數(它或多或少地掛在較小的測試用例)

這看起來像:

data::Vector{Vector{String}} = loaddata() 
state = init_state() 
for ii in 1:500 

    score_total = @parallel(+) for datum in data 
     MyScoringOperation(state, datum) 
    end 
    state = update(state, score_total) 
end 

我的這個實現使用@parallel的方式理解是,它:

對於每個ii

  1. 分區data到每個工人
  2. 吸盤發送一個吸盤把每個工人
  3. 所有工作過程中有豆腐塊
  4. 主要過程總結的結果,因爲他們到達。

我想刪除第2步, 讓而不是發送數據塊到每個工人的, 我只是發了一系列指標的每個工作人員,他們看它自己的拷貝了data。甚至更好,只給每個只有自己的塊,並讓他們每次重複使用(節省大量的RAM)。


分析支持我對@parellel功能的信念。 對於類似的範圍問題(數據更小), 非並行版本在0.09秒運行, 和 並行運行並且分析器顯示幾乎所有時間都用了185秒。分析器顯示幾乎100%是花費與網絡IO交互。

+1

我懷疑這是密切相關的http://stackoverflow.com/questions/26067574/how-to-run-a-method-in-parallel-using-julia –

回答

4

這應該讓你開始:

function get_chunks(data::Vector, nchunks::Int) 
    base_len, remainder = divrem(length(data),nchunks) 
    chunk_len = fill(base_len,nchunks) 
    chunk_len[1:remainder]+=1 #remained will always be less than nchunks 
    function _it() 
     for ii in 1:nchunks 
      chunk_start = sum(chunk_len[1:ii-1])+1 
      chunk_end = chunk_start + chunk_len[ii] -1 
      chunk = data[chunk_start: chunk_end] 
      produce(chunk) 
     end 
    end 
    Task(_it) 
end 

function r_chunk_data(data::Vector) 
    all_chuncks = get_chunks(data, nworkers()) |> collect; 
    remote_chunks = [put!(RemoteRef(pid)::RemoteRef, all_chuncks[ii]) for (ii,pid) in enumerate(workers())] 
    #Have to add the type annotation sas otherwise it thinks that, RemoteRef(pid) might return a RemoteValue 
end 



function fetch_reduce(red_acc::Function, rem_results::Vector{RemoteRef}) 
    total = nothing 
    #TODO: consider strongly wrapping total in a lock, when in 0.4, so that it is garenteed safe 
    @sync for rr in rem_results 
     function gather(rr) 
      res=fetch(rr) 
      if total===nothing 
       total=res 
      else 
       total=red_acc(total,res) 
      end 
     end 
     @async gather(rr) 
    end 
    total 
end 

function prechunked_mapreduce(r_chunks::Vector{RemoteRef}, map_fun::Function, red_acc::Function) 
    rem_results = map(r_chunks) do rchunk 
     function do_mapred() 
      @assert r_chunk.where==myid() 
      @pipe r_chunk |> fetch |> map(map_fun,_) |> reduce(red_acc, _) 
     end 
     remotecall(r_chunk.where,do_mapred) 
    end 
    @pipe rem_results|> convert(Vector{RemoteRef},_) |> fetch_reduce(red_acc, _) 
end 

rchunk_data把數據分散成塊,(由get_chunks方法定義)和發送每個那些塊到不同的工人,在那裏它們被存儲在RemoteRefs。 的RemoteRefs對你的其他proccesses(和潛在的計算機)對存儲器的引用,即

prechunked_map_reduce確實在一類地圖的變化減小到每個工人第一次運行map_fun每個的它的吸盤元素,進而降低了所有其卡盤中的元素使用red_acc(減少累加器功能)。最後,每個工作人員返回結果,然後使用red_acc這一次使用fetch_reduce將它們全部一起減少,以便我們可以添加第一個完成的第一個。

fetch_reduce是一個非阻塞提取和減少操作。我相信它沒有競賽條件,但這可能是因爲@async@sync中的實現細節。當朱莉婭0.4出場時,很容易鎖定一下,使其顯然沒有競賽狀況。

這段代碼並沒有真正加強戰鬥力。我不;噸認爲 你也可能想看看是使吸盤大小可調,這樣就可以看到更多的數據,以更快的工人(如果有的有更好的網絡或更快的CPU)

您需要reexpress代碼作爲一個map-reduce問題,看起來並不難。


測試,與:

data = [float([eye(100),eye(100)])[:] for _ in 1:3000] #480Mb 
chunk_data(:data, data) 
@time prechunked_mapreduce(:data, mean, (+)) 

接過〜0.03秒,當在8名工人(他們都不在同一臺機器上啓動)

分佈VS只是在本地運行:

@time reduce(+,map(mean,data)) 

花了約0.06秒。

+0

更新非常顯着使用遠程refs –