2016-03-01 143 views
1

我有卡夫卡羣集接收消息。消息是zip文件的一個字節數組。 zip文件包含二進制protobuf數據文件作爲條目。我正在閱讀zip文件並嘗試反序列化protobuf條目,這就是我的代碼正在碰到"protocol message has invalid UTF-8,invalid tag"例外的地方。無法解壓縮壓縮協議緩衝區

我能夠反序列化二進制protobuf文件,然後將其作爲壓縮字節數組發送給代理。

但是當我壓縮這些二進制protobuf文件,產生消息給卡夫卡,消耗它,然後嘗試反序列化zip流中的條目,我面臨的問題。

我不知道哪一個是這裏的罪魁禍首。

因爲這些二進制協議緩衝區被GZipped,壓縮他們又搞亂了事情?

有人可以解釋一下。

感謝

**************編輯**************

Producer Side: 

public byte[] getZipfileBytes() { 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     ZipOutputStream zipOut = new ZipOutputStream(baos); 
     CheckedOutputStream checkSum = new CheckedOutputStream(zipOut, new Adler32()); 

     try { 
      ZipEntry zipEntry = new ZipEntry(testFile.getName()); 
      byte[] protoBytes = IOUtils.toByteArray(new FileInputStream(testFile)); 
      System.out.println("bytes length:\t"+protoBytes.length); 
      zipEntry.setSize(protoBytes.length); 
      zipOut.putNextEntry(zipEntry); 
      zipOut.write(protoBytes); 
      zipOut.close(); 
      System.out.println("checksum:"+checkSum.getChecksum().getValue()); 
      zipBytes = baos.toByteArray(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 

     return zipBytes; 
    } 



    Consumer Side: 
     processConsumerRecord(ConsumerRecord<String, byte[]> record) { 
       String key = record.key(); 
       byte[] dataPacket = record.value(); 

       ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(dataPacket)); 

       CheckedInputStream checkSum = new CheckedInputStream(zipIn, 
         new Adler32()); 
       ZipEntry zipEntry; 
       try { 
        zipEntry = zipIn.getNextEntry(); 
        while (zipEntry != null) { 
         String name = zipEntry.getName(); 
         ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
          try { 
           IOUtils.copy(zipIn, baos); 
           byte[] protoBytes = baos.toByteArray(); 

二進制protobuf的字節gzipped所以我需要gunzip

如果我做gunzip它不會以gzip格式引發。

如果我跳過gunzip並執行parseFrom我得到無效標記異常。 。

GZIPInputStream gzip = new GZIPInputStream(
         new ByteArrayInputStream(baos.toByteArray())); 
         MyProtoObject mpo = null; 
         try { 
          mpo = MyProtoObject.parseFrom(protoBytes); 
         } catch (InvalidProtocolBufferException e1) { 
          e1.printStackTrace(); 
         } 
        } catch (IOException e1) { 
         e1.printStackTrace(); 
        } 

checkSum.getChecksum()的getValue(),而在調試過程中生產和消費拉鍊字節數組
以下是ZipEntry的變量的值返回1:

producer 
     zipEntry ZipEntry (id=44) 
     comment null  
     crc 2147247736 
     csize 86794 
     extra null  
     flag 2056  
     method 8 
     name "test.dat" (id=49) 
     size 92931 
     time 1214084891 


    consumer 
     zipEntry ZipEntry (id=34) 
     comment null  
     crc 2147247736 
     csize 86794 
     extra null  
     flag 0 
     method 8 
     name "test.dat" (id=39) 
     size 92931 
     time 1214084891 

我甚至測試的其它方式,而不是處理內存中的protobytes,我把zip文件寫入磁盤,通過winzip手動提取它,然後反序列化提取的二進制原始文件,它工作!

我在做的壓縮/解壓縮走錯了路, 讓我知道

回答

0

有在這裏打球兩個不同的東西:壓縮/解壓縮,處理protobuf的。這聽起來像是第一個問題,聽起來像是破壞了protobuf數據。所以,現在:忘記了protobuf,只專注於zip/unzip。記錄原始消息(壓縮之前 - 可能是二進制文件或base-64塊)。現在在接收端,在解壓縮後(二進制文件或base-64塊),跟蹤你正在獲得的二進制二進制文件。如果它們不是絕對100%相同,則所有其他投注均關閉。直到你可以成功地複製原始的二進制文件,protobuf就沒有機會了。

如果這是問題:最好是顯示您的zip/unzip代碼,以便我們可以看到它。

如果你正確壓縮/解壓二進制文件,那麼問題將出現在你的protobuf代碼中。

如果這是問題:最好是顯示序列化/反序列化代碼,以便我們可以看到它。

+0

小/大端轉換(或缺少它)可能會導致問題嗎? – Emil

+0

@Emil你選擇的protobuf庫應該在內部處理字節序; protobuf規範使這些細節清晰可見,如果任何已建立的庫出錯,它會讓我感到驚訝。你的zip/unzip代碼*不應該關心字節順序* - 它只需要在進程的開始和結束時以相同的順序得到相同的字節。 –

+0

我完全同意。正如您所說,我認爲忽略protobuf步驟並確保zip/unzip步驟按預期工作是個好主意。只是說在傳輸數據時Endianess會導致問題 – Emil