2017-03-27 67 views
1

我向我的NiFi流程添加了SpringContextProcessor,它按預期執行並更新FlowFile內容和屬性。但在NiFi的而不是看到發送/接收數據出處部分,我看到NiFi SpringContextProcessor中斷流程

03/27/2017 11:47:57.164 MDT RECEIVE 42fa1c3f-edde-4cb7-8e73-ce752f7e3d66 
03/27/2017 11:47:57.163 MDT DROP 667094a7-8eef-4657-981a-dc9fdc6c4056 
03/27/2017 11:47:57.163 MDT SEND 667094a7-8eef-4657-981a-dc9fdc6c4056 

貌似是被丟棄原始消息,並通過新的消息取代。我在其他組件中沒有看到這種行爲,即它們似乎都保留了原始的Flow File UUID。 Spring處理器代碼的簡化版本:

@ServiceActivator(inputChannel = "fromNiFi", outputChannel = "toNiFi") 
public Message<byte[]> process1(Message<byte[]> inMessage) { 
    String inMessagePayload = new String(inMessage.getPayload()); 
    String userId = getUserIdFromDb(inMessagePayload); 
    String outMessagePayload = inMessagePayload + userId; 

    return MessageBuilder.withPayload(outMessagePayload.getBytes()) 
      .copyHeaders(inMessage.getHeaders()) 
      .setHeader("userId", userId) 
      .build(); 
} 

有沒有辦法在傳出消息中保留原始流文件UUID?

回答

1

這可能是我們目前的疏忽,所以是的,請舉出一個JIRA。 但是,作爲解決方法,您可以嘗試從傳入的郵件頭中提取FlowFile屬性,然後將它們傳播回傳出郵件。

public Message<byte[]> process1(Message<byte[]> inMessage) { 
    String myHeader = inMessage.getHeader("someHeader"); 
    . . . 
    return MessageBuilder.withPayload(outMessagePayload.getBytes()) 
      .copyHeaders(inMessage.getHeaders()) 
      .setHeader("userId", userId) 
      .setHeader("someHeader", myHeader) 
      .build(); 
} 
+0

你能指導我如何在Apache Nifi中實現SpringContextProcessor的任何鏈接..無法找到一個工作示例 – Ranjan