
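Spring Integration MongoDbStoringMessageHandler ClassCastException: BasicDBObject cannot be cast to BasicDBList

I have developed an integration flow in which I get users from a MongoDbMessageSource and, for each social media account associated with a user, fetch the comments addressed to that user.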

I want to persist these comments in MongoDB with the help of a MongoDbStoringMessageHandler attached to the channel storeChannel.

The flow is as follows:

@Configuration 
@IntegrationComponentScan 
public class InfrastructureConfiguration { 

    private static Logger logger = LoggerFactory.getLogger(InfrastructureConfiguration.class); 

    /** 
    * The Pollers builder factory can be used to configure common bean definitions or 
    * those created from IntegrationFlowBuilder EIP-methods 
    */ 
    @Bean(name = PollerMetadata.DEFAULT_POLLER) 
    public PollerMetadata poller() { 
     return Pollers.fixedDelay(10, TimeUnit.SECONDS).get(); 
    } 

    @Bean 
    public TaskExecutor taskExecutor() { 
     ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
     executor.setCorePoolSize(5); 
     executor.setMaxPoolSize(10); 
     executor.setQueueCapacity(25); 
     return executor; 
    } 

    /** 
    * 
    * MongoDbMessageSource is an instance of MessageSource which returns a Message with a payload 
    * which is the result of execution of a Query 
    */ 
    @Bean 
    @Autowired 
    public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) { 
     MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}")); 
     messageSource.setExpectSingleResult(false); 
     messageSource.setEntityClass(UserEntity.class); 
     messageSource.setCollectionNameExpression(new LiteralExpression("users")); 
     return messageSource; 
    } 

    @Bean 
    @ServiceActivator(inputChannel = "storeChannel") 
    public MessageHandler mongodbAdapter(MongoDbFactory mongo) throws Exception { 
     MongoDbStoringMessageHandler adapter = new MongoDbStoringMessageHandler(mongo); 
     adapter.setCollectionNameExpression(new LiteralExpression("comments")); 
     return adapter; 
    } 

    @Bean 
    @Autowired 
    public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) { 
     return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller)) 
       .<List<UserEntity>, Map<ObjectId, List<SocialMediaEntity>>>transform(userEntitiesList 
         -> userEntitiesList.stream().collect(Collectors.toMap(UserEntity::getId, UserEntity::getSocialMedia)) 
       ) 
       .split(new AbstractMessageSplitter() { 
        @Override 
        protected Object splitMessage(Message<?> msg) { 
         return ((Map<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).entrySet(); 
        } 
       }) 
       .channel("directChannel_1") 
       .enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key"))) 
       .split(new AbstractMessageSplitter() { 
        @Override 
        protected Object splitMessage(Message<?> msg) { 
         return ((Entry<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).getValue(); 
        } 
       }) 
       .channel(MessageChannels.executor("executorChannel", this.taskExecutor())) 
       .<SocialMediaEntity, SocialMediaTypeEnum>route(p -> p.getType(), 
         m 
         -> m.subFlowMapping(SocialMediaTypeEnum.FACEBOOK, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() { 
           @Override 
           public Object handle(SocialMediaEntity payload, Map<String, Object> headers) { 
            ObjectId userId = (ObjectId)headers.get("user-id"); 
            logger.info("TEST FACEBOOK Channel for user id: " + userId); 
            return Arrays.asList(new CommentEntity[] { 
             new CommentEntity("Comentario 1 from facebook dirigido a " + userId, userId), 
             new CommentEntity("Comentario 2 from facebook dirigido a " + userId, userId) 
            }); 
           } 
          })) 
          .subFlowMapping(SocialMediaTypeEnum.YOUTUBE, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() { 
           @Override 
           public Object handle(SocialMediaEntity payload, Map<String, Object> headers) { 
            ObjectId userId = (ObjectId)headers.get("user-id"); 
            logger.info("TEST YOUTUBE Channel for user id: " + userId); 
            return Arrays.asList(new CommentEntity[] { 
             new CommentEntity("Comentario 1 from youtube dirigido a " + userId, userId), 
             new CommentEntity("Comentario 2 from youtube dirigido a " + userId, userId) 
            }); 
           } 
          })) 
          .subFlowMapping(SocialMediaTypeEnum.INSTAGRAM, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() { 
           @Override 
           public Object handle(SocialMediaEntity payload, Map<String, Object> headers) { 
            ObjectId userId = (ObjectId)headers.get("user-id"); 
            logger.info("TEST INSTAGRAM Channel for user id: " + userId); 
            return Arrays.asList(new CommentEntity[] { 
             new CommentEntity("Comentario 1 from instagram dirigido a " + userId, userId), 
             new CommentEntity("Comentario 2 from instagram dirigido a " + userId, userId) 
            }); 
           } 
          })) 
       ) 
       .channel("directChannel_2") 
       .aggregate() 
       .channel("directChannel_3") 
       .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
         comments.stream().flatMap(List::stream).collect(Collectors.toList())) 
       .aggregate() 
       .channel("directChannel_4") 
       .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
         comments.stream().flatMap(List::stream).collect(Collectors.toList())) 
       .channel("storeChannel") 
       .get(); 
    } 

} 

The debug messages logged just before the error are these:

2017-07-24 15:43:03.265 DEBUG 15152 --- [ taskExecutor-3] o.s.integration.channel.DirectChannel : preSend on channel 'storeChannel', message: GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}] 
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] ssor$ReplyProducingMessageHandlerWrapper : infrastructureConfiguration.mongodbAdapter.serviceActivator.handler received message: GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}] 
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] o.s.i.m.o.MongoDbStoringMessageHandler : mongodbAdapter received message: GenericMessage [payload=[[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}] 

Here it is clear that a list of CommentEntity objects arrives at the channel storeChannel. The entity is defined as follows:

@Document(collection="comments") 
public class CommentEntity { 

    @Id 
    private ObjectId id; 

    @Field("message") 
    private String message; 

    private ObjectId user; 

    @PersistenceConstructor 
    public CommentEntity(String message, ObjectId user) { 
     this.message = message; 
     this.user = user; 
    } 

    public ObjectId getId() { 
     return id; 
    } 

    public void setId(ObjectId id) { 
     this.id = id; 
    } 

    public String getMessage() { 
     return message; 
    } 

    public void setMessage(String message) { 
     this.message = message; 
    } 

    public ObjectId getUser() { 
     return user; 
    } 

    public void setUser(ObjectId user) { 
     this.user = user; 
    } 

} 

Then this exception occurs:

2017-07-24 15:43:03.271 ERROR 15152 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [mongodbAdapter]; nested exception is java.lang.ClassCastException: com.mongodb.BasicDBObject cannot be cast to com.mongodb.BasicDBList, failedMessage=GenericMessage [payload=[[email protected], [email protected], [email protected], sanchez.sanchez.sergio.persistence.entity[email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}] 

I am currently using an embedded MongoDB:

<dependency> 
    <groupId>de.flapdoodle.embed</groupId> 
    <artifactId>de.flapdoodle.embed.mongo</artifactId> 
</dependency> 

Does anyone know what I am doing wrong? Thanks in advance.

Answer


Well, that exception clearly says that MongoDbStoringMessageHandler does not support saving a collection:

protected void handleMessageInternal(Message<?> message) throws Exception { 
    Assert.isTrue(this.initialized, "This class is not yet initialized. Invoke its afterPropertiesSet() method"); 
    String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, message, String.class); 
    Assert.notNull(collectionName, "'collectionNameExpression' must not evaluate to null"); 

    Object payload = message.getPayload(); 

    this.mongoTemplate.save(payload, collectionName); 
} 

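You don't need .aggregate() to build collections for saving. With this handler you can only save the items one by one.

I think it would be a good addition to let that component perform this operation as well: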
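To illustrate the "one by one" point without pulling in Spring, here is a minimal plain-Java sketch. `SingleDocumentStore` and `splitAndSend` are hypothetical names invented for this illustration; the store only models a handler that accepts exactly one document per call, and it is not the real MongoDbStoringMessageHandler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class SplitBeforeStore {

    /** Hypothetical stand-in for a handler that stores exactly one document per call. */
    static final class SingleDocumentStore implements Consumer<Object> {
        final List<Object> saved = new ArrayList<>();

        @Override
        public void accept(Object payload) {
            // In this model a collection payload is rejected outright,
            // mirroring the fact that the real handler fails on a List payload.
            if (payload instanceof Iterable) {
                throw new IllegalArgumentException("expected a single document, got a collection");
            }
            saved.add(payload);
        }
    }

    /** Emits each element as its own payload, the way a splitter emits one message per item. */
    static void splitAndSend(List<?> payload, Consumer<Object> downstream) {
        for (Object item : payload) {
            downstream.accept(item);
        }
    }

    public static void main(String[] args) {
        SingleDocumentStore store = new SingleDocumentStore();
        List<String> comments = Arrays.asList("comment 1", "comment 2", "comment 3");

        // Splitting first means the store only ever sees single documents.
        splitAndSend(comments, store);
        System.out.println("saved documents: " + store.saved.size());
    }
}
```

In the flow from the question, the analogous change would presumably be to finish with a splitter step ahead of storeChannel instead of the final aggregation. That is an assumption about the intended fix rather than something spelled out here.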

/** 
* Insert a mixed Collection of objects into a database collection determining the collection name to use based on the 
* class. 
* 
* @param collectionToSave the list of objects to save. 
*/ 
void insertAll(Collection<? extends Object> objectsToSave); 

Please raise a JIRA on the matter, and don't hesitate to contribute!
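As a rough idea of what such an addition might look like, the sketch below dispatches on the payload type: collections go to `insertAll`, anything else to `save`. `DocumentOperations` and `RecordingOperations` are hypothetical stand-ins written for this example, exposing only the two operations quoted in this answer; this is not the actual component code. Note that, as the quoted Javadoc says, `insertAll` determines the collection name from the class, so `collectionName` is unused on that branch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

final class CollectionAwareStore {

    /** Hypothetical interface mirroring only the two operations quoted in the answer. */
    interface DocumentOperations {
        void save(Object objectToSave, String collectionName);
        void insertAll(Collection<?> objectsToSave);
    }

    /** Collections are handed to insertAll; any other payload is saved as a single document. */
    static void store(Object payload, String collectionName, DocumentOperations operations) {
        if (payload instanceof Collection) {
            operations.insertAll((Collection<?>) payload);
        } else {
            operations.save(payload, collectionName);
        }
    }

    /** In-memory recorder used only to exercise the dispatch logic. */
    static final class RecordingOperations implements DocumentOperations {
        final List<Object> documents = new ArrayList<>();
        int saveCalls;
        int insertAllCalls;

        @Override
        public void save(Object objectToSave, String collectionName) {
            saveCalls++;
            documents.add(objectToSave);
        }

        @Override
        public void insertAll(Collection<?> objectsToSave) {
            insertAllCalls++;
            documents.addAll(objectsToSave);
        }
    }

    public static void main(String[] args) {
        RecordingOperations operations = new RecordingOperations();
        store("single comment", "comments", operations);
        store(Arrays.asList("comment 1", "comment 2"), "comments", operations);
        System.out.println(operations.documents);
    }
}
```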


Thanks for your reply! Could this be done through a Spring Data Repository? Would that be elegant?


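Repositories are unrelated to the goals of Spring Integration. Spring Integration is about protocol-agnostic messaging, whereas a repository belongs to a domain driven by POJO interaction.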
