使用Spring Cloud DataFlow 1.3.0.M2與Spring Cloud Stream Starters Celsius.M1。Spring Cloud DataFlow:獲取有效載荷作爲列表<Map>
我有兩個處理器。首先產生一個List<Map>
,它應該被另一個消耗掉。這裏是簡化的代碼。
// Processor 1
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
// Note: had Object instead of List<> as the return, hoped perhaps using a
// specific type would help, but no difference.
public List<Map<String, Object>> process(final @Payload MyPojo payload) {
final List<Map<String, Object>> results = worker.doWork(payload);
LOG.debug("Returning " + results.size() + " objects");
return results;
}
// Processor 2
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object process(final @Payload List<Map<String, Object>> payload) {
LOG.debug("Received " + payload.size() + " objects");
final List<Map<String, Object>> results = worker.moreWork(payload);
return results;
}
我部署這兩款處理器在使用新加坡民防部隊殼管道:
<source> | otherProcessors | processor1 | processor2 | log
調試消息處理器1表示,它已在列表2級的對象。處理器2的調試消息表明它收到了40個對象(每個映射有20個鍵=值對) - 看起來這兩個映射變成了一個key = value對的列表。
我已啓用對org.spring.integration
調試日誌記錄和顯示的消息具有的地圖格式的列表(這是從處理器2):
preSend on channel 'input', message: GenericMessage
[payload=[{"m1key1":"val1","m1key2":"val2",...,"m1key20":"val20"},
{"m2key1":"val1","m2key2":"val2",...,"m2key20":"val20"}], headers={..}]
我想處理器2接收所產生的2個地圖處理器1.我不知道這是否與泛型類型有關。有人可以指向我的配置,以實現這一目標嗎?
----更新阿爾喬姆的評論----
處理器1具有這種在其application.properties
文件:
spring.cloud.stream.bindings.output.content-type=application/json
我還曾試圖改變這樣的流定義,但它沒」不像是會有所作爲:
<source> | otherProcessors | processor1 --outputType=application/json | processor2 --inputType=application/json | log
你知道這看起來不像JSON,但的確如'GenericMessage.toString()'。你會介意在這個問題上分享你的頭像嗎?至少'contentType' –
是的,你在這些處理器上有什麼「輸入/輸出」配置?我的意思是'綁定'設置 –
更新後回答問題,阿爾喬姆,謝謝。 – user944849