2017-06-15 20 views
0

任何人都可以幫助我從PubSub-Message接收屬性嗎?消息顯示正確,但沒有屬性。我接收級看起來是這樣的:如何接收PubSub中的屬性

public class ReceiveMessageServlet extends HttpServlet { 

@Override 
public final void doPost(final HttpServletRequest req, 
         final HttpServletResponse resp) 
     throws IOException { 
    // Validating unique subscription token before processing the message 
    String subscriptionToken = System.getProperty(
      Constants.BASE_PACKAGE + ".subscriptionUniqueToken"); 
    if (!subscriptionToken.equals(req.getParameter("token"))) { 
     resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); 
     resp.getWriter().close(); 
     return; 
    } 

    ServletInputStream inputStream = req.getInputStream(); 

    // Parse the JSON message to the POJO model class 
    JsonParser parser = JacksonFactory.getDefaultInstance() 
      .createJsonParser(inputStream); 
    parser.skipToKey("message"); 
    PubsubMessage message = parser.parseAndClose(PubsubMessage.class); 

    // Store the message in the datastore 
    Entity messageToStore = new Entity("PubsubMessage"); 
    messageToStore.setProperty("message", 
      new String(message.decodeData(), "UTF-8")); 
    messageToStore.setProperty("receipt-time", System.currentTimeMillis()); 
    DatastoreService datastore = 
      DatastoreServiceFactory.getDatastoreService(); 
    datastore.put(messageToStore); 

    // Invalidate the cache 
    MemcacheService memcacheService = 
      MemcacheServiceFactory.getMemcacheService(); 
    memcacheService.delete(Constants.MESSAGE_CACHE_KEY); 

    // Acknowledge the message by returning a success code 
    resp.setStatus(HttpServletResponse.SC_OK); 
    resp.getWriter().close(); 


} 

}

我與message.getAttributes嘗試過(),但沒有成功。

回答

0

您可能想要檢查添加的PubsubIO功能,該功能允許ReadWrite轉換提供對latest release中添加的Cloud Pub/Sub消息屬性的訪問。

嘗試使用withAttributes方法。這,

導致源返回包含Pubsub屬性的PubsubMessage。用戶必須提供解析函數以將PubsubMessage轉換爲輸出類型。輸出類型T的編碼器必須通過PCollection.setCoder(Coder)註冊或設置在輸出上。

希望它有幫助。

+0

非常感謝....這可能會在我的班級中調用此方法嗎?我以前沒有這樣做過,所以如果你能幫助我,我會非常高興 – user3732065

相關問題