2016-08-23 79 views
2

我希望創建一個自定義消息處理程序來使用流中的檢查點。此外,這些檢查點將存儲在ElasticSearch如何在Spring Integration自定義消息處理程序中自動裝入Bean?

我創建了一個類檢查點

@Component 
public class Checkpoint { 

    public static final String TASK_HEADER_KEY = "task"; 

    public static CheckpointMessageHandlerSpec warn(String message) { 
     return new CheckpointMessageHandlerSpec(new CheckpointHandler("WARN", message)); 
    } 
} 
// ... methods omitted: error, info etc 

接下來,我創建CheckpointMessageHandlerSpec

public class CheckpointMessageHandlerSpec extends MessageHandlerSpec<CheckpointMessageHandlerSpec, CheckpointHandler> { 

    public CheckpointMessageHandlerSpec(CheckpointHandler checkpointHandler) { 
     this.target = checkpointHandler; 
    } 

    public CheckpointMessageHandlerSpec apply(Message<?> message) { 
     this.target.handleMessage(message); 
     return _this(); 
    } 

    @Override 
    protected CheckpointHandler doGet() { 
     throw new UnsupportedOperationException(); 
    } 
} 

CheckpointHandler,在這個類我想注入的東西,比如服務或資料庫來自Spring的數據:

public class CheckpointHandler extends IntegrationObjectSupport implements MessageHandler { 

    private String status; 
    private String message; 

    // I want inject services or repositories here 

    public CheckpointHandler(String status, String message) { 
     this.status = status; 
     this.message = message; 
    } 

    @Override 
    public void handleMessage(Message<?> message) { 
     // Test to watch if I have the bean factory. It is always null 
     this.getBeanFactory(); 

     Expression expression = EXPRESSION_PARSER.parseExpression("'" + this.message + "'"); 

     // Here I intend to persist information of payload/headers with spring-data-elasticSearch repository previously injected 
     Object obj = expression.getValue(message); 
    } 
} 

最後,使用的例子,一個流之內:

@Bean 
public IntegrationFlow checkpointFlow(Checkpoint checkpoint) { 
    return IntegrationFlows.from(Http.inboundChannelAdapter("/checkpointFlow")) 
      .enrichHeaders(Collections.singletonMap(Checkpoint.TASK_HEADER_KEY, taskName)) 
      .handle(new AppendMessageHandler()) 
      .wireTap(c -> c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m))) 
      .handle(m -> log.info("[LOGGING DEMO] {}" , m.getPayload())) 
      .get(); 
} 

private class AppendMessageHandler implements GenericHandler { 

    @Override 
    public String handle(Object payload, Map headers) { 
     return new StringBuilder().append(testMessage).toString(); 
    } 
} 

我錯過?有可能這樣做嗎?我在這個問題後有這個想法How to create custom component and add it to flow in spring java dsl?

謝謝!

回答

1

Bean可以自動佈線,如果它們是,那麼就是bean。

讓我們再看看你的代碼!

c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m)) 

真正的豆在這裏正是Lambda :)。當然,可悲,但不是你的自定義工廠,隨後apply()。您的自定義代碼將在每個傳入消息的目標Lambda中完全調用,但不知道BeanFactory

要解決你的問題,你應該用你的工廠如:

.wireTap(c -> c.handle(checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '"))) 

和框架,將需要大約爲豆你註冊並CheckpointHandler因此,自動裝配照顧。

正如你可能已經猜到,你不需要apply()方法。僅僅因爲需要區分彙編階段,當Java DSL填充bean的樹時。初始化和註冊階段,當這個樹被框架分析並且bean被註冊在應用程序上下文中時。最後,還有一個運行階段,即當消息從一個通道傳送到另一個通道時,雖然所有這些消息處理程序,變換器等都有。

相關問題