我希望創建一個自定義消息處理程序來使用流中的檢查點。此外,這些檢查點將存儲在ElasticSearch。如何在Spring Integration自定義消息處理程序中自動裝入Bean?
我創建了一個類檢查點:
@Component
public class Checkpoint {
public static final String TASK_HEADER_KEY = "task";
public static CheckpointMessageHandlerSpec warn(String message) {
return new CheckpointMessageHandlerSpec(new CheckpointHandler("WARN", message));
}
}
// ... methods omitted: error, info etc
接下來,我創建CheckpointMessageHandlerSpec:
public class CheckpointMessageHandlerSpec extends MessageHandlerSpec<CheckpointMessageHandlerSpec, CheckpointHandler> {
public CheckpointMessageHandlerSpec(CheckpointHandler checkpointHandler) {
this.target = checkpointHandler;
}
public CheckpointMessageHandlerSpec apply(Message<?> message) {
this.target.handleMessage(message);
return _this();
}
@Override
protected CheckpointHandler doGet() {
throw new UnsupportedOperationException();
}
}
CheckpointHandler,在這個類我想注入的東西,比如服務或資料庫來自Spring的數據:
public class CheckpointHandler extends IntegrationObjectSupport implements MessageHandler {
private String status;
private String message;
// I want inject services or repositories here
public CheckpointHandler(String status, String message) {
this.status = status;
this.message = message;
}
@Override
public void handleMessage(Message<?> message) {
// Test to watch if I have the bean factory. It is always null
this.getBeanFactory();
Expression expression = EXPRESSION_PARSER.parseExpression("'" + this.message + "'");
// Here I intend to persist information of payload/headers with spring-data-elasticSearch repository previously injected
Object obj = expression.getValue(message);
}
}
最後,使用的例子,一個流之內:
@Bean
public IntegrationFlow checkpointFlow(Checkpoint checkpoint) {
return IntegrationFlows.from(Http.inboundChannelAdapter("/checkpointFlow"))
.enrichHeaders(Collections.singletonMap(Checkpoint.TASK_HEADER_KEY, taskName))
.handle(new AppendMessageHandler())
.wireTap(c -> c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m)))
.handle(m -> log.info("[LOGGING DEMO] {}" , m.getPayload()))
.get();
}
private class AppendMessageHandler implements GenericHandler {
@Override
public String handle(Object payload, Map headers) {
return new StringBuilder().append(testMessage).toString();
}
}
我錯過?有可能這樣做嗎?我在這個問題後有這個想法How to create custom component and add it to flow in spring java dsl?
謝謝!