1

。我正在使用數據流來處理這些日誌並將相關列存儲在BigQuery中。可有人請與發佈 - 訂閱消息負載的一個LogEntry對象轉換幫助。 我曾嘗試下面的代碼:如何發佈 - 訂閱有效載荷轉換爲LogEntry對象已啓用了日誌出口到一家酒吧子課題原木出口

@ProcessElement 
public void processElement(ProcessContext c) throws Exception { 
    PubsubMessage pubsubMessage = c.element(); 

    ObjectMapper mapper = new ObjectMapper(); 

    byte[] payload = pubsubMessage.getPayload(); 
    String s = new String(payload, "UTF8"); 
    LogEntry logEntry = mapper.readValue(s, LogEntry.class); 
} 

但我得到了以下錯誤:

com.fasterxml.jackson.databind.JsonMappingException: Can not find a (Map) Key deserializer for type [simple type, class com.google.protobuf.Descriptors$FieldDescriptor] 

編輯: 我嘗試下面的代碼:

try { 
     ByteArrayInputStream stream = new ByteArrayInputStream(Base64.decodeBase64(pubsubMessage.getPayload())); 
     LogEntry logEntry = LogEntry.parseDelimitedFrom(stream); 
     System.out.println("Log Entry = " + logEntry); 
    } catch (InvalidProtocolBufferException e) { 
     e.printStackTrace(); 
    } 

,但我得到以下現在的錯誤:

com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag

回答

0

JSON format parser應該能夠做到這一點。 Java不是我的力量,但我認爲你正在尋找類似的東西:

@ProcessElement 
public void processElement(ProcessContext c) throws Exception { 
    LogEntry.Builder entryBuilder = LogEntry.newBuilder(); 
    JsonFormat.Parser.usingTypeRegistry(
     JsonFormat.TypeRegistry.newBuilder() 
      .add(LogEntry.getDescriptor()) 
      .build()) 
     .ignoringUnknownFields() 
     .merge(c.element(), entryBuilder); 
    LogEntry entry = entryBuilder.build(); 
    ... 
} 

你也許能夠在不註冊類型的情況下離開。我認爲在C++中,原型類型被鏈接到全局註冊表中。

你會想萬一「ignoringUnknownFields」的服務增加了新的領域,並出口他們,你有沒有更新您的原描述符的副本。導出的JSON中的任何「@type」字段也會導致問題。

您可能需要有效載荷的特殊處理(即條如果從JSON,然後分別解析它)。如果是JSON,我希望解析器嘗試填充不存在的子消息。如果它是原始的......如果您也註冊Any類型,它實際上可能會起作用。

相關問題