0

反序列化對象protostuff獲得以下異常,而在消費者卡夫卡反序列化字節[]到protostuff對象問題在消費者卡夫卡

java.lang.NegativeArraySizeException 
at com.dyuproject.protostuff.GraphIOUtil.mergeDelimitedFrom(GraphIOUtil.java:209) 
at com.gme.protocols.protostuff.GmeTrade.readExternal(GmeTrade.java:2772) 
at java.io.ObjectInputStream.readExternalData(Unknown Source) 
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) 
at java.io.ObjectInputStream.readObject0(Unknown Source) 
at java.io.ObjectInputStream.readObject(Unknown Source) 

轉換protostuff對象爲byte []使用以下代碼。

public static byte[] toBytes(Object o) 
{ 
    try 
    { 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     ObjectOutputStream oos = new ObjectOutputStream(baos); 
     oos.writeObject(o); 
     oos.close(); 
     byte[] b = baos.toByteArray(); 
     return b; 
    } 
    catch (IOException e) 
    { 
     return new byte[0]; 
    } 
} 

發送的字節[]使用卡夫卡生產者與主題「XX」,其中字節[]長度只是240 收到的使用卡夫卡消費者該記錄。 record.value()。length(byte [])長度是相同的240我從製作方發送的。

使用以下代碼將該字節[]反序列化爲對象。

public static Object fromBytes(byte[] bytes) 
{ 
    try 
    { 
     return new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject(); 
    } 
    catch (Exception e) 
    { 
     e.printStackTrace(); 
     return null; 
    } 
} 

獲得上述例外。我做錯了什麼? 使用kafka_2.11-0.9.0.0作爲參考。還有其他需要的東西嗎?

回答

0

我找到了解決方案。問題在toBytes和fromBytes中。 需要使用ProtostuffIOUtil.toByteArray方法將其轉換爲byte []。

序列化

public static byte[] toBytes(Foo o) 
{ 
    LinkedBuffer BUFFER = LinkedBuffer.allocate(1024*1024); 
    Schema<Foo> SCHEMA = Foo.getSchema(); 
    return ProtostuffIOUtil.toByteArray(o, SCHEMA, BUFFER); 
} 

同樣需要轉換字節[],用ProtostuffIOUtil.mergeFrom方法對象。

反序列化

public static Foo fromBytes(byte[] bytes) 
{ 
    Foo tmp = Foo.getSchema().newMessage(); 
    ProtostuffIOUtil.mergeFrom(bytes, tmp, Foo.getSchema()); 
    return tmp; 
} 

注:序列化/反序列化用的ObjectOutputStream/ObjectInputStream的將不是protostuff對象。