2014-04-11 53 views
4

我正在嘗試從大量的中小文件中構建zip存檔。我希望能夠同時做到這一點,因爲壓縮是CPU密集型的,而且我正在多核服務器上運行。我也不想把整個檔案放在記憶裏,因爲它可能會變大。Go中的並行zip壓縮

我的問題是,我必須壓縮每個文件,然後結合所有的東西一起拉鍊頭,校驗和等?

任何幫助將不勝感激。

+1

這讓我想起了一點http://blog.golang.org/pipelines – VonC

回答

3

我不認爲你可以合併壓縮標題。

你可以做的是,運行zip.Writer順序,在一個單獨的goroutine,然後產卵要讀取每個文件的新的goroutine和管道那些被荏苒他們夠程。

這應該通過順序讀取文件來減少IO開銷,儘管它可能不會利用多個核心進行歸檔。

下面是一個工作示例。需要注意的是,讓事情變得簡單,

  • 它不處理錯誤好聽,只是恐慌,如果出現錯誤,
  • ,它不使用defer聲明太多,證明的順序事情應該發生。

由於deferis LIFO,當您將它們堆疊在一起時,有時可能會引起混淆。

package main 

import (
    "archive/zip" 
    "io" 
    "os" 
    "sync" 
) 

func ZipWriter(files chan *os.File) *sync.WaitGroup { 
    f, err := os.Create("out.zip") 
    if err != nil { 
     panic(err) 
    } 
    var wg sync.WaitGroup 
    wg.Add(1) 
    zw := zip.NewWriter(f) 
    go func() { 
     // Note the order (LIFO): 
     defer wg.Done() // 2. signal that we're done 
     defer f.Close() // 1. close the file 
     var err error 
     var fw io.Writer 
     for f := range files { 
      // Loop until channel is closed. 
      if fw, err = zw.Create(f.Name()); err != nil { 
       panic(err) 
      } 
      io.Copy(fw, f) 
      if err = f.Close(); err != nil { 
       panic(err) 
      } 
     } 
     // The zip writer must be closed *before* f.Close() is called! 
     if err = zw.Close(); err != nil { 
      panic(err) 
     } 
    }() 
    return &wg 
} 

func main() { 
    files := make(chan *os.File) 
    wait := ZipWriter(files) 

    // Send all files to the zip writer. 
    var wg sync.WaitGroup 
    wg.Add(len(os.Args)-1) 
    for i, name := range os.Args { 
     if i == 0 { 
      continue 
     } 
     // Read each file in parallel: 
     go func(name string) { 
      defer wg.Done() 
      f, err := os.Open(name) 
      if err != nil { 
       panic(err) 
      } 
      files <- f 
     }(name) 
    } 

    wg.Wait() 
    // Once we're done sending the files, we can close the channel. 
    close(files) 
    // This will cause ZipWriter to break out of the loop, close the file, 
    // and unblock the next mutex: 
    wait.Wait() 
} 

用法:go run example.go /path/to/*.log

這是哪些事情應該發生的順序:寫

  1. 打開輸出文件。
  2. 用該文件創建一個zip.Writer
  3. 啓動一個goroutine監聽頻道上的文件。
  4. 瀏覽每個文件,這可以在每個文件的一個goroutine中完成。
  5. 將每個文件發送到在步驟3中創建的goroutine。
  6. 在處理上述goroutine中的每個文件後,關閉文件以釋放資源。
  7. 一旦每個文件發送到所述goroutine,關閉該通道。
  8. 等待壓縮完成(依次完成)。
  9. 一旦壓縮完成(頻道耗盡),壓縮文件應該關閉。
  10. 僅當拉鍊式寫入器關閉時,輸出文件應該關閉。
  11. 最後一切都關閉了,所以關閉sync.WaitGroup告訴調用函數我們很好。 (一個頻道也可以在這裏使用,但sync.WaitGroup看起來更優雅。)
  12. 當你從拉鍊編輯器得到一切正常關閉的信號時,你可以退出main並很好地終止。

這可能不會回答你的問題,但我一直在使用類似的代碼來生成一個Web服務的zip文件。它表現相當好,即使實際的壓縮是在一個goroutine中完成的。克服IO瓶頸已經可以改善。

2

從它的外觀,你將無法parallelise使用標準庫archive/zip包,因爲壓縮:

  1. 壓縮是由zip.Writer.CreateCreateHeader返回io.Writer執行。
  2. 調用Create/CreateHeader隱式關閉前一次調用返回的作者。

因此,將由Create返回的作者傳遞給多個goroutines並向其並行寫入它們將不起作用。

如果你想寫自己的平行拉鍊作家,你可能要構建它是這樣的:

  1. 擁有多夠程壓縮使用compress/flate模塊文件,並跟蹤CRC32值和未壓縮數據的長度。輸出應該被定向到臨時文件。請注意數據的壓縮大小。
  2. 一旦所有東西都被壓縮了,開始寫頭文件開始的Zip文件。
  3. 寫出文件頭,後面跟着每個壓縮文件的相應臨時文件的內容。
  4. 在文件末尾寫出中央目錄記錄和結束記錄。所有必需的信息都應在此時提供。

爲了增加並行性,步驟1可以通過使用通道指示每個文件的壓縮完成時與其餘步驟並行執行。

由於文件格式的原因,如果不將壓縮數據存儲在內存或臨時文件中,您將無法執行並行壓縮。