我有卡夫卡羣集接收消息。消息是zip文件的一個字節數組。 zip文件包含二進制protobuf數據文件作爲條目。我正在閱讀zip文件並嘗試反序列化protobuf條目,這就是我的代碼正在碰到"protocol message has invalid UTF-8,invalid tag"
例外的地方。無法解壓縮壓縮協議緩衝區
我能夠反序列化二進制protobuf文件,然後將其作爲壓縮字節數組發送給代理。
但是當我壓縮這些二進制protobuf文件,產生消息給卡夫卡,消耗它,然後嘗試反序列化zip流中的條目,我面臨的問題。
我不知道哪一個是這裏的罪魁禍首。
因爲這些二進制協議緩衝區被GZipped,壓縮他們又搞亂了事情?
有人可以解釋一下。
感謝
**************編輯**************
Producer Side:
public byte[] getZipfileBytes() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ZipOutputStream zipOut = new ZipOutputStream(baos);
CheckedOutputStream checkSum = new CheckedOutputStream(zipOut, new Adler32());
try {
ZipEntry zipEntry = new ZipEntry(testFile.getName());
byte[] protoBytes = IOUtils.toByteArray(new FileInputStream(testFile));
System.out.println("bytes length:\t"+protoBytes.length);
zipEntry.setSize(protoBytes.length);
zipOut.putNextEntry(zipEntry);
zipOut.write(protoBytes);
zipOut.close();
System.out.println("checksum:"+checkSum.getChecksum().getValue());
zipBytes = baos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return zipBytes;
}
Consumer Side:
processConsumerRecord(ConsumerRecord<String, byte[]> record) {
String key = record.key();
byte[] dataPacket = record.value();
ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(dataPacket));
CheckedInputStream checkSum = new CheckedInputStream(zipIn,
new Adler32());
ZipEntry zipEntry;
try {
zipEntry = zipIn.getNextEntry();
while (zipEntry != null) {
String name = zipEntry.getName();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
IOUtils.copy(zipIn, baos);
byte[] protoBytes = baos.toByteArray();
二進制protobuf的字節gzipped所以我需要gunzip
如果我做gunzip它不會以gzip格式引發。
如果我跳過gunzip並執行parseFrom我得到無效標記異常。 。
GZIPInputStream gzip = new GZIPInputStream(
new ByteArrayInputStream(baos.toByteArray()));
MyProtoObject mpo = null;
try {
mpo = MyProtoObject.parseFrom(protoBytes);
} catch (InvalidProtocolBufferException e1) {
e1.printStackTrace();
}
} catch (IOException e1) {
e1.printStackTrace();
}
checkSum.getChecksum()的getValue(),而在調試過程中生產和消費拉鍊字節數組
以下是ZipEntry的變量的值返回1:
producer
zipEntry ZipEntry (id=44)
comment null
crc 2147247736
csize 86794
extra null
flag 2056
method 8
name "test.dat" (id=49)
size 92931
time 1214084891
consumer
zipEntry ZipEntry (id=34)
comment null
crc 2147247736
csize 86794
extra null
flag 0
method 8
name "test.dat" (id=39)
size 92931
time 1214084891
我甚至測試的其它方式,而不是處理內存中的protobytes,我把zip文件寫入磁盤,通過winzip手動提取它,然後反序列化提取的二進制原始文件,它工作!
我在做的壓縮/解壓縮走錯了路, 讓我知道
小/大端轉換(或缺少它)可能會導致問題嗎? – Emil
@Emil你選擇的protobuf庫應該在內部處理字節序; protobuf規範使這些細節清晰可見,如果任何已建立的庫出錯,它會讓我感到驚訝。你的zip/unzip代碼*不應該關心字節順序* - 它只需要在進程的開始和結束時以相同的順序得到相同的字節。 –
我完全同意。正如您所說,我認爲忽略protobuf步驟並確保zip/unzip步驟按預期工作是個好主意。只是說在傳輸數據時Endianess會導致問題 – Emil