2012-10-30 72 views
1

所以這是一個我已經打了一堵牆的任務。在這一點上,我主要希望能夠擁有更多的眼睛,因爲在重做和精煉這麼多次後,我實在看不出我的代碼有什麼問題。Java-多線程壓縮與Deflater

我們需要在Java中編寫一個多線程壓縮器,可以使用gzip -d調用正確解壓。我們不能使用GZIPOutputStream調用。相反,我們手動生成標題和預告片,並使用Deflater壓縮數據。我們從標準輸入讀取並寫入標準輸出。

基本上我用Executor來維護一個線程池。我讀取輸入,並將其寫入設置大小的緩衝區中。一旦緩衝區已滿,我將該數據塊傳遞給線程(將任務放入隊列中)。每個線程都有自己的Deflater,並傳遞輸入和其他任何需要壓縮該數據的信息。我還使用每個塊的最後32Kb作爲下一個塊的字典。

我已確認我的標題和預告片是正確的。我使用GZIPOutputStream壓縮文件,並使用hexdump獲取字節,以便將其與輸出進行比較。我檢查過不同大小的文件,並且標題和預告片是相同的,因此在所有可能的情況下,問題都在壓縮數據中。我得到的錯誤是:無效的壓縮數據 - crc錯誤

我已經確認,當我通過一個相對較小的輸入(以便只有一個線程,因爲我從未填滿緩衝區,只有一個任務在隊列中)輸出是正確的。我可以對壓縮數據調用gzip -d並獲取完全相同的輸入。

換句話說,問題出在有足夠數據的時候,多個線程啓動並運行。我在我的輸出中使用了一個hexdump作爲大文件,並將它與GZIPOutputStream的hexdump進行了比較,它們非常相似(不完全相同,但即使是小文件,hexdump也與壓縮數據略有不同。這種情況下,gzip -d仍然有效)。這也是我如何知道標題和預告片是否正確。

傳入碼轉儲

import java.lang.Runtime; 
import java.lang.String; 
import java.lang.Integer; 
import java.util.Arrays; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.nio.ByteBuffer; 
import java.io.*; 
import java.util.zip.*; 

/*Warning: Do not compress files larger than 2GB please. Since this is just 
    an assignment and not meant to replace an actual parallel compressor, I cut corners 
    by casting longs to ints, since it's easier to convert to 4 bytes*/ 

public class Main { 
    private static final int BLOCK_SIZE = 128*1024; 
    private static final int DICT_SIZE = 32*1024; 
    private static byte[] header = {(byte)0x1f, (byte)0x8b, (byte)0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; 

    public static void main(String[] args){ 

    class workerThread implements Callable<byte[]> { 
     private boolean lastBlock; 
     private boolean dictAvailable; 
     private byte[] input; 
     private byte[] dictionary; 
     private int lastSize;  

     private byte[] output = new byte[BLOCK_SIZE]; 
     private int compressedLength; 
     private ByteArrayOutputStream bOut = new ByteArrayOutputStream(); 
     Deflater compress = new Deflater (Deflater.DEFAULT_COMPRESSION, true); 

     workerThread(boolean last, byte[] blockIn, byte[] dict, boolean useDictionary, int lastBSize){ 
      this.lastBlock = last; 
      this.input = blockIn; 
      this.dictionary = dict; 
      this.dictAvailable = useDictionary; 
      this.lastSize = lastBSize; 
     } 

     public byte[] call() { 
      //System.out.println("running thread "); 
      if (lastBlock) { 
       // System.out.println("Last block!"); 
       compress.setInput(input,0,lastSize); 
       if(dictAvailable) { 
        compress.setDictionary(dictionary); 
       } 
       compress.finish(); 
       compressedLength = compress.deflate(output,0,BLOCK_SIZE,Deflater.SYNC_FLUSH); 
      } 
      else { 
       //System.out.println("Not last block!"); 
       compress.setInput(input,0,BLOCK_SIZE); 
       if(dictAvailable) { 
        compress.setDictionary(dictionary); 
       } 
       compressedLength = compress.deflate(output,0,BLOCK_SIZE,Deflater.SYNC_FLUSH); 
      } 
      byte[] finalOut = Arrays.copyOfRange(output,0,compressedLength); 
      return finalOut; 
     } 
    } 

    getProcessors p = new getProcessors(); 
    boolean useDict = true; 
    int numProcs = p.getNumProcs(); 
    boolean customProcs = false; 
    boolean foundProcs = false; 
    boolean foundDict = false; 

    /*Checking if arguments are correct*/ 
    ........ 
    /*Correct arguments, proceeding*/ 

    BufferedInputStream inBytes = new BufferedInputStream(System.in); 
    byte[] buff = new byte[BLOCK_SIZE]; 
    byte[] dict = new byte[DICT_SIZE]; 
    int bytesRead = 0; 
    int offset = 0; 
    int uncompressedLength = 0; 
    int lastBlockSize = 0; 
    boolean isLastBlock = false; 
    boolean firstBlockDone = false; 

    /*Using an executor with a fixed thread pool size in order to manage threads 
    as well as obtain future results to maintain synchronization*/ 
    ExecutorService exec = Executors.newFixedThreadPool(numProcs); 
    CRC32 checksum = new CRC32(); 
    checksum.reset(); 
    List<Future<byte[]>> results = new ArrayList<Future<byte[]>>(); 

    //byte[] temp; 

    System.out.write(header,0,header.length); 
    try{ 
     bytesRead = inBytes.read(buff,0, BLOCK_SIZE); 
     while (bytesRead != -1) { 
      uncompressedLength += bytesRead; 
      checksum.update(buff,offset,bytesRead); 
      offset += bytesRead; 

      if (offset == BLOCK_SIZE) { 
       offset = 0; 
       if(!firstBlockDone){ 
       firstBlockDone = true; 
       results.add(exec.submit(new workerThread(isLastBlock,buff,dict,false,lastBlockSize))); 
       } 
       else { 
        results.add(exec.submit(new workerThread(isLastBlock,buff,dict,useDict,lastBlockSize))); 
       } 

       if (useDict) { 
        System.arraycopy(buff, BLOCK_SIZE-DICT_SIZE, dict, 0, DICT_SIZE); 
       } 
      } 

      /*Implementation warning! Because of the way bytes are read in, this program will fail if 
      the file being zipped is exactly a multiple of 128*1024*/ 
      if((bytesRead=inBytes.read(buff,offset,BLOCK_SIZE-offset)) == -1) { 
       isLastBlock = true; 
       lastBlockSize = offset; 
       results.add(exec.submit(new workerThread(isLastBlock,buff,dict,useDict,lastBlockSize))); 
      } 
     }  
     try { 
      for(Future<byte[]> result: results) { 
      //System.out.println("Got result!"); 
      System.out.write(result.get(),0,result.get().length); 
      //temp = result.get(); 
      } 
     } 
     catch (InterruptedException ex) { 
      ex.printStackTrace(); 
      System.err.println("Interrupted thread!"); 
     } 
     catch (ExecutionException ex) { 
      ex.printStackTrace(); 
      System.err.println("Interrupted thread!"); 
     } 
     finally{ 
      exec.shutdownNow(); 
     } 

    /*Converting CRC sum and total length to bytes for trailer*/ 
    byte[] trailer = new byte[8]; 
    getTrailer trail = new getTrailer(checksum.getValue(),uncompressedLength); 
    trail.writeTrailer(trailer,0); 
    System.out.write(trailer); 

    } 
    catch (IOException ioe) { 
     ioe.printStackTrace(); 
     System.out.println("IO error."); 
     System.exit(-1);  
    } 
    catch (Throwable e) { 
     System.out.println("Unexpected exception or error."); 
     System.exit(-1); 
    } 
    } 
} 

啊哎呀,格式得到了由代碼塊格式揭去了一下。正如你所看到的,我一直在閱讀輸入,直到buff已滿爲止。原因是因爲這不是一個文件,所以有可能第一次讀取的讀取沒有讀取足夠的字節來填充數組(給我留下一堆空白,我不想搞亂任何東西)。一旦它滿了,我將它交給執行程序,以便線程執行任務。我實現了Callable而不是Runnable,這樣我就可以將輸出作爲字節數組返回並且因爲我需要將來的接口。 exec.get()方法允許我保持線程同步。我已經用一個任意的案例來測試(打印數字1 - 100以確保它們確實按順序打印)。

有一個缺陷,這個程序不能用於BLOCK_SIZE的倍數的文件,但這不是我現在的問題。這個程序在輸入小到足以讓我只運行一個線程時工作。

對於除最後一個塊以外的每個塊,我用SYNC_FLUSH選項調用deflate。這是我可以在一個字節邊界上結束。我通常壓縮的最後一塊,然後調用完成。

對不起,真的很長的職位。我真的需要更多的意見,除了我自己,因爲我似乎無法找到錯誤。如果有人想要編譯並運行它以查看自己,那麼下面是我的其他類(只是爲了獲得進程數並生成預告片,這兩種工作都很好)。

import java.io.*; 

public class getTrailer { 
    private long crc; 
    private int total; 
    public getTrailer (long crcVal, int totalIn) { 
     this.crc = crcVal; 
     this.total = totalIn; 
    } 
    public void writeTrailer(byte[] buf, int offset) throws IOException { 
     writeInt((int)crc, buf, offset); // CRC-32 of uncompr. data 
     writeInt(total, buf, offset + 4); // Number of uncompr. bytes 
    } 

    /* Writes integer in Intel byte order to a byte array, starting at a 
    * given offset 
    */ 

    public void writeInt(int i, byte[] buf, int offset) throws IOException { 
     writeShort(i & 0xffff, buf, offset); 
     writeShort((i >> 16) & 0xffff, buf, offset + 2); 
    } 

    /* 
    * Writes short integer in Intel byte order to a byte array, starting 
    * at a given offset 
    */ 

    public void writeShort(int s, byte[] buf, int offset) throws IOException { 
     buf[offset] = (byte)(s & 0xff); 
     buf[offset + 1] = (byte)((s >> 8) & 0xff); 
    } 
} 

拖車功能被完整複製從Java的文檔

public class getProcessors { 
    private Runtime runner = Runtime.getRuntime(); 
    private int nProcs = runner.availableProcessors(); 

    int getNumProcs() { 
     return nProcs; 
    } 
} 

粘貼我意識到這是怎麼長的,但我真的需要別人的意見。如果有人看到他們認爲可能導致問題的任何事情,請告訴我。我不需要有人爲我編寫程序(我想我幾乎在那裏),但我只是......看不出有什麼問題。

回答

0

所以,我的第一個猜測是你正在寫錯誤的字節順序的CRC。這似乎是您第一次一次寫入4個字節。

+0

你的意思是我什麼時候得到預告片?情況並非如此。我使用hexdump將其與正確的預告片進行比較,它們是相同的。 crc錯誤意味着它讀取我的壓縮數據,解壓縮它,然後將它與CRC期望看到的內容進行比較,它們不一樣。預告片只是包含該信息的字節,因此每次對於同一個文件它們應該是相同的(即使壓縮字節由於降級等原因而看起來不同)。 – user1777900

+0

我將不得不查看更多評論的代碼,但是作爲旁白,您是否看過ByteBuffer?它可以按字節順序讀取/寫入所有基本類型。 – Zagrev

0

如果你這樣做了類,到什麼被接受,那麼我希望這個類是關於結構化,程序化編程,你所呈現外觀相似的,因爲你所提出和內容的面向對象的解決辦法看起來像是相隔數英里。

您的評論,

「的exec.get()方法可以讓我去保持同步線程我測試過,與任意的情況下(打印出數字1 - 100,以確保,他們確實按順序打印)「。

與人們對多線程解決方案的期望完全相反。多線程解決方案將以完全不可預知的順序輸出1-100。它的出現意味着你已經同步了多線程的所有好處。立即等待緩衝區填滿,然後立即將我視爲一個問題點。

根據責任將解決方案分解成類。您正在爲行爲建模(即getProcessors,getTrailer),這是錯誤的。不要根據活動或狀態對類進行建模。大多數時候,簡單地談論你正在嘗試做什麼會產生正確的類(例如,我們有一些輸入數據,壓縮器,解壓縮器,某種工作隊列,拖車等等。如果你需要操縱一個處理器列表,然後有一個處理器類包裝(有一個,而不是一個)列表。每個類在整個解決方案中都有特定的責任,每個類只運行在它自己(沒有公共訪問器)上。能夠在獨立測試中執行其功能,那麼您就可以在多線程解決方案中使用它們的實例了

如果您創建了一個由您認爲屬於解決方案的類組成的領域模型,那麼開始通過向適當的類添加方法來對功能進行建模,模型本身將開始通知您應如何編碼交互。提示:構造函數ca n接受低級結構作爲參數,其他方法不應該。

首先,不要以爲線性 - 你有從上往下進行處理一個main()方法 - BUZZZZZ。錯誤的迴應。解決方案應該是一組類之間相互作用的症狀,每個類都提供了整體解決方案的一個獨立部分。

最好的多線程解決方案不要求同步 - 線程是謹慎的,並能夠以最快的速度運行。實現這一點的一個簡單方法是確保每個線程都使用其自己的任何涉及類的實例 - 不要使用共享內存。如果在輸出端需要同步,則線程應該將其結果轉儲到一個類中,該類將在輸出之前作爲最後一步執行排序。

最後,你確定你是多標題正確的東西嗎?我認爲我更希望在不同數據源上啓動多個實例,而不是一個數據源上的多個線程。我們知道,每個源都需要從頭到尾完全處理 - 問題變成了我們希望儘可能快地處理每個源,還是希望能夠同時處理多個源,以便我們不關心每個人都可能會走。

在單個線程上執行單個源的處理允許我同時處理多個源,並且在解決方案方面非常簡單 - 跡象表明這是執行多線程的好地方。執行單個源的多線程處理增加了相當大的複雜性,並且由於數據的性質(必須按輸入順序),多線程並不是一個好的解決方案。