
Buffered/RingBuffer IO in Ruby + Amazon S3 non-blocking chunk reads

I have huge CSV files (100MB+) on Amazon S3 and want to read them in chunks and process them with Ruby's CSV library. I'm having a hard time creating the right IO object for the CSV processing:

buffer = TheRightIOClass.new
bytes_received = 0
RightAws::S3Interface.new(<access_key>, <access_secret>).retrieve_object(bucket, key) do |chunk|
  bytes_received += buffer.write(chunk)
  if bytes_received >= 1 * MEGABYTE
    bytes_received = 0
    csv(buffer).each do |row|
      process_csv_record(row)
    end
  end
end

def csv(io)
  @csv ||= CSV.new(io, headers: true)
end

I don't know what the right setup here should be or what TheRightIOClass is. I don't want to load the entire file into memory with StringIO. Is there a BufferedIO or RingBuffer in Ruby that can do this? If anyone has a good solution using threads (no processes) and pipes, I would love to see it.
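For reference, here is a minimal sketch of the threads-and-pipes variant, reusing the retrieve_object call from the question. IO.pipe yields a real blocking IO object, so CSV can stream from it without buffering the whole file; access_key, access_secret, bucket, key, and process_csv_record are the question's placeholders, not defined here.

require 'csv'

reader, writer = IO.pipe

producer = Thread.new do
  RightAws::S3Interface.new(access_key, access_secret).retrieve_object(bucket, key) do |chunk|
    writer.write(chunk) # blocks when the pipe buffer fills, throttling the download
  end
  writer.close # signal EOF so the CSV loop below terminates
end

CSV.new(reader, headers: true).each do |row|
  process_csv_record(row)
end
producer.join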

Answer


You can use StringIO and do some clever error handling to make sure you have a complete row before you process it. The Packer class in this example just accumulates the parsed rows in memory until they are flushed to disk or a database (a minimal sketch of Packer follows the code below).

packer = Packer.new
object = AWS::S3.new.buckets[bucket].objects[path]
io = StringIO.new
csv = ::CSV.new(io, headers: true)
object.read do |chunk|
  # Append the most recent chunk and rewind the IO
  io << chunk
  io.rewind
  last_offset = 0
  begin
    while (row = csv.shift)
      # Store the parsed row unless we're at the end of the chunk,
      # where the line may have been cut off mid-row
      unless io.eof?
        last_offset = io.pos
        packer << row.to_hash
      end
    end
  rescue ArgumentError, ::CSV::MalformedCSVError => e
    # Only rescue malformed UTF-8 and CSV errors if we're at the end
    # of the chunk, where a partial row is the likely cause
    raise e unless io.eof?
  end
  # Seek back to the last complete row, keep only the trailing partial
  # row in the StringIO, and advance the cursor so the next chunk
  # appends after it
  io.seek(last_offset)
  io.reopen(io.read)
  io.read
  # Flush our accumulated rows to disk every megabyte
  packer.flush if packer.bytes > 1 * MEGABYTE
end
# Read the last row, which was skipped inside the loop
io.rewind
if (last_row = csv.shift)
  packer << last_row.to_hash
end
# Flush whatever is still buffered and hand back the packer
packer.flush
packer
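The answer references Packer without defining it. Here is one plausible minimal implementation, assuming rows are buffered as JSON lines and appended to a local file; the path and output format are illustrative choices, not part of the original answer, and MEGABYTE (1024 ** 2) is likewise assumed to be defined elsewhere. Only <<, bytes, and flush are required by the loop above.

require 'json'

class Packer
  attr_reader :bytes

  def initialize(path = 'rows.jsonl') # hypothetical output path
    @path = path
    @rows = []
    @bytes = 0
  end

  # Accumulate a parsed row (a Hash) and track its approximate size
  def <<(row_hash)
    line = row_hash.to_json
    @rows << line
    @bytes += line.bytesize
    self
  end

  # Append the buffered rows to disk and reset the counters
  def flush
    File.open(@path, 'a') { |f| @rows.each { |line| f.puts(line) } }
    @rows.clear
    @bytes = 0
  end
end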