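Spring Integration MongoDbStoringMessageHandler ClassCastException: BasicDBObject cannot be cast to BasicDBList
I have developed an integration flow in which I get users from a MongoDbMessageSource and, for each social media account associated with a user, fetch the comments addressed to that user.
I want to persist these comments in MongoDB using a MongoDbStoringMessageHandler attached to the channel storeChannel.
The flow is as follows: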
@Configuration
@IntegrationComponentScan
public class InfrastructureConfiguration {

    private static Logger logger = LoggerFactory.getLogger(InfrastructureConfiguration.class);

    /**
     * The Pollers builder factory can be used to configure common bean definitions or
     * those created from IntegrationFlowBuilder EIP-methods
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(10, TimeUnit.SECONDS).get();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }

    /**
     * MongoDbMessageSource is an instance of MessageSource which returns a Message with a payload
     * which is the result of execution of a Query
     */
    @Bean
    @Autowired
    public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
        MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
        messageSource.setExpectSingleResult(false);
        messageSource.setEntityClass(UserEntity.class);
        messageSource.setCollectionNameExpression(new LiteralExpression("users"));
        return messageSource;
    }

    @Bean
    @ServiceActivator(inputChannel = "storeChannel")
    public MessageHandler mongodbAdapter(MongoDbFactory mongo) throws Exception {
        MongoDbStoringMessageHandler adapter = new MongoDbStoringMessageHandler(mongo);
        adapter.setCollectionNameExpression(new LiteralExpression("comments"));
        return adapter;
    }

    @Bean
    @Autowired
    public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) {
        return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller))
                .<List<UserEntity>, Map<ObjectId, List<SocialMediaEntity>>>transform(userEntitiesList
                        -> userEntitiesList.stream().collect(Collectors.toMap(UserEntity::getId, UserEntity::getSocialMedia))
                )
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Map<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).entrySet();
                    }
                })
                .channel("directChannel_1")
                .enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key")))
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Entry<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).getValue();
                    }
                })
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
                .<SocialMediaEntity, SocialMediaTypeEnum>route(p -> p.getType(),
                        m
                        -> m.subFlowMapping(SocialMediaTypeEnum.FACEBOOK, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                            @Override
                            public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                ObjectId userId = (ObjectId)headers.get("user-id");
                                logger.info("TEST FACEBOOK Channel for user id: " + userId);
                                return Arrays.asList(new CommentEntity[] {
                                    new CommentEntity("Comentario 1 from facebook dirigido a " + userId, userId),
                                    new CommentEntity("Comentario 2 from facebook dirigido a " + userId, userId)
                                });
                            }
                        }))
                        .subFlowMapping(SocialMediaTypeEnum.YOUTUBE, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                            @Override
                            public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                ObjectId userId = (ObjectId)headers.get("user-id");
                                logger.info("TEST YOUTUBE Channel for user id: " + userId);
                                return Arrays.asList(new CommentEntity[] {
                                    new CommentEntity("Comentario 1 from youtube dirigido a " + userId, userId),
                                    new CommentEntity("Comentario 2 from youtube dirigido a " + userId, userId)
                                });
                            }
                        }))
                        .subFlowMapping(SocialMediaTypeEnum.INSTAGRAM, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                            @Override
                            public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                ObjectId userId = (ObjectId)headers.get("user-id");
                                logger.info("TEST INSTAGRAM Channel for user id: " + userId);
                                return Arrays.asList(new CommentEntity[] {
                                    new CommentEntity("Comentario 1 from instagram dirigido a " + userId, userId),
                                    new CommentEntity("Comentario 2 from instagram dirigido a " + userId, userId)
                                });
                            }
                        }))
                )
                .channel("directChannel_2")
                .aggregate()
                .channel("directChannel_3")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments ->
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .aggregate()
                .channel("directChannel_4")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments ->
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .channel("storeChannel")
                .get();
    }
}
The debug messages logged just before the error are these:
2017-07-24 15:43:03.265 DEBUG 15152 --- [ taskExecutor-3] o.s.integration.channel.DirectChannel : preSend on channel 'storeChannel', message: GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] ssor$ReplyProducingMessageHandlerWrapper : infrastructureConfiguration.mongodbAdapter.serviceActivator.handler received message: GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] o.s.i.m.o.MongoDbStoringMessageHandler : mongodbAdapter received message: GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
As the log shows, a list of CommentEntity objects arrives at the channel storeChannel. The entity is defined as:
@Document(collection="comments")
public class CommentEntity {

    @Id
    private ObjectId id;

    @Field("message")
    private String message;

    private ObjectId user;

    @PersistenceConstructor
    public CommentEntity(String message, ObjectId user) {
        this.message = message;
        this.user = user;
    }

    public ObjectId getId() {
        return id;
    }

    public void setId(ObjectId id) {
        this.id = id;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public ObjectId getUser() {
        return user;
    }

    public void setUser(ObjectId user) {
        this.user = user;
    }
}
Then this exception occurs:
2017-07-24 15:43:03.271 ERROR 15152 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [mongodbAdapter]; nested exception is java.lang.ClassCastException: com.mongodb.BasicDBObject cannot be cast to com.mongodb.BasicDBList, failedMessage=GenericMessage [payload=[[email protected], [email protected], [email protected], sanchez.sanchez.sergio.persistence.entity[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
I am currently using an embedded MongoDB:
<dependency>
    <groupId>de.flapdoodle.embed</groupId>
    <artifactId>de.flapdoodle.embed.mongo</artifactId>
</dependency>
Does anyone know what I am doing wrong? Thanks in advance.
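A possible direction, offered as a sketch rather than a confirmed fix: the payload that reaches storeChannel is one List holding every CommentEntity, while a handler that writes one document per message would expect a single entity. The library-free example below only illustrates that mismatch and the idea of splitting the list before storing. SingleDocumentStore and SplitBeforeStore are hypothetical names invented for illustration, not Spring Integration or Spring Data classes, and the rejection of collections is an assumed behaviour of the stand-in.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a handler that persists exactly one document per message.
final class SingleDocumentStore {
    private final List<Object> saved = new ArrayList<>();

    // Assumed behaviour: a collection cannot be mapped to a single document, so reject it.
    void save(Object document) {
        if (document instanceof Collection) {
            throw new IllegalArgumentException("expected a single document, got a collection");
        }
        saved.add(document);
    }

    List<Object> saved() {
        return saved;
    }
}

// Hypothetical helper: hands each element of the aggregated list to the store on its own.
final class SplitBeforeStore {
    static int splitAndStore(List<?> payload, SingleDocumentStore store) {
        int count = 0;
        for (Object item : payload) {
            store.save(item); // one "message" per element
            count++;
        }
        return count;
    }
}
```

In the flow itself, the analogous step would presumably be a `.split()` placed just before `.channel("storeChannel")`, so that each CommentEntity travels as its own message. That placement is an assumption and has not been checked against this configuration.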
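Thanks for your reply! Could this be done through a Spring Data Repository? Would that be elegant?
A repository is outside Spring Integration's goals. Spring Integration is about protocol-agnostic messaging, with a domain driven by POJO interaction.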