2017-06-12 36 views
0

我遇到了一個問題,即需要讀取非常大的文件,然後打印每個塊的分析結果。不是最終的完整列表。如何在Elixir流中的每個塊中寫入文件

到目前爲止,我可以得到一個地圖集的uniq的結果,但我無法弄清楚如何使用這種方法來獲得唯一的文件名

def new_file_name do 
    hex = :crypto.hash(:md5, Integer.to_string(:os.system_time(:millisecond))) 
    |> Base.encode16 
end 

於是寫信給每chunk_size

文件我所擁有的最好的是這給了我一個具有塊大小獨特結果的MapSets列表。這是一個MapSets列表,它可能最終導致內存太大而無法容納。

def parse(file_path, chunk_size) do 
    file_path 
    |> File.stream!(read_ahead: chunk_size) 
    |> Stream.drop(1) # remove header 
    |> Stream.map(&"#{&1}\") # Prepare to be written as a csv 
    |> Stream.chunk(chunk_size, chunk_size, []) # break up into chunks 
    |> method # method to write per chunk to file. 
end 

我有什麼之前是

|> Stream.map(&MapSet.new(&1)) # Create MapSet of unique values from each chunk 

,但我似乎無法找到任何的例子寫一個地圖集到文件。

+0

直到調用其中一個'Enum'函數或'Stream.run/1',纔會執行計算。所以可能你想完成使用'Enum.map'而不是'Stream.map' – GavinBrelstaff

+0

從文檔https://hexdocs.pm/elixir/MapSet.html看起來你唯一能做的就是將它轉換成到一個列表(使用'Mapset.to_list(map_set)'),然後寫入文件。我沒有自己嘗試過 - 所以如果你想在以後讀回數據,可能需要特別注意。 – GavinBrelstaff

+0

MapSet包含什麼?你想在新文件中保留行嗎?對於你放棄一個你想要存儲在MapSet中的數據的例子,這會是一個問題嗎?這將有助於理解問題並提出建議方法 –

回答

2

您可以使用Enum.reduce/3與文件句柄作爲蓄能器打開一個文件一次,然後寫信給它一個塊在一個時間:

def parse(file_path, chunk_size) do 
    file_path 
    |> File.stream!(read_ahead: chunk_size) 
    |> Stream.drop(1) # remove header 
    |> Stream.map(&"#{&1}\") # Prepare to be written as a csv 
    |> Stream.chunk(chunk_size, chunk_size, []) # break up into chunks 
    |> Enum.reduce(File.open!("output.txt", [:write]), fn chunk, file -> 
    :ok = IO.write(file, chunk) 
    file 
    end) 
end 

你可能需要調整你想怎麼寫塊到文件。以上將chunk視爲iodata,有效連接塊中的字符串並寫入它。

如果你想寫每塊只有獨特的物品,你可以添加:

|> Stream.map(fn chunk -> chunk |> MapSet.new |> MapSet.to_list end) 

管道進入Enum.reduce/3之前。

+0

使用'|> Stream.map'版本效果很好。只需要以'Stream.run'結尾 – mjwrazor

+0

你在哪裏添加了'Stream.run'? – Dogbert

+0

在流的末尾所以 '|> Stream.map(** Stuff **)|> Stream.map(枚舉減少每個塊到文件的方法)|> Stream.run' – mjwrazor

1

在@Dogbert的幫助下找到了一個有趣的方法。使用Stream會鎖定我的CPU使用率達到最高100%。有了這個,我能夠達到頂級256%的CPU使用率。這是每個300MB的文件上運行的。 30分鐘解析。

def alt_flow_parse_dir(path, out_file, chunk_size) do 
    concat_unique = File.open!(path <> "/" <> out_file, [:read, :utf8, :write]) 

    Path.wildcard(path <> "/*.csv") 
    |> Flow.from_enumerable 
    |> Flow.map(&append_to_file(&1, path, concat_unique, chunk_size)) 
    |> Flow.run 

    File.close(concat_unique) 
end 

# I just want the unique items of the first column 
def append_to_file(filename, path, out_file, chunk_size) do 
    file = filename 
    |> String.split("/") 
    |> Enum.take(-1) 
    |> List.to_string 
    path <> file 
    |> File.stream! 
    |> Stream.drop(1) 
    |> Flow.from_enumerable 
    |> Flow.map(&String.split(&1, ",") |> List.first) 
    |> Flow.map(&String.trim(&1,"\n")) 
    |> Flow.partition 
    |> Stream.chunk(chunk_size, chunk_size, []) 
    |> Flow.from_enumerable 
    |> Flow.map(fn chunk -> 
     chunk 
      |> MapSet.new 
      |> MapSet.to_list 
      |> List.flatten 
     end) 
    |> Flow.map(fn line -> 
     Enum.map(line, fn item -> 
      IO.puts(out_file, item) 
      end) 
     end) 
    |> Flow.run 
    end 
相關問題