所以這是一個我已經打了一堵牆的任務。在這一點上,我主要希望能夠擁有更多的眼睛,因爲在重做和精煉這麼多次後,我實在看不出我的代碼有什麼問題。Java-多線程壓縮與Deflater
我們需要在Java中編寫一個多線程壓縮器,可以使用gzip -d調用正確解壓。我們不能使用GZIPOutputStream調用。相反,我們手動生成標題和預告片,並使用Deflater壓縮數據。我們從標準輸入讀取並寫入標準輸出。
基本上我用Executor來維護一個線程池。我讀取輸入,並將其寫入設置大小的緩衝區中。一旦緩衝區已滿,我將該數據塊傳遞給線程(將任務放入隊列中)。每個線程都有自己的Deflater,並傳遞輸入和其他任何需要壓縮該數據的信息。我還使用每個塊的最後32Kb作爲下一個塊的字典。
我已確認我的標題和預告片是正確的。我使用GZIPOutputStream壓縮文件,並使用hexdump獲取字節,以便將其與輸出進行比較。我檢查過不同大小的文件,並且標題和預告片是相同的,因此在所有可能的情況下,問題都在壓縮數據中。我得到的錯誤是:無效的壓縮數據 - crc錯誤
我已經確認,當我通過一個相對較小的輸入(以便只有一個線程,因爲我從未填滿緩衝區,只有一個任務在隊列中)輸出是正確的。我可以對壓縮數據調用gzip -d並獲取完全相同的輸入。
換句話說,問題出在有足夠數據的時候,多個線程啓動並運行。我在我的輸出中使用了一個hexdump作爲大文件,並將它與GZIPOutputStream的hexdump進行了比較,它們非常相似(不完全相同,但即使是小文件,hexdump也與壓縮數據略有不同。這種情況下,gzip -d仍然有效)。這也是我如何知道標題和預告片是否正確。
傳入碼轉儲
import java.lang.Runtime;
import java.lang.String;
import java.lang.Integer;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.nio.ByteBuffer;
import java.io.*;
import java.util.zip.*;
/*Warning: Do not compress files larger than 2GB please. Since this is just
an assignment and not meant to replace an actual parallel compressor, I cut corners
by casting longs to ints, since it's easier to convert to 4 bytes*/
public class Main {
private static final int BLOCK_SIZE = 128*1024;
private static final int DICT_SIZE = 32*1024;
private static byte[] header = {(byte)0x1f, (byte)0x8b, (byte)0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
public static void main(String[] args){
class workerThread implements Callable<byte[]> {
private boolean lastBlock;
private boolean dictAvailable;
private byte[] input;
private byte[] dictionary;
private int lastSize;
private byte[] output = new byte[BLOCK_SIZE];
private int compressedLength;
private ByteArrayOutputStream bOut = new ByteArrayOutputStream();
Deflater compress = new Deflater (Deflater.DEFAULT_COMPRESSION, true);
workerThread(boolean last, byte[] blockIn, byte[] dict, boolean useDictionary, int lastBSize){
this.lastBlock = last;
this.input = blockIn;
this.dictionary = dict;
this.dictAvailable = useDictionary;
this.lastSize = lastBSize;
}
public byte[] call() {
//System.out.println("running thread ");
if (lastBlock) {
// System.out.println("Last block!");
compress.setInput(input,0,lastSize);
if(dictAvailable) {
compress.setDictionary(dictionary);
}
compress.finish();
compressedLength = compress.deflate(output,0,BLOCK_SIZE,Deflater.SYNC_FLUSH);
}
else {
//System.out.println("Not last block!");
compress.setInput(input,0,BLOCK_SIZE);
if(dictAvailable) {
compress.setDictionary(dictionary);
}
compressedLength = compress.deflate(output,0,BLOCK_SIZE,Deflater.SYNC_FLUSH);
}
byte[] finalOut = Arrays.copyOfRange(output,0,compressedLength);
return finalOut;
}
}
getProcessors p = new getProcessors();
boolean useDict = true;
int numProcs = p.getNumProcs();
boolean customProcs = false;
boolean foundProcs = false;
boolean foundDict = false;
/*Checking if arguments are correct*/
........
/*Correct arguments, proceeding*/
BufferedInputStream inBytes = new BufferedInputStream(System.in);
byte[] buff = new byte[BLOCK_SIZE];
byte[] dict = new byte[DICT_SIZE];
int bytesRead = 0;
int offset = 0;
int uncompressedLength = 0;
int lastBlockSize = 0;
boolean isLastBlock = false;
boolean firstBlockDone = false;
/*Using an executor with a fixed thread pool size in order to manage threads
as well as obtain future results to maintain synchronization*/
ExecutorService exec = Executors.newFixedThreadPool(numProcs);
CRC32 checksum = new CRC32();
checksum.reset();
List<Future<byte[]>> results = new ArrayList<Future<byte[]>>();
//byte[] temp;
System.out.write(header,0,header.length);
try{
bytesRead = inBytes.read(buff,0, BLOCK_SIZE);
while (bytesRead != -1) {
uncompressedLength += bytesRead;
checksum.update(buff,offset,bytesRead);
offset += bytesRead;
if (offset == BLOCK_SIZE) {
offset = 0;
if(!firstBlockDone){
firstBlockDone = true;
results.add(exec.submit(new workerThread(isLastBlock,buff,dict,false,lastBlockSize)));
}
else {
results.add(exec.submit(new workerThread(isLastBlock,buff,dict,useDict,lastBlockSize)));
}
if (useDict) {
System.arraycopy(buff, BLOCK_SIZE-DICT_SIZE, dict, 0, DICT_SIZE);
}
}
/*Implementation warning! Because of the way bytes are read in, this program will fail if
the file being zipped is exactly a multiple of 128*1024*/
if((bytesRead=inBytes.read(buff,offset,BLOCK_SIZE-offset)) == -1) {
isLastBlock = true;
lastBlockSize = offset;
results.add(exec.submit(new workerThread(isLastBlock,buff,dict,useDict,lastBlockSize)));
}
}
try {
for(Future<byte[]> result: results) {
//System.out.println("Got result!");
System.out.write(result.get(),0,result.get().length);
//temp = result.get();
}
}
catch (InterruptedException ex) {
ex.printStackTrace();
System.err.println("Interrupted thread!");
}
catch (ExecutionException ex) {
ex.printStackTrace();
System.err.println("Interrupted thread!");
}
finally{
exec.shutdownNow();
}
/*Converting CRC sum and total length to bytes for trailer*/
byte[] trailer = new byte[8];
getTrailer trail = new getTrailer(checksum.getValue(),uncompressedLength);
trail.writeTrailer(trailer,0);
System.out.write(trailer);
}
catch (IOException ioe) {
ioe.printStackTrace();
System.out.println("IO error.");
System.exit(-1);
}
catch (Throwable e) {
System.out.println("Unexpected exception or error.");
System.exit(-1);
}
}
}
啊哎呀,格式得到了由代碼塊格式揭去了一下。正如你所看到的,我一直在閱讀輸入,直到buff已滿爲止。原因是因爲這不是一個文件,所以有可能第一次讀取的讀取沒有讀取足夠的字節來填充數組(給我留下一堆空白,我不想搞亂任何東西)。一旦它滿了,我將它交給執行程序,以便線程執行任務。我實現了Callable而不是Runnable,這樣我就可以將輸出作爲字節數組返回並且因爲我需要將來的接口。 exec.get()方法允許我保持線程同步。我已經用一個任意的案例來測試(打印數字1 - 100以確保它們確實按順序打印)。
有一個缺陷,這個程序不能用於BLOCK_SIZE的倍數的文件,但這不是我現在的問題。這個程序在輸入小到足以讓我只運行一個線程時工作。
對於除最後一個塊以外的每個塊,我用SYNC_FLUSH選項調用deflate。這是我可以在一個字節邊界上結束。我通常壓縮的最後一塊,然後調用完成。
對不起,真的很長的職位。我真的需要更多的意見,除了我自己,因爲我似乎無法找到錯誤。如果有人想要編譯並運行它以查看自己,那麼下面是我的其他類(只是爲了獲得進程數並生成預告片,這兩種工作都很好)。
import java.io.*;
public class getTrailer {
private long crc;
private int total;
public getTrailer (long crcVal, int totalIn) {
this.crc = crcVal;
this.total = totalIn;
}
public void writeTrailer(byte[] buf, int offset) throws IOException {
writeInt((int)crc, buf, offset); // CRC-32 of uncompr. data
writeInt(total, buf, offset + 4); // Number of uncompr. bytes
}
/* Writes integer in Intel byte order to a byte array, starting at a
* given offset
*/
public void writeInt(int i, byte[] buf, int offset) throws IOException {
writeShort(i & 0xffff, buf, offset);
writeShort((i >> 16) & 0xffff, buf, offset + 2);
}
/*
* Writes short integer in Intel byte order to a byte array, starting
* at a given offset
*/
public void writeShort(int s, byte[] buf, int offset) throws IOException {
buf[offset] = (byte)(s & 0xff);
buf[offset + 1] = (byte)((s >> 8) & 0xff);
}
}
拖車功能被完整複製從Java的文檔
public class getProcessors {
private Runtime runner = Runtime.getRuntime();
private int nProcs = runner.availableProcessors();
int getNumProcs() {
return nProcs;
}
}
粘貼我意識到這是怎麼長的,但我真的需要別人的意見。如果有人看到他們認爲可能導致問題的任何事情,請告訴我。我不需要有人爲我編寫程序(我想我幾乎在那裏),但我只是......看不出有什麼問題。
你的意思是我什麼時候得到預告片?情況並非如此。我使用hexdump將其與正確的預告片進行比較,它們是相同的。 crc錯誤意味着它讀取我的壓縮數據,解壓縮它,然後將它與CRC期望看到的內容進行比較,它們不一樣。預告片只是包含該信息的字節,因此每次對於同一個文件它們應該是相同的(即使壓縮字節由於降級等原因而看起來不同)。 – user1777900
我將不得不查看更多評論的代碼,但是作爲旁白,您是否看過ByteBuffer?它可以按字節順序讀取/寫入所有基本類型。 – Zagrev